From b3f41eb89ea4170581eedad64fc86194abdcea54 Mon Sep 17 00:00:00 2001 From: SooLee Date: Fri, 21 May 2021 13:21:31 -0400 Subject: [PATCH 1/5] soft kill --- tests/tibanna/unicorn/test_job.py | 51 +++++++++++- tibanna/__main__.py | 20 +++-- tibanna/_version.py | 2 +- tibanna/check_task.py | 10 +++ tibanna/core.py | 131 ++++++------------------------ tibanna/exceptions.py | 3 + tibanna/job.py | 123 ++++++++++++++++++++++------ 7 files changed, 205 insertions(+), 135 deletions(-) diff --git a/tests/tibanna/unicorn/test_job.py b/tests/tibanna/unicorn/test_job.py index c1248ec9..ebee04a4 100644 --- a/tests/tibanna/unicorn/test_job.py +++ b/tests/tibanna/unicorn/test_job.py @@ -4,6 +4,12 @@ from tibanna.vars import DYNAMODB_TABLE, EXECUTION_ARN import mock import boto3 +import pytest + + +@pytest.fixture +def raise_error(): + raise Exception() def test_stepfunction_exists(): @@ -20,7 +26,7 @@ def test_stepfunction_exists2(): def test_job_check_output(): job = Job(job_id='jid1') - job.exec_desc = {'output': '{"somefield": "someoutput"}'} + job._exec_desc = {'output': '{"somefield": "someoutput"}'} with mock.patch('tibanna.job.Job.check_status', return_value='SUCCEEDED'): res = job.check_output() assert res == {'somefield': 'someoutput'} @@ -102,3 +108,46 @@ def test_add_to_dd_and_info(): assert info['Job Id'] == jobid assert info['Log Bucket'] == logbucket assert 'Time Stamp' in info + + +def test_exec_arn(): + assert Job(exec_arn='somearn').exec_arn == 'somearn' + with mock.patch('tibanna.job.Job.get_exec_arn_from_job_id', return_value='somearn'): + assert Job(job_id='somejobid').exec_arn == 'somearn' + + +def test_job_id(): + assert Job(job_id='somejobid').job_id == 'somejobid' + with mock.patch('tibanna.job.Job.get_job_id_from_exec_arn', return_value='somejobid'): + assert Job(exec_arn='somearn').job_id == 'somejobid' + + +def test_log_bucket(): + job = Job(job_id='somejobid') + job._log_bucket = 'somebucket' + assert job.log_bucket == 'somebucket' + + +def test_log_bucket_from_dd(): + job = Job(job_id='somejobid') + with mock.patch('tibanna.job.Job.get_log_bucket_from_job_id', return_value='somebucket'): + assert job.log_bucket == 'somebucket' + + +def test_log_bucket_dd2(): + job = Job(job_id='somejobid') + with mock.patch('tibanna.job.Job.info', return_value={'Log Bucket': 'somebucket'}): + assert job.log_bucket == 'somebucket' + + +def test_log_bucket_no_dd(): + job = Job(job_id='somejobid') + job.sfn = 'somesfn' + with mock.patch('tibanna.job.Job.info', return_value={}): + with mock.patch('tibanna.job.Job.get_log_bucket_from_job_id_and_sfn_wo_dd', return_value='somebucket'): + assert job.log_bucket == 'somebucket' + + +def test_get_log_bucket_from_job_id(): + with mock.patch('tibanna.job.Job.info', return_value={'job_id': 'somejobid', 'Log Bucket': 'somelogbucket'}): + assert Job.get_log_bucket_from_job_id(job_id='somejobid') == 'somelogbucket' diff --git a/tibanna/__main__.py b/tibanna/__main__.py index 5c12a8fd..c725d8f3 100755 --- a/tibanna/__main__.py +++ b/tibanna/__main__.py @@ -101,12 +101,20 @@ def args(self): {'flag': ["-s", "--sfn"], 'help': "tibanna step function name (e.g. 'tibanna_unicorn_monty'); " + "your current default is %s)" % TIBANNA_DEFAULT_STEP_FUNCTION_NAME, - 'default': TIBANNA_DEFAULT_STEP_FUNCTION_NAME}], + 'default': TIBANNA_DEFAULT_STEP_FUNCTION_NAME}, + {'flag': ["-z", "--soft"], + 'help': "instead of directly killing the execution, " + + "send abort signal to s3 so that step function can handle it", + 'action': "store_true"}], 'kill_all': [{'flag': ["-s", "--sfn"], 'help': "tibanna step function name (e.g. 'tibanna_unicorn_monty'); " + "your current default is %s)" % TIBANNA_DEFAULT_STEP_FUNCTION_NAME, - 'default': TIBANNA_DEFAULT_STEP_FUNCTION_NAME}], + 'default': TIBANNA_DEFAULT_STEP_FUNCTION_NAME}, + {'flag': ["-z", "--soft"], + 'help': "instead of directly killing the execution, " + + "send abort signal to s3 so that step function can handle it", + 'action': "store_true"}], 'log': [{'flag': ["-e", "--exec-arn"], 'help': "execution arn of the specific job to log"}, @@ -407,14 +415,14 @@ def log(exec_arn=None, job_id=None, exec_name=None, sfn=TIBANNA_DEFAULT_STEP_FUN top=top, top_latest=top_latest)) -def kill_all(sfn=TIBANNA_DEFAULT_STEP_FUNCTION_NAME): +def kill_all(sfn=TIBANNA_DEFAULT_STEP_FUNCTION_NAME, soft=False): """kill all the running jobs on a step function""" - API().kill_all(sfn) + API().kill_all(sfn, soft=soft) -def kill(exec_arn=None, job_id=None, sfn=TIBANNA_DEFAULT_STEP_FUNCTION_NAME): +def kill(exec_arn=None, job_id=None, sfn=TIBANNA_DEFAULT_STEP_FUNCTION_NAME, soft=False): """kill a specific job""" - API().kill(exec_arn, job_id, sfn) + API().kill(exec_arn, job_id, sfn, soft=soft) def info(job_id): diff --git a/tibanna/_version.py b/tibanna/_version.py index 6ba8aac2..85dae371 100755 --- a/tibanna/_version.py +++ b/tibanna/_version.py @@ -1,4 +1,4 @@ """Version information.""" # The following line *must* be the last in the module, exactly as formatted: -__version__ = "1.2.8" +__version__ = "1.3.0" diff --git a/tibanna/check_task.py b/tibanna/check_task.py index 4e58f694..c6c6e143 100755 --- a/tibanna/check_task.py +++ b/tibanna/check_task.py @@ -20,6 +20,7 @@ EC2UnintendedTerminationException, EC2IdleException, MetricRetrievalException, + JobAbortedException, AWSEMErrorHandler ) from .vars import PARSE_AWSEM_TIME @@ -53,6 +54,7 @@ def run(self): job_started = "%s.job_started" % jobid job_success = "%s.success" % jobid job_error = "%s.error" % jobid + job_aborted = "%s.aborted" % jobid public_postrun_json = self.input_json['config'].get('public_postrun_json', False) @@ -69,6 +71,14 @@ def run(self): raise EC2IdleException("Failed to find jobid %s, ec2 is not initializing for too long. Terminating the instance." % jobid) raise EC2StartingException("Failed to find jobid %s, ec2 is probably still booting" % jobid) + # check to see if job has been aborted (by user or admin) + if does_key_exist(bucket_name, job_aborted): + try: + self.handle_postrun_json(bucket_name, jobid, self.input_json, public_read=public_postrun_json) + except Exception as e: + logger.warning("error occurred while handling postrun json but continuing. %s" % str(e)) + raise JobAbortedException("job aborted") + # check to see if job has error, report if so if does_key_exist(bucket_name, job_error): try: diff --git a/tibanna/core.py b/tibanna/core.py index 4d7c269a..dc8938ee 100755 --- a/tibanna/core.py +++ b/tibanna/core.py @@ -287,79 +287,37 @@ def info(self, job_id): '''returns content from dynamodb for a given job id in a dictionary form''' return Job.info(job_id) - def kill(self, exec_arn=None, job_id=None, sfn=None): - sf = boto3.client('stepfunctions') - if exec_arn: - desc = sf.describe_execution(executionArn=exec_arn) - if desc['status'] == 'RUNNING': - jobid = str(json.loads(desc['input'])['jobid']) - ec2 = boto3.resource('ec2') - terminated = None - for i in ec2.instances.all(): - if i.tags: - for tag in i.tags: - if tag['Key'] == 'Type' and tag['Value'] != 'awsem': - continue - if tag['Key'] == 'Name' and tag['Value'] == 'awsem-' + jobid: - logger.info("terminating EC2 instance") - response = i.terminate() - logger.info("Successfully terminated instance: " + str(response)) - terminated = True - break - if terminated: - break - logger.info("terminating step function execution") - resp_sf = sf.stop_execution(executionArn=exec_arn, error="Aborted") - logger.info("Successfully terminated step function execution: " + str(resp_sf)) - elif job_id: + def kill(self, exec_arn=None, job_id=None, sfn=None, soft=False): + """kills a running job, identified by either execution arn (exec_arn) or + job_id, (or job_id and sfn, for old tibanna versions). + This function kills both ec2 instance and the step function. + if soft is set True, it will not directly kill the step function + but instead sends a abort signal to S3 and let the step fucntion handle + the signal. The abort signal handling is available for >=1.3.0. + """ + job = Job(exec_arn=exec_arn, job_id=job_id, sfn=sfn) + if job.check_status() == 'RUNNING': + # kill awsem ec2 instance ec2 = boto3.client('ec2') - res = ec2.describe_instances(Filters=[{'Name': 'tag:Name', 'Values': ['awsem-' + job_id]}]) + res = ec2.describe_instances(Filters=[{'Name': 'tag:Name', 'Values': ['awsem-' + job.job_id]}]) if not res['Reservations']: raise("instance not available - if you just submitted the job, try again later") instance_id = res['Reservations'][0]['Instances'][0]['InstanceId'] logger.info("terminating EC2 instance") resp_term = ec2.terminate_instances(InstanceIds=[instance_id]) logger.info("Successfully terminated instance: " + str(resp_term)) - # first try dynanmodb to get logbucket - ddres = dict() - try: - dd = boto3.client('dynamodb') - ddres = dd.query(TableName=DYNAMODB_TABLE, - KeyConditions={'Job Id': {'AttributeValueList': [{'S': job_id}], - 'ComparisonOperator': 'EQ'}}) - except Exception as e: - pass - if 'Items' in ddres: - exec_name = ddres['Items'][0]['Execution Name']['S'] - sfn = ddres['Items'][0]['Step Function']['S'] - exec_arn = EXECUTION_ARN(exec_name, sfn) + + if soft: + logger.info("sending abort signal to s3") + s3 = boto3.client('s3') + s3.put_object(Body=b'', Bucket=job.log_bucket, Key=job.job_id + '.aborted') + logger.info("Successfully sent abort signal") else: - if not sfn: - logger.warning("Can't stop step function because step function name is not given.") - return None - stateMachineArn = STEP_FUNCTION_ARN(sfn) - res = sf.list_executions(stateMachineArn=stateMachineArn, statusFilter='RUNNING') - exec_arn = None - while True: - if 'executions' not in res or not res['executions']: - break - for exc in res['executions']: - desc = sf.describe_execution(executionArn=exc['executionArn']) - if job_id == str(json.loads(desc['input'])['jobid']): - exec_arn = exc['executionArn'] - break - if exec_arn: - break - if 'nextToken' in res: - res = sf.list_executions(nextToken=res['nextToken'], - stateMachineArn=stateMachineArn, statusFilter='RUNNING') - else: - break - if not exec_arn: - raise Exception("can't find the execution") - logger.info("terminating step function execution") - resp_sf = sf.stop_execution(executionArn=exec_arn, error="Aborted") - logger.info("Successfully terminated step function execution: " + str(resp_sf)) + # kill step function execution + logger.info("terminating step function execution") + sf = boto3.client('stepfunctions') + resp_sf = sf.stop_execution(executionArn=job.exec_arn, error="Aborted") + logger.info("Successfully terminated step function execution: " + str(resp_sf)) def kill_all(self, sfn=None): """killing all the running jobs""" @@ -399,46 +357,9 @@ def log(self, exec_arn=None, job_id=None, exec_name=None, sfn=None, sf = boto3.client('stepfunctions') if not exec_arn and exec_name: exec_arn = EXECUTION_ARN(exec_name, sfn) - if exec_arn: - desc = sf.describe_execution(executionArn=exec_arn) - job_id = str(json.loads(desc['input'])['jobid']) - if not logbucket: - logbucket = str(json.loads(desc['input'])['config']['log_bucket']) - elif job_id: - if not logbucket: - # first try dynanmodb to get logbucket - try: - logbucket = self.info(job_id)['Log Bucket'] - except Exception as e: - pass - if not logbucket: - # search through executions to get logbucket - stateMachineArn = STEP_FUNCTION_ARN(sfn) - try: - res = sf.list_executions(stateMachineArn=stateMachineArn) - while True: - if 'executions' not in res or not res['executions']: - break - breakwhile = False - for exc in res['executions']: - desc = sf.describe_execution(executionArn=exc['executionArn']) - if job_id == str(json.loads(desc['input'])['jobid']): - logbucket = str(json.loads(desc['input'])['config']['log_bucket']) - breakwhile = True - break - if breakwhile: - break - if 'nextToken' in res: - res = sf.list_executions(nextToken=res['nextToken'], - stateMachineArn=stateMachineArn) - else: - break - except: - raise Exception("Cannot retrieve job. Try again later.") - else: - raise Exception("Either job_id, exec_arn or exec_name must be provided.") + job = Job(exec_arn=exec_arn, job_id=job_id, sfn=sfn) try: - res_s3 = boto3.client('s3').get_object(Bucket=logbucket, Key=job_id + suffix) + res_s3 = boto3.client('s3').get_object(Bucket=job.log_bucket, Key=job.job_id + suffix) except Exception as e: if 'NoSuchKey' in str(e): if not quiet: @@ -597,6 +518,8 @@ def count_status(self, sfn_arn, client): def clear_input_json_template(self, input_json_template): """clear awsem template for reuse""" + if 'jobid' in input_json_template: + del input_json_template['jobid'] if 'response' in input_json_template['_tibanna']: del(input_json_template['_tibanna']['response']) if 'run_name' in input_json_template['_tibanna'] and len(input_json_template['_tibanna']['run_name']) > 40: diff --git a/tibanna/exceptions.py b/tibanna/exceptions.py index 74dccac4..6f04495a 100755 --- a/tibanna/exceptions.py +++ b/tibanna/exceptions.py @@ -139,3 +139,6 @@ def __init__(self, message=None): message = "CWL draft3 is no longer supported. Please switched to v1" super().__init__(message) + +class JobAbortedException(Exception): + pass diff --git a/tibanna/job.py b/tibanna/job.py index 9b6e6911..99ef0fa0 100644 --- a/tibanna/job.py +++ b/tibanna/job.py @@ -36,16 +36,29 @@ def status(job_ids=None, exec_arns=None): class Job(object): def __init__(self, job_id=None, exec_arn=None, sfn=None): + """A job can be identified with either a job_id or an exec_arn. + For old tibanna versions (with no dynamoDB deployed), + a job can be identified with a job_id and sfn. + """ if not job_id and not exec_arn: raise Exception("Provide either through job id or execution arn to retrieve a job.") - self.job_id = job_id - self.exec_arn = exec_arn + self._job_id = job_id + self._exec_arn = exec_arn self.sfn = sfn # only for old tibanna - self.exec_desc = None - self.log_bucket = None + self._exec_desc = None + self._log_bucket = None self.costupdater_exec_arn = None self.costupdater_exec_desc = None + # cache for boto3 client for step function + self._client_sfn = None + + @property + def client_sfn(self): + if not self._client_sfn: + self._client_sfn = boto3.client('sfn', region=AWS_REGION) + return self._client_sfn + def check_costupdater_status(self): self.update_costupdater_exec_desc() return self.costupdater_exec_desc['status'] @@ -53,25 +66,98 @@ def check_costupdater_status(self): def check_status(self): '''checking status of an execution. It works only if the execution info is still in the step function.''' - self.update_exec_desc() return self.exec_desc['status'] def check_output(self): '''checking status of an execution first and if it's success, get output. It works only if the execution info is still in the step function.''' if self.check_status() == 'SUCCEEDED': - self.update_exec_desc() if 'output' in self.exec_desc: return json.loads(self.exec_desc['output']) else: return None - def update_exec_desc(self): + @property + def exec_arn(self): + if self._exec_arn: + return self._exec_arn + elif self.job_id: # figure out exec_arn frmo job_id + try: + self._exec_arn = self.get_exec_arn_from_job_id(self.job_id) + except Exception as e: + if self.sfn: + self._exec_arn = self.get_exec_arn_from_job_id_and_sfn_wo_dd(self.job_id, sfn=self.sfn) + else: + raise e + return self._exec_arn + + @property + def exec_desc(self): """sfn is needed only for old tibanna """ - if not self.exec_desc: - self.update_exec_arn_from_job_id() - self.exec_desc = self.describe_exec(self.exec_arn) + if not self._exec_desc: + self._exec_desc = self.describe_exec(self.exec_arn) + return self._exec_desc + + @property + def job_id(self): + if not self._job_id: + if self.exec_arn: + self._job_id = self.get_job_id_from_exec_arn(self.exec_arn) + else: + raise("Can't find job_id - either provide job_id or exec_arn") + return self._job_id + + @property + def log_bucket(self): + if not self._log_bucket: + if self.job_id: + try: + # first try dynanmodb to get logbucket + self._log_bucket = self.get_log_bucket_from_job_id(self.job_id) + except Exception as e: + if self.sfn: + self._log_bucket = self.get_log_bucket_from_job_id_and_sfn_wo_dd(self.job_id, self.sfn) + if not self._log_bucket: + raise Exception("Cannot retrieve log bucket.") + return self._log_bucket + + @classmethod + def get_log_bucket_from_job_id(cls, job_id): + return cls.info(job_id)['Log Bucket'] + + @staticmethod + def get_log_bucket_from_job_id_and_sfn_wo_dd(job_id, sfn): + stateMachineArn = STEP_FUNCTION_ARN(sfn) + sf = boto3.client('stepfunctions') + res = sf.list_executions(stateMachineArn=stateMachineArn) + while True: + if 'executions' not in res or not res['executions']: + break + breakwhile = False + for exc in res['executions']: + desc = sf.describe_execution(executionArn=exc['executionArn']) + if job_id == str(json.loads(desc['input'])['jobid']): + logbucket = str(json.loads(desc['input'])['config']['log_bucket']) + breakwhile = True + break + if breakwhile: + break + if 'nextToken' in res: + res = sf.list_executions(nextToken=res['nextToken'], + stateMachineArn=stateMachineArn) + else: + break + if logbucket: + return logbucket + else: + raise Exception("Cannot retrieve log bucket.") + + @staticmethod + def get_job_id_from_exec_arn(exec_arn): + sf = boto3.client('stepfunctions') + desc = sf.describe_execution(executionArn=exec_arn) + return str(json.loads(desc['input'])['jobid']) def update_costupdater_exec_desc(self): if not self.costupdater_exec_desc: @@ -83,16 +169,6 @@ def update_costupdater_exec_arn_from_job_id(self): self.costupdater_exec_arn = self.get_costupdater_exec_arn_from_job_id(self.job_id) # older tibanna does not have cost updater so we don't need to try the old way of doing it without dd. - def update_exec_arn_from_job_id(self): - if self.job_id and not self.exec_arn: - try: - self.exec_arn = self.get_exec_arn_from_job_id(self.job_id) - except Exception as e: - if self.sfn: - self.exec_arn = self.get_exec_arn_from_job_id_and_sfn_wo_dd(self.job_id, sfn=self.sfn) - else: - raise e - @staticmethod def stepfunction_exists(sfn_name): sf = boto3.client('stepfunctions') @@ -160,14 +236,15 @@ def get_exec_arn_from_job_id_and_sfn_wo_dd(job_id, sfn): @classmethod def info(cls, job_id): - '''returns content from dynamodb for a given job id in a dictionary form''' + '''returns content from dynamodb for a given job id in a dictionary form. + returns None if the entry does not exist in dynamoDB''' ddres = cls.get_dd(job_id) return cls.get_info_from_dd(ddres) @staticmethod def describe_exec(exec_arn): - sts = boto3.client('stepfunctions', region_name=AWS_REGION) - return sts.describe_execution(executionArn=exec_arn) + sf = boto3.client('stepfunctions', region_name=AWS_REGION) + return sf.describe_execution(executionArn=exec_arn) @staticmethod def get_dd(job_id): From 8ece559358651057fb38c0bae2cf60162c5a5bbe Mon Sep 17 00:00:00 2001 From: SooLee Date: Fri, 21 May 2021 14:04:21 -0400 Subject: [PATCH 2/5] minor fix --- tibanna/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tibanna/core.py b/tibanna/core.py index dc8938ee..e4420367 100755 --- a/tibanna/core.py +++ b/tibanna/core.py @@ -301,7 +301,7 @@ def kill(self, exec_arn=None, job_id=None, sfn=None, soft=False): ec2 = boto3.client('ec2') res = ec2.describe_instances(Filters=[{'Name': 'tag:Name', 'Values': ['awsem-' + job.job_id]}]) if not res['Reservations']: - raise("instance not available - if you just submitted the job, try again later") + raise Exception("instance not available - if you just submitted the job, try again later") instance_id = res['Reservations'][0]['Instances'][0]['InstanceId'] logger.info("terminating EC2 instance") resp_term = ec2.terminate_instances(InstanceIds=[instance_id]) From a299d481e6721bda4c77b63a7cc33944c60dfc5b Mon Sep 17 00:00:00 2001 From: SooLee Date: Fri, 21 May 2021 20:31:54 -0400 Subject: [PATCH 3/5] quiet option for deploy --- .../unicorn/check_task_awsem/test_handler.py | 16 ++++++- tibanna/__main__.py | 19 +++++--- tibanna/core.py | 45 ++++++++++++------- 3 files changed, 57 insertions(+), 23 deletions(-) diff --git a/tests/tibanna/unicorn/check_task_awsem/test_handler.py b/tests/tibanna/unicorn/check_task_awsem/test_handler.py index 7740febe..b65468d1 100755 --- a/tests/tibanna/unicorn/check_task_awsem/test_handler.py +++ b/tests/tibanna/unicorn/check_task_awsem/test_handler.py @@ -3,7 +3,8 @@ EC2StartingException, StillRunningException, MetricRetrievalException, - EC2IdleException + EC2IdleException, + JobAbortedException ) import pytest import boto3 @@ -59,6 +60,19 @@ def test_check_task_awsem_fails_if_no_job_started_for_too_long(check_task_input, assert 'Failed to find jobid' in str(excinfo.value) +def test_check_task_awsem_aborted(check_task_input, s3): + jobid = 'lalala' + check_task_input_modified = check_task_input + check_task_input_modified['jobid'] = jobid + job_started = "%s.job_started" % jobid + job_aborted = "%s.aborted" % jobid + s3.put_object(Body=b'', Key=job_started) + s3.put_object(Body=b'', Key=job_aborted) + with pytest.raises(JobAbortedException) as excinfo: + service.handler(check_task_input, '') + assert 'aborted' in str(excinfo.value) + + @pytest.mark.webtest def test_check_task_awsem_throws_exception_if_not_done(check_task_input): with pytest.raises(StillRunningException) as excinfo: diff --git a/tibanna/__main__.py b/tibanna/__main__.py index c725d8f3..a492013a 100755 --- a/tibanna/__main__.py +++ b/tibanna/__main__.py @@ -273,7 +273,10 @@ def args(self): {'flag': ["-P", "--do-not-delete-public-access-block"], 'action': "store_true", 'help': "Do not delete public access block from buckets" + - "(this way postrunjson and metrics reports will not be public)"}], + "(this way postrunjson and metrics reports will not be public)"}, + {'flag': ["-q", "--quiet"], + 'action': "store_true", + 'help': "minimize standard output from deployment"}], 'deploy_core': [{'flag': ["-n", "--name"], 'help': "name of the lambda function to deploy (e.g. run_task_awsem)"}, @@ -282,7 +285,10 @@ def args(self): "Lambda function, within the same usergroup"}, {'flag': ["-g", "--usergroup"], 'default': '', - 'help': "Tibanna usergroup for the AWS Lambda function"}], + 'help': "Tibanna usergroup for the AWS Lambda function"}, + {'flag': ["-q", "--quiet"], + 'action': "store_true", + 'help': "minimize standard output from deployment"}], 'plot_metrics': [{'flag': ["-j", "--job-id"], 'help': "job id of the specific job to log (alternative to --exec-arn/-e)"}, @@ -359,11 +365,11 @@ def args(self): } -def deploy_core(name, suffix=None, usergroup=''): +def deploy_core(name, suffix=None, usergroup='', quiet=False): """ New method of deploying packaged lambdas (BETA) """ - API().deploy_core(name=name, suffix=suffix, usergroup=usergroup) + API().deploy_core(name=name, suffix=suffix, usergroup=usergroup, quiet=quiet) def run_workflow(input_json, sfn=TIBANNA_DEFAULT_STEP_FUNCTION_NAME, jobid='', do_not_open_browser=False, sleep=3): @@ -386,11 +392,12 @@ def setup_tibanna_env(buckets='', usergroup_tag='default', no_randomize=False, def deploy_unicorn(suffix=None, no_setup=False, buckets='', - no_setenv=False, usergroup='', do_not_delete_public_access_block=False, deploy_costupdater = False): + no_setenv=False, usergroup='', do_not_delete_public_access_block=False, + deploy_costupdater=False, quiet=False): """deploy tibanna unicorn to AWS cloud""" API().deploy_unicorn(suffix=suffix, no_setup=no_setup, buckets=buckets, no_setenv=no_setenv, usergroup=usergroup, do_not_delete_public_access_block=do_not_delete_public_access_block, - deploy_costupdater=deploy_costupdater) + deploy_costupdater=deploy_costupdater, quiet=quiet) def add_user(user, usergroup): diff --git a/tibanna/core.py b/tibanna/core.py index e4420367..16704ddd 100755 --- a/tibanna/core.py +++ b/tibanna/core.py @@ -634,7 +634,7 @@ def env_list(self, name): ) return envlist.get(name, '') - def deploy_lambda(self, name, suffix, usergroup=''): + def deploy_lambda(self, name, suffix, usergroup='', quiet=False): """ deploy a single lambda using the aws_lambda.deploy_function (BETA) """ @@ -683,20 +683,32 @@ def deploy_lambda(self, name, suffix, usergroup=''): full_function_name = function_name_prefix if name not in self.do_not_delete: try: - boto3.client('lambda').get_function(FunctionName=full_function_name) - logger.info("deleting existing lambda") - boto3.client('lambda').delete_function(FunctionName=full_function_name) + if quiet: + res = boto3.client('lambda').get_function(FunctionName=full_function_name) + logger.info("deleting existing lambda") + res = boto3.client('lambda').delete_function(FunctionName=full_function_name) + else: + boto3.client('lambda').get_function(FunctionName=full_function_name) + logger.info("deleting existing lambda") + boto3.client('lambda').delete_function(FunctionName=full_function_name) except Exception as e: if 'Function not found' in str(e): pass - aws_lambda.deploy_function(lambda_fxn_module, - function_name_suffix=function_name_suffix, - package_objects=self.tibanna_packages, - requirements_fpath=requirements_fpath, - extra_config=extra_config) + if quiet: + res = aws_lambda.deploy_function(lambda_fxn_module, + function_name_suffix=function_name_suffix, + package_objects=self.tibanna_packages, + requirements_fpath=requirements_fpath, + extra_config=extra_config) + else: + aws_lambda.deploy_function(lambda_fxn_module, + function_name_suffix=function_name_suffix, + package_objects=self.tibanna_packages, + requirements_fpath=requirements_fpath, + extra_config=extra_config) - def deploy_core(self, name, suffix=None, usergroup=''): + def deploy_core(self, name, suffix=None, usergroup='', quiet=False): """deploy/update lambdas only""" logger.info("preparing for deploy...") if name == 'all': @@ -706,7 +718,7 @@ def deploy_core(self, name, suffix=None, usergroup=''): else: names = [name, ] for name in names: - self.deploy_lambda(name, suffix, usergroup) + self.deploy_lambda(name, suffix, usergroup, quiet=quiet) def setup_tibanna_env(self, buckets='', usergroup_tag='default', no_randomize=False, do_not_delete_public_access_block=False, verbose=False): @@ -742,7 +754,7 @@ def setup_tibanna_env(self, buckets='', usergroup_tag='default', no_randomize=Fa def deploy_tibanna(self, suffix=None, usergroup='', setup=False, buckets='', setenv=False, do_not_delete_public_access_block=False, - deploy_costupdater=False): + deploy_costupdater=False, quiet=False): """deploy tibanna unicorn or pony to AWS cloud (pony is for 4DN-DCIC only)""" if setup: if usergroup: @@ -765,21 +777,22 @@ def deploy_tibanna(self, suffix=None, usergroup='', setup=False, outfile.write("\nexport TIBANNA_DEFAULT_STEP_FUNCTION_NAME=%s\n" % step_function_name) logger.info("deploying lambdas...") - self.deploy_core('all', suffix=suffix, usergroup=usergroup) + self.deploy_core('all', suffix=suffix, usergroup=usergroup, quiet=quiet) if(deploy_costupdater): - self.deploy_lambda(self.update_cost_lambda, suffix=suffix, usergroup=usergroup) + self.deploy_lambda(self.update_cost_lambda, suffix=suffix, usergroup=usergroup, quiet=quiet) dd_utils.create_dynamo_table(DYNAMODB_TABLE, DYNAMODB_KEYNAME) return step_function_name def deploy_unicorn(self, suffix=None, no_setup=False, buckets='', - no_setenv=False, usergroup='', do_not_delete_public_access_block=False, deploy_costupdater = False): + no_setenv=False, usergroup='', do_not_delete_public_access_block=False, + deploy_costupdater=False, quiet=False): """deploy tibanna unicorn to AWS cloud""" self.deploy_tibanna(suffix=suffix, usergroup=usergroup, setup=not no_setup, buckets=buckets, setenv=not no_setenv, do_not_delete_public_access_block=do_not_delete_public_access_block, - deploy_costupdater=deploy_costupdater) + deploy_costupdater=deploy_costupdater, quiet=quiet) def add_user(self, user, usergroup): """add a user to a tibanna group""" From 328306c8e0529c3f02cb2a95c1a659c126e5775d Mon Sep 17 00:00:00 2001 From: SooLee Date: Thu, 27 May 2021 10:27:04 -0400 Subject: [PATCH 4/5] exec_arn dependency error fix --- tests/tibanna/unicorn/test_job.py | 9 +++++++++ tibanna/job.py | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/tibanna/unicorn/test_job.py b/tests/tibanna/unicorn/test_job.py index ebee04a4..4775f9aa 100644 --- a/tests/tibanna/unicorn/test_job.py +++ b/tests/tibanna/unicorn/test_job.py @@ -59,6 +59,15 @@ def test_jobs_status_failed(): assert res['completed_jobs'] == [] +def test_jobs_status_failed_exec_arn(): + with mock.patch('tibanna.job.Job.check_status', return_value='FAILED'): + res = Jobs.status(exec_arns=['ex1', 'ex2', 'ex3']) + assert len(res) == 3 + assert res['running_jobs'] == [] + assert res['failed_jobs'] == ['ex1', 'ex2', 'ex3'] + assert res['completed_jobs'] == [] + + def test_describe_exec(): exec_desc = {'vanilla': 'cinnamon'} with mock.patch('botocore.client.BaseClient._make_api_call', return_value=exec_desc): diff --git a/tibanna/job.py b/tibanna/job.py index 99ef0fa0..0f79edc1 100644 --- a/tibanna/job.py +++ b/tibanna/job.py @@ -26,7 +26,7 @@ def status(job_ids=None, exec_arns=None): statuses.update({j: Job(job_id=j).check_status()}) if exec_arns: for arn in exec_arns: - statuses.append({arn: Job(exec_arn=arn).check_status()}) + statuses.update({arn: Job(exec_arn=arn).check_status()}) res['completed_jobs'] = [job for job, status in iter(statuses.items()) if status == 'SUCCEEDED'] res['failed_jobs'] = [job for job, status in iter(statuses.items()) if status == 'FAILED'] res['running_jobs'] = [job for job, status in iter(statuses.items()) if status == 'RUNNING'] From 3609547e45345949778da92b04df1d67d1898a06 Mon Sep 17 00:00:00 2001 From: SooLee Date: Thu, 27 May 2021 10:59:02 -0400 Subject: [PATCH 5/5] job aborted test interference fixed --- tests/tibanna/unicorn/check_task_awsem/test_handler.py | 3 +++ tibanna/_version.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/tibanna/unicorn/check_task_awsem/test_handler.py b/tests/tibanna/unicorn/check_task_awsem/test_handler.py index b65468d1..1693955b 100755 --- a/tests/tibanna/unicorn/check_task_awsem/test_handler.py +++ b/tests/tibanna/unicorn/check_task_awsem/test_handler.py @@ -71,6 +71,9 @@ def test_check_task_awsem_aborted(check_task_input, s3): with pytest.raises(JobAbortedException) as excinfo: service.handler(check_task_input, '') assert 'aborted' in str(excinfo.value) + # cleanup + s3.delete_objects(Delete={'Objects': [{'Key': job_started}]}) + s3.delete_objects(Delete={'Objects': [{'Key': job_aborted}]}) @pytest.mark.webtest diff --git a/tibanna/_version.py b/tibanna/_version.py index 85dae371..fa797c7a 100755 --- a/tibanna/_version.py +++ b/tibanna/_version.py @@ -1,4 +1,4 @@ """Version information.""" # The following line *must* be the last in the module, exactly as formatted: -__version__ = "1.3.0" +__version__ = "1.3.1b"