Skip to content

Commit

Permalink
Merge pull request #277 from 4dn-dcic/readlog
Browse files Browse the repository at this point in the history
Readlog
  • Loading branch information
SooLee committed Mar 25, 2020
2 parents 5a3250d + 1bdbdfe commit 64c88b0
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 37 deletions.
23 changes: 23 additions & 0 deletions docs/execution_json.rst
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,29 @@ Dependency specification
}


Custom error handling
#####################

:custom_errors:
- List of dictionaries describing custom error types
- This field allows users to define workflow-specific errors based on a string pattern in log. Tibanna CheckTask step will parse the logs and detect this error.
- This does not serve as error detection - it serves as error identification once the run has failed.
- If the matching error happens, you'll see the error type and the corresponding line(s) of the error in the log file printed as the Exception in Step function.
- ``error_type`` is a short tag that defines the name of the error.
- ``pattern`` is the regex pattern to be detected in the log.
- ``multiline`` (optional) should be set True if ``pattern`` is multi-line (e.g. contains ``\n``).

::

[
{
"error_type": "Unmatching pairs in fastq"
"pattern": "paired reads have different names: .+",
"multiline": False
}
]


config
------

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ python-lambda-4dn==0.12.3
boto3>=1.9.0
botocore>=1.12.1
urllib3>=1.24
requests==2.22.0
requests>=2.22.0
Benchmark-4dn>=0.5.8
108 changes: 108 additions & 0 deletions tests/tibanna/unicorn/test_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import pytest
from tibanna.exceptions import (
AWSEMErrorHandler,
AWSEMJobErrorException
)


def test_general_awsem_error_msg():
eh = AWSEMErrorHandler()
res = eh.general_awsem_error_msg('somejobid')
assert res == 'Job encountered an error check log using tibanna log --job-id=somejobid [--sfn=stepfunction]'


def test_general_awsem_check_log_msg():
eh = AWSEMErrorHandler()
res = eh.general_awsem_check_log_msg('somejobid')
assert res == 'check log using tibanna log --job-id=somejobid [--sfn=stepfunction]'


def test_awsem_exception_not_enough_space_for_input():
log = "sometext some text some other text " + \
"download failed: s3://somebucket/somefile to ../../data1/input/somefile " + \
"[Errno 28] No space left on device " + \
"some other text some other text"
eh = AWSEMErrorHandler()
res = eh.parse_log(log)
assert res
with pytest.raises(AWSEMJobErrorException) as exec_info:
raise res
assert 'Not enough space for input files' in str(exec_info)


def test_awsem_exception_no_space_for_docker():
log = "failed to register layer: Error processing tar file(exit status 1): " + \
"write /miniconda3/pkgs/python-3.7.6-h0371630_2.tar.bz2: no space left on device" + \
"some text some text"
eh = AWSEMErrorHandler()
res = eh.parse_log(log)
assert res
with pytest.raises(AWSEMJobErrorException) as exec_info:
raise res
assert 'No space for docker' in str(exec_info)
assert 'tar.bz2: no space left' in str(exec_info)


def test_awsem_exception_no_space():
log = '[fputs] No space left on device' + \
'some text some text'
eh = AWSEMErrorHandler()
res = eh.parse_log(log)
assert res
with pytest.raises(AWSEMJobErrorException) as exec_info:
raise res
assert 'Not enough space' in str(exec_info)
assert '[fputs]' in str(exec_info)


def test_awsem_exception_cwl_missing_input():
log = "Workflow error, try again with --debug for more information:\n" + \
"Invalid job input record:\n" + \
"workflow_gatk-GenotypeGVCFs_plus_vcf-integrity-check.cwl:28:5: Missing required input parameter\n" + \
" 'chromosomes'\n" + \
"some text some text"
eh = AWSEMErrorHandler()
res = eh.parse_log(log)
assert res
with pytest.raises(AWSEMJobErrorException) as exec_info:
raise res
assert 'CWL missing input' in str(exec_info)
assert 'chromosomes' in str(exec_info)


def test_add_custom_errors():
log = "[M::mem_pestat] low and high boundaries for proper pairs: (1, 22)" + \
"[mem_sam_pe] paired reads have different names: " + \
"\"H3MVTCCXX:4:1101:1174861:0\", \"H3MVTCCXX:4:1101:743397:0\""
eh = AWSEMErrorHandler()
eh.add_custom_errors([{"error_type": "Unmatching pairs in fastq",
"pattern": "paired reads have different names: .+",
"multiline": False}])
assert len(eh.ErrorList) == len(eh._ErrorList) + 1
res = eh.parse_log(log)
assert res
with pytest.raises(AWSEMJobErrorException) as exec_info:
raise res
assert 'Unmatching pairs in fastq' in str(exec_info)
assert 'H3MVTCCXX:4:1101:1174861:0' in str(exec_info)


def test_add_custom_errors2():
log = "sometext some text some other text " + \
"Exception: File is empty (1234567890abcdefg.regionPeak.gz) some other text"
eh = AWSEMErrorHandler()
eh.add_custom_errors([{"error_type": "No peak called",
"pattern": "Exception: File is empty (.+.regionPeak.gz)"}])
res = eh.parse_log(log)
assert res
with pytest.raises(AWSEMJobErrorException) as exec_info:
raise res
assert 'No peak called' in str(exec_info)
assert '1234567890abcdefg.regionPeak.gz' in str(exec_info)


def test_no_matching_error():
log = 'some text some text no error just some text'
eh = AWSEMErrorHandler()
res = eh.parse_log(log)
assert not res
2 changes: 1 addition & 1 deletion tibanna/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Version information."""

# The following line *must* be the last in the module, exactly as formatted:
__version__ = "0.16.0"
__version__ = "0.17.0"
77 changes: 47 additions & 30 deletions tibanna/check_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
AWSEMJobErrorException,
EC2UnintendedTerminationException,
EC2IdleException,
MetricRetrievalException
MetricRetrievalException,
AWSEMErrorHandler
)
from .core import API


RESPONSE_JSON_CONTENT_INCLUSION_LIMIT = 30000 # strictly it is 32,768 but just to be safe.

Expand All @@ -31,43 +34,52 @@ def check_task(input_json):

class CheckTask(object):
TibannaResource = TibannaResource
API = API

def __init__(self, input_json):
self.input_json = copy.deepcopy(input_json)

def run(self):
input_json_copy = self.input_json

# s3 bucket that stores the output
bucket_name = input_json_copy['config']['log_bucket']

# info about the jobby job
jobid = input_json_copy['jobid']
job_started = "%s.job_started" % jobid
job_success = "%s.success" % jobid
job_error = "%s.error" % jobid

public_postrun_json = input_json_copy['config'].get('public_postrun_json', False)

# check to see ensure this job has started else fail
if not does_key_exist(bucket_name, job_started):
raise EC2StartingException("Failed to find jobid %s, ec2 is probably still booting" % jobid)

# check to see if job has error, report if so
if does_key_exist(bucket_name, job_error):
try:
self.handle_postrun_json(bucket_name, jobid, input_json_copy, public_read=public_postrun_json)
except Exception as e:
printlog("error handling postrun json %s" % str(e))
errmsg = "Job encountered an error check log using tibanna log --job-id=%s [--sfn=stepfunction]" % jobid
raise AWSEMJobErrorException(errmsg)

eh = AWSEMErrorHandler()
if 'custom_errors' in input_json_copy['args']:
eh.add_custom_errors(input_json_copy['args']['custom_errors'])
log = API().log(job_id=jobid)
ex = eh.parse_log(log)
if ex:
msg_aug = str(ex) + ". For more info - " + eh.general_awsem_check_log_msg(jobid)
raise AWSEMJobErrorException(msg_aug)
else:
raise AWSEMJobErrorException(eh.general_awsem_error_msg(jobid))

# check to see if job has completed
if does_key_exist(bucket_name, job_success):
self.handle_postrun_json(bucket_name, jobid, input_json_copy, public_read=public_postrun_json)
print("completed successfully")
return input_json_copy

# checking if instance is terminated for no reason
instance_id = input_json_copy['config'].get('instance_id', '')
if instance_id: # skip test for instance_id by not giving it to input_json_copy
Expand All @@ -86,7 +98,7 @@ def run(self):
errmsg = "EC2 is terminated unintendedly for job %s - please rerun." % jobid
printlog(errmsg)
raise EC2UnintendedTerminationException(errmsg)

# check CPU utilization for the past hour
filesystem = '/dev/nvme1n1' # doesn't matter for cpu utilization
end = datetime.now(tzutc())
Expand All @@ -98,26 +110,31 @@ def run(self):
except Exception as e:
raise MetricRetrievalException(e)
if 'max_cpu_utilization_percent' in cw_res:
if not cw_res['max_cpu_utilization_percent'] or cw_res['max_cpu_utilization_percent'] < 1.0:
# the instance wasn't terminated - otherwise it would have been captured in the previous error.
if not cw_res['max_ebs_read_bytes'] or cw_res['max_ebs_read_bytes'] < 1000: # minimum 1kb
# in case the instance is copying files using <1% cpu for more than 1hr, do not terminate it.
try:
boto3.client('ec2').terminate_instances(InstanceIds=[instance_id])
errmsg = "Nothing has been running for the past hour for job %s " + \
"(CPU utilization %s and EBS read %s bytes)." % \
(jobid, str(cw_res['max_cpu_utilization_percent']), str(cw_res['max_ebs_read_bytes']))
raise EC2IdleException(errmsg)
except Exception as e:
errmsg = "Nothing has been running for the past hour for job %s," + \
"but cannot terminate the instance (cpu utilization (%s) : %s" % \
jobid, str(cw_res['max_cpu_utilization_percent']), str(e)
printlog(errmsg)
raise EC2IdleException(errmsg)

self.terminate_idle_instance(jobid,
instance_id,
cw_res['max_cpu_utilization_percent'],
cw_res['max_ebs_read_bytes'])
# if none of the above
raise StillRunningException("job %s still running" % jobid)

def terminate_idle_instance(self, jobid, instance_id, cpu, ebs_read):
if not cpu or cpu < 1.0:
# the instance wasn't terminated - otherwise it would have been captured in the previous error.
if not ebs_read or ebs_read < 1000: # minimum 1kb
# in case the instance is copying files using <1% cpu for more than 1hr, do not terminate it.
try:
boto3.client('ec2').terminate_instances(InstanceIds=[instance_id])
errmsg = "Nothing has been running for the past hour for job %s " + \
"(CPU utilization %s and EBS read %s bytes)." % \
(jobid, str(cpu), str(ebs_read))
raise EC2IdleException(errmsg)
except Exception as e:
errmsg = "Nothing has been running for the past hour for job %s," + \
"but cannot terminate the instance (cpu utilization (%s) : %s" % \
jobid, str(cpu), str(e)
printlog(errmsg)
raise EC2IdleException(errmsg)

def handle_postrun_json(self, bucket_name, jobid, input_json, public_read=False):
postrunjson = "%s.postrun.json" % jobid
if not does_key_exist(bucket_name, postrunjson):
Expand All @@ -141,7 +158,7 @@ def handle_postrun_json(self, bucket_name, jobid, input_json, public_read=False)
raise "error in updating postrunjson %s" % str(e)
# add postrun json to the input json
self.add_postrun_json(prj, input_json, RESPONSE_JSON_CONTENT_INCLUSION_LIMIT)

def add_postrun_json(self, prj, input_json, limit):
prjd = prj.as_dict()
if len(str(prjd)) + len(str(input_json)) < limit:
Expand All @@ -153,7 +170,7 @@ def add_postrun_json(self, prj, input_json, limit):
input_json['postrunjson'] = prjd
else:
input_json['postrunjson'] = {'log': 'postrun json not included due to data size limit'}

def handle_metrics(self, prj):
try:
resources = self.TibannaResource(prj.Job.instance_id,
Expand Down
3 changes: 3 additions & 0 deletions tibanna/ec2_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ def fill_default(self):
'additional_benchmarking_parameters']:
if not hasattr(self, field):
setattr(self, field, {})
for field in ['custom_errors']:
if not hasattr(self, field):
setattr(self, field, [])
for field in ['app_version']:
if not hasattr(self, field):
setattr(self, field, '')
Expand Down
75 changes: 70 additions & 5 deletions tibanna/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,74 @@
import re


# custom exceptions
class AWSEMJobErrorException(Exception):
"""There is an error from a worklow run on the EC2 AWSEM instance."""
pass


class AWSEMErrorHandler(object):

def __init__(self):
self.ErrorList = self._ErrorList # initial error list, custom errors can be added

class AWSEMError(object):
def __init__(self, error_type, pattern_in_log, multiline=False):
self.error_type = error_type
if multiline:
self.pattern_in_log = re.compile(pattern_in_log, re.MULTILINE)
else:
self.pattern_in_log = pattern_in_log

def add_custom_errors(self, custom_err_list):
"""add custom errors to ErrorList.
custom_err_list is a list of dictionaries w/ keys 'error_type', 'pattern', 'multiline'"""
for err in custom_err_list:
self.ErrorList.append(self.AWSEMError(err['error_type'], err['pattern'], err.get('multiline', False)))

@property
def _ErrorList(self):
"""add any specific error types with recognizable strings or patterns here.
the order is important. The earlier ones are checked first and if there is a match,
the later ones will not be checked."""
return [
# input download failure due to not enough disk space
self.AWSEMError('Not enough space for input files', 'download failed: .+ No space left on device'),
# Docker pull failure due to not enough root disk space
self.AWSEMError('No space for docker', 'failed to register layer.+no space left on device'),
# not enough disk space
self.AWSEMError('Not enough space', '.+No space left on device'),
# CWL missing input error
self.AWSEMError('CWL missing input', 'Missing required input parameter\n.+\n', True)
]

def parse_log(self, log):
# for ex in self.AWSEMErrorExceptionList:
for ex in self.ErrorList:
res = re.search(ex.pattern_in_log, log)
if res:
match = res.string[res.regs[0][0]:res.regs[0][1]]
match = re.sub('\n', ' ', match) # \n not recognized and subsequent content is dropped from Exception
match = re.sub(' +', ' ', match)
msg = "%s: %s" % (ex.error_type, match)
return AWSEMJobErrorException(msg)
return

@property
def general_awsem_check_log_msg_template(self):
return "check log using tibanna log --job-id=%s [--sfn=stepfunction]"

def general_awsem_check_log_msg(self, job_id):
return self.general_awsem_check_log_msg_template % job_id

@property
def general_awsem_error_msg_template(self):
return "Job encountered an error " + self.general_awsem_check_log_msg_template

def general_awsem_error_msg(self, job_id):
return self.general_awsem_error_msg_template % job_id


class StillRunningException(Exception):
"""EC2 AWSEM instance is still running (job not complete)"""
pass
Expand All @@ -9,11 +79,6 @@ class EC2StartingException(Exception):
pass


class AWSEMJobErrorException(Exception):
"""There is an error from a worklow run on the EC2 AWSEM instance"""
pass


class DependencyStillRunningException(Exception):
pass

Expand Down

0 comments on commit 64c88b0

Please sign in to comment.