
Merge pull request #341 from 4dn-dcic/softkill
Softkill
SooLee committed May 27, 2021
2 parents 506afd8 + 3609547 commit 2614ac4
Showing 8 changed files with 276 additions and 160 deletions.
19 changes: 18 additions & 1 deletion tests/tibanna/unicorn/check_task_awsem/test_handler.py
@@ -3,7 +3,8 @@
     EC2StartingException,
     StillRunningException,
     MetricRetrievalException,
-    EC2IdleException
+    EC2IdleException,
+    JobAbortedException
 )
 import pytest
 import boto3
@@ -59,6 +60,22 @@ def test_check_task_awsem_fails_if_no_job_started_for_too_long(check_task_input,
     assert 'Failed to find jobid' in str(excinfo.value)
 
 
+def test_check_task_awsem_aborted(check_task_input, s3):
+    jobid = 'lalala'
+    check_task_input_modified = check_task_input
+    check_task_input_modified['jobid'] = jobid
+    job_started = "%s.job_started" % jobid
+    job_aborted = "%s.aborted" % jobid
+    s3.put_object(Body=b'', Key=job_started)
+    s3.put_object(Body=b'', Key=job_aborted)
+    with pytest.raises(JobAbortedException) as excinfo:
+        service.handler(check_task_input, '')
+    assert 'aborted' in str(excinfo.value)
+    # cleanup
+    s3.delete_objects(Delete={'Objects': [{'Key': job_started}]})
+    s3.delete_objects(Delete={'Objects': [{'Key': job_aborted}]})
+
+
 @pytest.mark.webtest
 def test_check_task_awsem_throws_exception_if_not_done(check_task_input):
     with pytest.raises(StillRunningException) as excinfo:
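The new test_check_task_awsem_aborted above exercises the soft-kill convention: an empty <jobid>.aborted flag object in the log bucket marks the job as aborted, and check_task reacts by raising JobAbortedException. The producer side is not part of the hunks shown here, but under that convention a soft kill reduces to roughly the following sketch (the function name and bucket value are illustrative assumptions, not Tibanna's actual API):

import boto3


def send_abort_signal(job_id, log_bucket):
    """Sketch: ask a running Tibanna job to abort by writing an empty
    <job_id>.aborted flag object to its log bucket. The check_task
    Lambda polls the bucket, finds the key, and raises
    JobAbortedException so the step function can wind down cleanly."""
    s3 = boto3.client('s3')
    s3.put_object(Bucket=log_bucket, Key="%s.aborted" % job_id, Body=b'')


# e.g. send_abort_signal('lalala', 'my-tibanna-log-bucket')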
60 changes: 59 additions & 1 deletion tests/tibanna/unicorn/test_job.py
@@ -4,6 +4,12 @@
 from tibanna.vars import DYNAMODB_TABLE, EXECUTION_ARN
 import mock
 import boto3
+import pytest
 
 
+@pytest.fixture
+def raise_error():
+    raise Exception()
+
+
 def test_stepfunction_exists():
@@ -20,7 +26,7 @@ def test_stepfunction_exists2():
 
 def test_job_check_output():
     job = Job(job_id='jid1')
-    job.exec_desc = {'output': '{"somefield": "someoutput"}'}
+    job._exec_desc = {'output': '{"somefield": "someoutput"}'}
     with mock.patch('tibanna.job.Job.check_status', return_value='SUCCEEDED'):
         res = job.check_output()
     assert res == {'somefield': 'someoutput'}
@@ -53,6 +59,15 @@ def test_jobs_status_failed():
     assert res['completed_jobs'] == []
 
 
+def test_jobs_status_failed_exec_arn():
+    with mock.patch('tibanna.job.Job.check_status', return_value='FAILED'):
+        res = Jobs.status(exec_arns=['ex1', 'ex2', 'ex3'])
+    assert len(res) == 3
+    assert res['running_jobs'] == []
+    assert res['failed_jobs'] == ['ex1', 'ex2', 'ex3']
+    assert res['completed_jobs'] == []
+
+
 def test_describe_exec():
     exec_desc = {'vanilla': 'cinnamon'}
     with mock.patch('botocore.client.BaseClient._make_api_call', return_value=exec_desc):
@@ -102,3 +117,46 @@ def test_add_to_dd_and_info():
     assert info['Job Id'] == jobid
     assert info['Log Bucket'] == logbucket
     assert 'Time Stamp' in info
+
+
+def test_exec_arn():
+    assert Job(exec_arn='somearn').exec_arn == 'somearn'
+    with mock.patch('tibanna.job.Job.get_exec_arn_from_job_id', return_value='somearn'):
+        assert Job(job_id='somejobid').exec_arn == 'somearn'
+
+
+def test_job_id():
+    assert Job(job_id='somejobid').job_id == 'somejobid'
+    with mock.patch('tibanna.job.Job.get_job_id_from_exec_arn', return_value='somejobid'):
+        assert Job(exec_arn='somearn').job_id == 'somejobid'
+
+
+def test_log_bucket():
+    job = Job(job_id='somejobid')
+    job._log_bucket = 'somebucket'
+    assert job.log_bucket == 'somebucket'
+
+
+def test_log_bucket_from_dd():
+    job = Job(job_id='somejobid')
+    with mock.patch('tibanna.job.Job.get_log_bucket_from_job_id', return_value='somebucket'):
+        assert job.log_bucket == 'somebucket'
+
+
+def test_log_bucket_dd2():
+    job = Job(job_id='somejobid')
+    with mock.patch('tibanna.job.Job.info', return_value={'Log Bucket': 'somebucket'}):
+        assert job.log_bucket == 'somebucket'
+
+
+def test_log_bucket_no_dd():
+    job = Job(job_id='somejobid')
+    job.sfn = 'somesfn'
+    with mock.patch('tibanna.job.Job.info', return_value={}):
+        with mock.patch('tibanna.job.Job.get_log_bucket_from_job_id_and_sfn_wo_dd', return_value='somebucket'):
+            assert job.log_bucket == 'somebucket'
+
+
+def test_get_log_bucket_from_job_id():
+    with mock.patch('tibanna.job.Job.info', return_value={'job_id': 'somejobid', 'Log Bucket': 'somelogbucket'}):
+        assert Job.get_log_bucket_from_job_id(job_id='somejobid') == 'somelogbucket'
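These new tests pin down Job's lazy identifier resolution: whichever of job_id/exec_arn the object is constructed with, the other is derived on demand, and log_bucket falls back from a cached value to the DynamoDB record to the step function when no record exists. A rough usage sketch (ARN and bucket values are placeholders; the resolution order is inferred from the mocks above):

from tibanna.job import Job, Jobs

# Construct with either identifier; the other is looked up on first access.
job = Job(job_id='somejobid')
print(job.exec_arn)    # derived from the job id (DynamoDB / step function)
print(job.log_bucket)  # cached value -> DynamoDB 'Log Bucket' -> sfn fallback

# Bulk status over several executions; returns buckets of ids per state.
res = Jobs.status(exec_arns=['ex1', 'ex2', 'ex3'])
print(res['running_jobs'], res['failed_jobs'], res['completed_jobs'])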
39 changes: 27 additions & 12 deletions tibanna/__main__.py
@@ -101,12 +101,20 @@ def args(self):
                 {'flag': ["-s", "--sfn"],
                  'help': "tibanna step function name (e.g. 'tibanna_unicorn_monty'); " +
                          "your current default is %s)" % TIBANNA_DEFAULT_STEP_FUNCTION_NAME,
-                 'default': TIBANNA_DEFAULT_STEP_FUNCTION_NAME}],
+                 'default': TIBANNA_DEFAULT_STEP_FUNCTION_NAME},
+                {'flag': ["-z", "--soft"],
+                 'help': "instead of directly killing the execution, " +
+                         "send abort signal to s3 so that step function can handle it",
+                 'action': "store_true"}],
             'kill_all':
                 [{'flag': ["-s", "--sfn"],
                   'help': "tibanna step function name (e.g. 'tibanna_unicorn_monty'); " +
                           "your current default is %s)" % TIBANNA_DEFAULT_STEP_FUNCTION_NAME,
-                  'default': TIBANNA_DEFAULT_STEP_FUNCTION_NAME}],
+                  'default': TIBANNA_DEFAULT_STEP_FUNCTION_NAME},
+                 {'flag': ["-z", "--soft"],
+                  'help': "instead of directly killing the execution, " +
+                          "send abort signal to s3 so that step function can handle it",
+                  'action': "store_true"}],
             'log':
                 [{'flag': ["-e", "--exec-arn"],
                   'help': "execution arn of the specific job to log"},
@@ -265,7 +273,10 @@ def args(self):
                 {'flag': ["-P", "--do-not-delete-public-access-block"],
                  'action': "store_true",
                  'help': "Do not delete public access block from buckets" +
-                         "(this way postrunjson and metrics reports will not be public)"}],
+                         "(this way postrunjson and metrics reports will not be public)"},
+                {'flag': ["-q", "--quiet"],
+                 'action': "store_true",
+                 'help': "minimize standard output from deployment"}],
             'deploy_core':
                 [{'flag': ["-n", "--name"],
                   'help': "name of the lambda function to deploy (e.g. run_task_awsem)"},
@@ -274,7 +285,10 @@
                           "Lambda function, within the same usergroup"},
                  {'flag': ["-g", "--usergroup"],
                   'default': '',
-                  'help': "Tibanna usergroup for the AWS Lambda function"}],
+                  'help': "Tibanna usergroup for the AWS Lambda function"},
+                 {'flag': ["-q", "--quiet"],
+                  'action': "store_true",
+                  'help': "minimize standard output from deployment"}],
             'plot_metrics':
                 [{'flag': ["-j", "--job-id"],
                   'help': "job id of the specific job to log (alternative to --exec-arn/-e)"},
@@ -354,11 +368,11 @@ def args(self):
         }
 
 
-def deploy_core(name, suffix=None, usergroup=''):
+def deploy_core(name, suffix=None, usergroup='', quiet=False):
     """
     New method of deploying packaged lambdas (BETA)
     """
-    API().deploy_core(name=name, suffix=suffix, usergroup=usergroup)
+    API().deploy_core(name=name, suffix=suffix, usergroup=usergroup, quiet=quiet)
 
 
 def run_workflow(input_json, sfn=TIBANNA_DEFAULT_STEP_FUNCTION_NAME, jobid='', do_not_open_browser=False, sleep=3):
@@ -381,11 +395,12 @@ def setup_tibanna_env(buckets='', usergroup_tag='default', no_randomize=False,
 
 
 def deploy_unicorn(suffix=None, no_setup=False, buckets='',
-                   no_setenv=False, usergroup='', do_not_delete_public_access_block=False, deploy_costupdater = False):
+                   no_setenv=False, usergroup='', do_not_delete_public_access_block=False,
+                   deploy_costupdater=False, quiet=False):
     """deploy tibanna unicorn to AWS cloud"""
     API().deploy_unicorn(suffix=suffix, no_setup=no_setup, buckets=buckets, no_setenv=no_setenv,
                          usergroup=usergroup, do_not_delete_public_access_block=do_not_delete_public_access_block,
-                         deploy_costupdater=deploy_costupdater)
+                         deploy_costupdater=deploy_costupdater, quiet=quiet)
 
 
 def add_user(user, usergroup):
@@ -410,14 +425,14 @@ def log(exec_arn=None, job_id=None, exec_name=None, sfn=TIBANNA_DEFAULT_STEP_FUNCTION_NAME,
                     top=top, top_latest=top_latest))
 
 
-def kill_all(sfn=TIBANNA_DEFAULT_STEP_FUNCTION_NAME):
+def kill_all(sfn=TIBANNA_DEFAULT_STEP_FUNCTION_NAME, soft=False):
     """kill all the running jobs on a step function"""
-    API().kill_all(sfn)
+    API().kill_all(sfn, soft=soft)
 
 
-def kill(exec_arn=None, job_id=None, sfn=TIBANNA_DEFAULT_STEP_FUNCTION_NAME):
+def kill(exec_arn=None, job_id=None, sfn=TIBANNA_DEFAULT_STEP_FUNCTION_NAME, soft=False):
     """kill a specific job"""
-    API().kill(exec_arn, job_id, sfn)
+    API().kill(exec_arn, job_id, sfn, soft=soft)
 
 
 def info(job_id):
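The new flags surface both on the command line and through the Python API that these wrappers delegate to. A usage sketch (step function, usergroup, and job id values are placeholders, and the keyword names mirror the CLI wrappers above):

from tibanna.core import API

# Soft kill: post an abort signal to s3 instead of terminating directly,
# letting the step function wind the execution down itself.
API().kill(job_id='somejobid', sfn='tibanna_unicorn_monty', soft=True)

# Soft-kill every running job on a step function.
API().kill_all('tibanna_unicorn_monty', soft=True)

# Quieter redeployment of the unicorn lambdas.
API().deploy_unicorn(usergroup='monty', quiet=True)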
2 changes: 1 addition & 1 deletion tibanna/_version.py
@@ -1,4 +1,4 @@
 """Version information."""
 
 # The following line *must* be the last in the module, exactly as formatted:
-__version__ = "1.2.8"
+__version__ = "1.3.1b"
10 changes: 10 additions & 0 deletions tibanna/check_task.py
@@ -20,6 +20,7 @@
     EC2UnintendedTerminationException,
     EC2IdleException,
     MetricRetrievalException,
+    JobAbortedException,
     AWSEMErrorHandler
 )
 from .vars import PARSE_AWSEM_TIME
@@ -53,6 +54,7 @@ def run(self):
         job_started = "%s.job_started" % jobid
         job_success = "%s.success" % jobid
         job_error = "%s.error" % jobid
+        job_aborted = "%s.aborted" % jobid
 
         public_postrun_json = self.input_json['config'].get('public_postrun_json', False)
 
@@ -69,6 +71,14 @@ def run(self):
                 raise EC2IdleException("Failed to find jobid %s, ec2 is not initializing for too long. Terminating the instance." % jobid)
             raise EC2StartingException("Failed to find jobid %s, ec2 is probably still booting" % jobid)
 
+        # check to see if job has been aborted (by user or admin)
+        if does_key_exist(bucket_name, job_aborted):
+            try:
+                self.handle_postrun_json(bucket_name, jobid, self.input_json, public_read=public_postrun_json)
+            except Exception as e:
+                logger.warning("error occurred while handling postrun json but continuing. %s" % str(e))
+            raise JobAbortedException("job aborted")
+
         # check to see if job has error, report if so
         if does_key_exist(bucket_name, job_error):
             try:
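does_key_exist and handle_postrun_json are defined elsewhere in the package, outside the hunks shown. As a reading aid, a helper with the semantics this code relies on would look roughly like the sketch below (an approximation, not Tibanna's actual implementation):

import boto3
from botocore.exceptions import ClientError


def does_key_exist(bucket, key):
    """Sketch: return the object's head metadata if s3://<bucket>/<key>
    exists, else False -- how check_task probes for the .job_started,
    .success, .error and .aborted flag objects."""
    try:
        return boto3.client('s3').head_object(Bucket=bucket, Key=key)
    except ClientError:
        return False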
(Diffs for the remaining 3 changed files are not shown.)
