From 4df10b876a959d727f3162dc5349bfa95d90b32d Mon Sep 17 00:00:00 2001 From: Mykhailo Date: Thu, 30 May 2024 09:34:24 +0300 Subject: [PATCH] Added support of new features to the `step_functions` resource: - update - publish version - state machine aliases --- CHANGELOG.md | 7 + requirements.txt | 4 +- setup.py | 2 +- .../connection/step_functions_connection.py | 66 ++++- syndicate/core/constants.py | 3 +- .../step_function_generator.py | 2 + syndicate/core/groups/meta.py | 4 + .../core/resources/processors_mapping.py | 2 + .../core/resources/resources_provider.py | 1 + .../core/resources/step_functions_resource.py | 231 ++++++++++++++---- 10 files changed, 260 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dd28a241..e6153577 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +# [1.12.0] - 2024-05-30 +- Raised version of libraries `boto3` and `botocore` +- Added support of new features to the `step_functions` resource: + * update + * publish version + * state machine aliases + # [1.11.6] - 2024-05-24 - Added support of custom authorizer names in Open API specification security schemes - Fixed quietness of errors while deploying/updating API Gateway via OpenAPI specification diff --git a/requirements.txt b/requirements.txt index 54b54114..46abb9c0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ click==7.1.2 -botocore==1.29.80 -boto3==1.26.80 +botocore==1.34.113 +boto3==1.34.113 tqdm==4.65.2 colorama==0.4.5 requests==2.31.0 diff --git a/setup.py b/setup.py index dceb41c2..2400a447 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,7 @@ setup( name='aws-syndicate', - version='1.11.6', + version='1.12.0', packages=find_packages(), include_package_data=True, install_requires=[ diff --git a/syndicate/connection/step_functions_connection.py b/syndicate/connection/step_functions_connection.py index 7013e83b..45d5f7f5 100644 --- a/syndicate/connection/step_functions_connection.py +++ b/syndicate/connection/step_functions_connection.py @@ -36,12 +36,58 @@ def __init__(self, region=None, aws_access_key_id=None, aws_session_token=aws_session_token) _LOG.debug('Opened new Step Functions connection.') - def create_state_machine(self, machine_name, definition, role_arn): + def create_state_machine(self, machine_name, definition, role_arn, + publish_version=False, version_description=None): + params = { + 'name': machine_name, + 'roleArn': role_arn + } + + if isinstance(definition, dict): + params['definition'] = dumps(definition) + + if publish_version: + params['publish'] = True + if version_description: + params['versionDescription'] = str(version_description) + return self.client.create_state_machine(**params) + + def update_state_machine(self, machine_arn, definition, role_arn, + publish_version=False, version_description=None): + params = { + 'stateMachineArn': machine_arn, + 'roleArn': role_arn + } + if isinstance(definition, dict): - definition = dumps(definition) - return self.client.create_state_machine(name=machine_name, - definition=definition, - roleArn=role_arn) + params['definition'] = dumps(definition) + + if publish_version: + params['publish'] = True + if version_description: + params['versionDescription'] = str(version_description) + return self.client.update_state_machine(**params) + + def create_state_machine_alias(self, name, routing_config, + description=None): + params = { + 'name': name, + 'routingConfiguration': routing_config + } + if description: + params['description'] = description + response = self.client.create_state_machine_alias(**params) + return response['stateMachineAliasArn'] + + def update_state_machine_alias(self, arn, routing_config, + description=None): + params = { + 'stateMachineAliasArn': arn, + 'routingConfiguration': routing_config + } + if description: + params['description'] = description + self.client.update_state_machine_alias(**params) def describe_state_machine(self, arn): try: @@ -52,6 +98,16 @@ def describe_state_machine(self, arn): else: raise e + def describe_state_machine_alias(self, arn): + try: + return self.client.describe_state_machine_alias( + stateMachineAliasArn=arn) + except ClientError as e: + if 'ResourceNotFound' in str(e): + pass # valid exception + else: + raise e + def delete_state_machine(self, arn): return self.client.delete_state_machine(stateMachineArn=arn) diff --git a/syndicate/core/constants.py b/syndicate/core/constants.py index 62f09798..ac0a839e 100644 --- a/syndicate/core/constants.py +++ b/syndicate/core/constants.py @@ -150,7 +150,8 @@ EC2_LAUNCH_TEMPLATE_TYPE: 6, BATCH_JOBDEF_TYPE: 7, BATCH_COMPENV_TYPE: 8, - SWAGGER_UI_TYPE: 9 + SWAGGER_UI_TYPE: 9, + STEP_FUNCTION_TYPE: 14 } RESOURCE_LIST = list(DEPLOY_RESOURCE_TYPE_PRIORITY.keys()) diff --git a/syndicate/core/generators/deployment_resources/step_function_generator.py b/syndicate/core/generators/deployment_resources/step_function_generator.py index 8bd6b4b9..978e5cf7 100644 --- a/syndicate/core/generators/deployment_resources/step_function_generator.py +++ b/syndicate/core/generators/deployment_resources/step_function_generator.py @@ -18,4 +18,6 @@ class StepFunctionGenerator(BaseDeploymentResourceGenerator): "event_sources": list, "dependencies": list, "iam_role": None, + "publish_version": bool, + "alias": None } diff --git a/syndicate/core/groups/meta.py b/syndicate/core/groups/meta.py index bd667462..53b121e3 100644 --- a/syndicate/core/groups/meta.py +++ b/syndicate/core/groups/meta.py @@ -447,6 +447,10 @@ def sns_topic(ctx, **kwargs): help="Step function name") @click.option('--iam_role', type=str, required=True, help="IAM role to use for this state machine") +@click.option('--publish_version', type=bool, default=False, + help="Defines whether to publish the step function version") +@click.option('--alias', type=str, + help="Step function alias name") @verbose_option @click.pass_context @timeit() diff --git a/syndicate/core/resources/processors_mapping.py b/syndicate/core/resources/processors_mapping.py index 0973bf85..23ea4e65 100644 --- a/syndicate/core/resources/processors_mapping.py +++ b/syndicate/core/resources/processors_mapping.py @@ -273,6 +273,8 @@ def update_handlers(self): create_update_swagger_ui, EC2_LAUNCH_TEMPLATE_TYPE: self.resources_provider.ec2().update_launch_template, + STEP_FUNCTION_TYPE: + self.resources_provider.step_functions().update_state_machine } def resource_configuration_processor(self): diff --git a/syndicate/core/resources/resources_provider.py b/syndicate/core/resources/resources_provider.py index 9a8425a7..5fb60eef 100644 --- a/syndicate/core/resources/resources_provider.py +++ b/syndicate/core/resources/resources_provider.py @@ -279,6 +279,7 @@ def step_functions(self): iam_conn=self._conn_provider.iam(), cw_events_conn=self._conn_provider.cw_events(), lambda_conn=self._conn_provider.lambda_conn(), + lambda_res=self.lambda_resource(), region=self.config.region, account_id=self.config.account_id ) diff --git a/syndicate/core/resources/step_functions_resource.py b/syndicate/core/resources/step_functions_resource.py index a3627d6c..d182f137 100644 --- a/syndicate/core/resources/step_functions_resource.py +++ b/syndicate/core/resources/step_functions_resource.py @@ -23,23 +23,35 @@ from syndicate.core.resources.helper import (build_description_obj, validate_params) +DEFAULT_ROUTING_CONFIG_WEIGHT = 100 + _LOG = get_logger('core.resources.step_function_resource') class StepFunctionResource(BaseResource): def __init__(self, sf_conn, iam_conn, cw_events_conn, lambda_conn, - account_id, region) -> None: + lambda_res, account_id, region) -> None: self.sf_conn = sf_conn self.iam_conn = iam_conn self.cw_events_conn = cw_events_conn self.lambda_conn = lambda_conn + self.lambda_res = lambda_res self.account_id = account_id self.region = region + self.CREATE_TRIGGER = { + 'cloudwatch_rule_trigger': + self._create_cloud_watch_trigger_from_meta, + 'eventbridge_rule_trigger': + self._create_cloud_watch_trigger_from_meta + } def create_state_machine(self, args): return self.create_pool(self._create_state_machine_from_meta, args) + def update_state_machine(self, args): + return self.create_pool(self._update_state_machine, args) + def create_activities(self, args): return self.create_pool(self._create_activity_from_meta, args) @@ -92,6 +104,11 @@ def __remove_key_from_dict(self, obj, name): @unpack_kwargs def _create_state_machine_from_meta(self, name, meta): + definition = meta.get('definition', {}) + publish_version = meta.get('publish_version', False) + version_description = meta.get('version_description') + alias_name = meta.get('alias') + alias_description = meta.get('alias_description') arn = self._build_sm_arn(name, self.region) response = self.sf_conn.describe_state_machine(arn) if response: @@ -106,54 +123,118 @@ def _create_state_machine_from_meta(self, name, meta): raise AssertionError( 'IAM role {0} does not exist.'.format(iam_role)) - # check resource exists and get arn - definition = meta['definition'] - definition_copy = definition.copy() - for key in definition['States']: - definition_meta = definition['States'][key] - if definition_meta.get('Lambda'): - lambda_name = definition_meta['Lambda'] - # alias has a higher priority than version in arn resolving - lambda_version = definition_meta.get('Lambda_version') - lambda_alias = definition_meta.get('Lambda_alias') - lambda_arn = self.resolve_lambda_arn_by_version_and_alias( - lambda_name, - lambda_version, - lambda_alias) - self.__remove_key_from_dict(definition_copy['States'][key], - 'Lambda') - self.__remove_key_from_dict(definition_copy['States'][key], - 'Lambda_version') - self.__remove_key_from_dict(definition_copy['States'][key], - 'Lambda_alias') + machine_info = self.sf_conn.create_state_machine( + machine_name=name, + role_arn=role_arn, + definition=self._resolve_sm_definition(definition), + publish_version=publish_version, + version_description=version_description) - definition_copy['States'][key]['Resource'] = lambda_arn + alias_arn = None + if alias_name is not None: + if publish_version: + alias_arn = self._create_state_machine_alias( + name=alias_name, + version_arn=machine_info['stateMachineVersionArn'], + description=alias_description) + _LOG.debug(f"An alias with ARN '{alias_arn}' was created " + f"successfully.") + else: + _LOG.warn(f"The alias with the name '{alias_name}' can't be " + f"created because no publishing version is " + f"configured.") - if definition_meta.get('Activity'): - activity_name = definition_meta['Activity'] - activity_arn = 'arn:aws:states:{0}:{1}:activity:{2}'.format( - self.region, self.account_id, activity_name) - activity_info = self.sf_conn.describe_activity( - arn=activity_arn) - if not activity_info: - raise AssertionError('Activity does not exists: %s', - activity_name) - activity_arn = activity_info['activityArn'] - del definition_copy['States'][key]['Activity'] - definition_copy['States'][key]['Resource'] = activity_arn - machine_info = self.sf_conn.create_state_machine(machine_name=name, - role_arn=role_arn, - definition=definition_copy) - - event_sources = meta.get('event_sources') - if event_sources: - for trigger_meta in event_sources: - trigger_type = trigger_meta['resource_type'] - func = self.CREATE_TRIGGER[trigger_type] - func(name, trigger_meta) + event_sources = meta.get('event_sources', []) + self._process_event_sources( + sf_name=name, + event_sources=event_sources, + alias_name=alias_name if alias_arn else None) _LOG.info('Created state machine %s.', machine_info['stateMachineArn']) return self.describe_step_function(name=name, meta=meta, arn=arn) + def _create_state_machine_alias(self, name, version_arn, description=None): + routing_config = [ + { + 'stateMachineVersionArn': version_arn, + 'weight': DEFAULT_ROUTING_CONFIG_WEIGHT + } + ] + return self.sf_conn.create_state_machine_alias( + name=name, + routing_config=routing_config, + description=description + ) + + @unpack_kwargs + def _update_state_machine(self, name, meta, context): + definition = meta.get('definition', {}) + publish_version = meta.get('publish_version', False) + version_description = meta.get('version_description') + alias_name = meta.get('alias') + alias_description = meta.get('alias_description') + + sf_arn = self._build_sm_arn(name, self.region) + response = self.sf_conn.describe_state_machine(sf_arn) + if not response: + raise AssertionError(f"Step function with name '{name}' not found") + + iam_role = meta['iam_role'] + role_arn = self.iam_conn.check_if_role_exists(iam_role) + if not role_arn: + raise AssertionError( + 'IAM role {0} does not exist.'.format(iam_role)) + + machine_info = self.sf_conn.update_state_machine( + machine_arn=sf_arn, + role_arn=role_arn, + definition=self._resolve_sm_definition(definition), + publish_version=publish_version, + version_description=version_description) + + alias_arn = None + if alias_name is not None: + if publish_version: + alias_arn = f"{sf_arn}:{alias_name}" + if self.sf_conn.describe_state_machine_alias(alias_arn): + self._update_state_machine_alias( + arn=alias_arn, + version_arn=machine_info['stateMachineVersionArn'], + description=alias_description) + _LOG.debug(f"An alias with ARN '{alias_arn}' was updated " + f"successfully.") + else: + alias_arn = self._create_state_machine_alias( + name=alias_name, + version_arn=machine_info['stateMachineVersionArn'], + description=alias_description) + _LOG.debug(f"An alias with ARN '{alias_arn}' was created " + f"successfully.") + else: + _LOG.warn(f"The alias with the name '{alias_name}' can't be " + f"updated because no publishing version is " + f"configured.") + + event_sources = meta.get('event_sources', []) + self._process_event_sources( + sf_name=name, + event_sources=event_sources, + alias_name=alias_name if alias_arn else None) + _LOG.info(f'Updated state machine {sf_arn}.') + return self.describe_step_function(name=name, meta=meta, arn=sf_arn) + + def _update_state_machine_alias(self, arn, version_arn, description=None): + routing_config = [ + { + 'stateMachineVersionArn': version_arn, + 'weight': DEFAULT_ROUTING_CONFIG_WEIGHT + } + ] + return self.sf_conn.update_state_machine_alias( + arn=arn, + routing_config=routing_config, + description=description + ) + def describe_step_function(self, name, meta, arn=None): if not arn: arn = self._build_sm_arn(name, self.region) @@ -165,7 +246,14 @@ def describe_step_function(self, name, meta, arn=None): def _build_sm_arn(self, name, region): return f'arn:aws:states:{region}:{self.account_id}:stateMachine:{name}' - def _create_cloud_watch_trigger_from_meta(self, name, trigger_meta): + def _process_event_sources(self, sf_name, event_sources, alias_name=None): + for trigger_meta in event_sources: + trigger_type = trigger_meta['resource_type'] + func = self.CREATE_TRIGGER[trigger_type] + func(sf_name, trigger_meta, alias_name) + + def _create_cloud_watch_trigger_from_meta(self, name, trigger_meta, + alias_name=None): required_parameters = ['target_rule', 'input', 'iam_role'] validate_params(name, trigger_meta, required_parameters) rule_name = trigger_meta['target_rule'] @@ -173,21 +261,19 @@ def _create_cloud_watch_trigger_from_meta(self, name, trigger_meta): sf_role = trigger_meta['iam_role'] sf_arn = self._build_sm_arn(name, self.region) + target_arn = f'{sf_arn}:{alias_name}' if alias_name else sf_arn sf_description = self.sf_conn.describe_state_machine(arn=sf_arn) if sf_description.get('status') == 'ACTIVE': sf_role_arn = self.iam_conn.check_if_role_exists(sf_role) if sf_role_arn: - self.cw_events_conn.add_rule_sf_target(rule_name, sf_arn, - input, - sf_role_arn) + self.cw_events_conn.add_rule_sf_target( + rule_name=rule_name, + target_arn=target_arn, + input=input, + role_arn=sf_role_arn) _LOG.info('State machine %s subscribed to cloudwatch rule %s', name, rule_name) - CREATE_TRIGGER = { - 'cloudwatch_rule_trigger': _create_cloud_watch_trigger_from_meta, - 'eventbridge_rule_trigger': _create_cloud_watch_trigger_from_meta - } - @unpack_kwargs def _create_activity_from_meta(self, name, meta): arn = self.build_activity_arn(name=name) @@ -215,3 +301,42 @@ def build_activity_arn(self, name): self.account_id, name) return arn + + def _resolve_sm_definition(self, definition): + # check resource exists and get arn + definition_copy = definition.copy() + for key in definition['States']: + definition_meta = definition['States'][key] + if definition_meta.get('Lambda'): + lambda_name = definition_meta['Lambda'] + # alias has a higher priority than version in arn resolving + lambda_version = definition_meta.get('Lambda_version') + lambda_alias = definition_meta.get('Lambda_alias') + lambda_arn = \ + self.lambda_res.resolve_lambda_arn_by_version_and_alias( + lambda_name, + lambda_version, + lambda_alias) + self.__remove_key_from_dict(definition_copy['States'][key], + 'Lambda') + self.__remove_key_from_dict(definition_copy['States'][key], + 'Lambda_version') + self.__remove_key_from_dict(definition_copy['States'][key], + 'Lambda_alias') + + definition_copy['States'][key]['Resource'] = lambda_arn + + if definition_meta.get('Activity'): + activity_name = definition_meta['Activity'] + activity_arn = 'arn:aws:states:{0}:{1}:activity:{2}'.format( + self.region, self.account_id, activity_name) + activity_info = self.sf_conn.describe_activity( + arn=activity_arn) + if not activity_info: + raise AssertionError('Activity does not exists: %s', + activity_name) + activity_arn = activity_info['activityArn'] + del definition_copy['States'][key]['Activity'] + definition_copy['States'][key]['Resource'] = activity_arn + + return definition_copy