Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle batch jobs that completed but still have pending actions #186

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions src/python/dart/service/pending_actions_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from dart.context.locator import injectable
from dart.model.workflow import WorkflowState, WorkflowInstanceState
from dart.model.action import ActionState

import logging
import boto3

_logger = logging.getLogger(__name__)

@injectable
class PendingActionsCheck(object):
    """Detect and reconcile 'zombie' workflows.

    A zombie is a workflow instance that is still QUEUED/RUNNING in Dart while
    its underlying AWS Batch job has already reached a terminal state. This
    service compares Dart action state against Batch job status and updates the
    action / workflow instance accordingly.
    """

    # Workflow-instance states considered "not yet complete".
    NOT_COMPLETE_STATES = ('QUEUED', 'RUNNING')

    # Action states that need no reconciliation (already terminal in Dart).
    SETTLED_ACTION_STATES = ('FAILED', 'COMPLETED')

    def __init__(self, action_service):
        self._action_service = action_service
        self._batch_client = boto3.client('batch')

    def get_not_completed_workflow_instances(self, workflow_id, workflow_service):
        """Return the QUEUED/RUNNING instances of workflow_id, or None if the
        workflow does not exist.

        :param workflow_service: passed in (rather than injected) to avoid a
            cyclic dependency with WorkflowService.
        """
        wf = workflow_service.get_workflow(workflow_id, raise_when_missing=False)
        if not wf:
            # BUG FIX: the original referenced an undefined name (workflow_msg)
            # in this log call, raising NameError whenever this branch ran.
            _logger.info('Zombie Check: workflow (id={wf_id}) not found.'.format(wf_id=workflow_id))
            return None

        if wf.data.state != WorkflowState.ACTIVE:
            # Not fatal: we still check instances, but record the anomaly.
            _logger.info('Zombie Check: expected workflow (id={wf_id}) to be in ACTIVE state.'.format(wf_id=workflow_id))

        # get all not-yet-complete workflow_instances of the current workflow
        all_wf_instances = workflow_service.find_workflow_instances(workflow_id)
        current_wf_instances = [inst for inst in all_wf_instances if inst.data.state in self.NOT_COMPLETE_STATES]
        _logger.info('Zombie Check: Found workflow instance ids (workflow_id={0}) instances = {1}'.format(workflow_id, current_wf_instances))

        return current_wf_instances

    def get_instance_actions(self, current_wf_instances):
        """Collect the actions of the given workflow instances.

        Returns a 3-tuple:
          incomplete_actions   -- every action of the given instances
          jobs_2_actions       -- {batch_job_id: action} for actions with a Batch job
          action_2_wf_instance -- {action.id: owning workflow instance}
        """
        incomplete_actions = []
        action_2_wf_instance = {}
        for wf_instance in current_wf_instances:
            wf_instance_actions = self._action_service.find_actions(workflow_instance_id=wf_instance.id)
            incomplete_actions.extend(wf_instance_actions)
            for action in wf_instance_actions:
                action_2_wf_instance[action.id] = wf_instance

        # Only actions that actually launched a Batch job can be cross-checked.
        jobs_2_actions = {action.data.batch_job_id: action
                          for action in incomplete_actions
                          if action.data.batch_job_id}

        return incomplete_actions, jobs_2_actions, action_2_wf_instance

    def handle_done_batch_jobs_with_not_complete_wf_instances(self, batch_jobs, jobs_2_actions, action_2_wf_instance, workflow_service):
        """Reconcile Dart state for Batch jobs that reached a terminal state.

        :param batch_jobs: response dict of batch.describe_jobs (key 'jobs')
        """
        for job in batch_jobs.get('jobs', []):
            # BUG FIX: use .get() -- the original indexed the dict directly and
            # then tested 'if action:', so a missing jobId raised KeyError and
            # the guard was unreachable.
            action = jobs_2_actions.get(job.get('jobId'))
            if not action:
                continue
            wf_instance = action_2_wf_instance[action.id]
            settled = action.data.state in self.SETTLED_ACTION_STATES

            # job failed + action not settled => fail action and workflow instance
            if job.get('status') == 'FAILED' and not settled:
                # BUG FIX: original format string used {0} twice, dropping the action id.
                _logger.info("Zombie Check: Job {0} is failed but action {1} is not failed/completed. Updating action and workflow_instance to FAILED".format(job.get('jobId'), action.id))
                self._action_service.update_action_state(action, ActionState.FAILED, action.data.error_message)
                workflow_service.update_workflow_instance_state(wf_instance, WorkflowInstanceState.FAILED)

            # job succeeded + action not settled => complete action and workflow instance.
            # BUG FIX: AWS Batch reports terminal success as 'SUCCEEDED' (per the
            # DescribeJobs API), so the original 'COMPLETED' comparison never matched;
            # 'COMPLETED' is kept for backward compatibility. The original also set the
            # workflow instance to FAILED here, contradicting its own comment.
            if job.get('status') in ('SUCCEEDED', 'COMPLETED') and not settled:
                _logger.info("Zombie Check: Job {0} is completed but action {1} is not failed/completed. Updating action to COMPLETED".format(job.get("jobId"), action.id))
                self._action_service.update_action_state(action, ActionState.COMPLETED, action.data.error_message)
                workflow_service.update_workflow_instance_state(wf_instance, WorkflowInstanceState.COMPLETED)

    def find_pending_dart_actions(self, workflow_id, workflow_service):
        ''' Entry point: find and fix pending actions of workflow_id.
        We send workflow_service to avoid cyclical injection from workflow_service. '''
        current_wf_instances = self.get_not_completed_workflow_instances(workflow_id, workflow_service)
        if not current_wf_instances:
            return

        incomplete_actions, jobs_2_actions, action_2_wf_instance = self.get_instance_actions(current_wf_instances)
        # BUG FIX: query Batch only for actions that have a batch_job_id; the
        # original built the list from all incomplete actions and could pass
        # None entries to describe_jobs.
        batch_job_ids = list(jobs_2_actions.keys())
        _logger.info("Zombie Check: extracted job_ids {0} from incomplete actions {1}".format(batch_job_ids, [act.id for act in incomplete_actions]))

        if not batch_job_ids:
            return

        try:
            # NOTE(review): describe_jobs accepts at most 100 job ids per call --
            # confirm whether chunking is needed for large workflows.
            batch_jobs = self._batch_client.describe_jobs(jobs=batch_job_ids)
        except Exception as err:
            # best-effort check: a Batch API failure must not break the caller
            _logger.error("Zombie Check: failed to execute batch's describe_jobs. err = {0}".format(err))
        else:
            self.handle_done_batch_jobs_with_not_complete_wf_instances(batch_jobs, jobs_2_actions, action_2_wf_instance, workflow_service)


9 changes: 7 additions & 2 deletions src/python/dart/service/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
class TriggerService(object):
def __init__(self, action_service, datastore_service, workflow_service, manual_trigger_processor,
subscription_batch_trigger_processor, workflow_completion_trigger_processor, event_trigger_processor,
scheduled_trigger_processor, super_trigger_processor, retry_trigger_processor, filter_service,
subscription_service, dart_config):
scheduled_trigger_processor, super_trigger_processor, retry_trigger_processor,
zombie_check_trigger_processor, filter_service, subscription_service, dart_config):
self._action_service = action_service
self._datastore_service = datastore_service
self._workflow_service = workflow_service
Expand All @@ -33,6 +33,7 @@ def __init__(self, action_service, datastore_service, workflow_service, manual_t
self._scheduled_trigger_processor = scheduled_trigger_processor
self._super_trigger_processor = super_trigger_processor
self._retry_trigger_processor = retry_trigger_processor
self._zombie_check_trigger_processor = zombie_check_trigger_processor
self._filter_service = filter_service
self._subscription_service = subscription_service
self._nudge_config = dart_config['nudge']
Expand All @@ -45,6 +46,7 @@ def __init__(self, action_service, datastore_service, workflow_service, manual_t
scheduled_trigger_processor.trigger_type().name: scheduled_trigger_processor,
super_trigger_processor.trigger_type().name: super_trigger_processor,
retry_trigger_processor.trigger_type().name: retry_trigger_processor,
zombie_check_trigger_processor.trigger_type().name: zombie_check_trigger_processor
}

params_schemas = []
Expand Down Expand Up @@ -218,6 +220,9 @@ def delete_trigger_retryable(trigger_id):
db.session.delete(trigger_dao)
db.session.commit()

def check_zombie_workflows(self, workflow_json):
    """Asynchronously kick off a zombie-workflow check.

    Publishes an evaluation message to the zombie-check trigger processor;
    workflow_json is a dict (presumably carrying at least 'workflow_id' --
    verify against the zombie-check processor's evaluate_message).
    """
    self._zombie_check_trigger_processor.send_evaluation_message(workflow_json)

def trigger_workflow_async(self, workflow_json):
    """Queue a manual workflow trigger for asynchronous evaluation (fire-and-forget)."""
    self._manual_trigger_processor.send_evaluation_message(workflow_json)

Expand Down
4 changes: 3 additions & 1 deletion src/python/dart/service/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
@injectable
class WorkflowService(object):
def __init__(self, datastore_service, action_service, trigger_proxy, filter_service, subscription_service,
subscription_element_service, emailer):
subscription_element_service, emailer, pending_actions_check):
self._datastore_service = datastore_service
self._action_service = action_service
self._trigger_proxy = trigger_proxy
self._filter_service = filter_service
self._subscription_service = subscription_service
self._subscription_element_service = subscription_element_service
self._emailer = emailer
self._pending_actions_check = pending_actions_check

@staticmethod
def save_workflow(workflow, commit=True, flush=False):
Expand Down Expand Up @@ -263,6 +264,7 @@ def run_triggered_workflow(self, workflow_msg, trigger_type, trigger_id=None, re
states = [WorkflowInstanceState.QUEUED, WorkflowInstanceState.RUNNING]
if self.find_workflow_instances_count(wf.id, states) >= wf.data.concurrency:
_logger.info('workflow (id={wf_id}) has already reached max concurrency of {concurrency}. log-info: {log_info}'.format(wf_id=wf.id, concurrency=wf.data.concurrency, log_info=workflow_msg.get('log_info')))
self._pending_actions_check.find_pending_dart_actions(wf.id, self)
return

wf_instance = self.save_workflow_instance(
Expand Down
53 changes: 53 additions & 0 deletions src/python/dart/trigger/zombie_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from dart.context.locator import injectable
from dart.model.trigger import TriggerType
from dart.trigger.base import TriggerProcessor
from dart.model.workflow import WorkflowState, WorkflowInstanceState
from dart.model.action import ActionState

import logging
import boto3

_logger = logging.getLogger(__name__)

# Trigger type registered with TriggerService under the name 'zombie_check';
# used by ZombieCheckTriggerProcessor below to key its evaluation messages.
zombie_check_trigger = TriggerType(
    name='zombie_check',
    description='Check if the actions of current workflow instances are not in (FAILED, SUCCESS) states in Batch'
)

@injectable
class ZombieCheckTriggerProcessor(TriggerProcessor):
    """Trigger processor for the 'zombie_check' trigger type.

    Receives evaluation messages (via the trigger proxy) and delegates the
    actual reconciliation work to PendingActionsCheck.
    """

    def __init__(self, trigger_proxy, action_service, workflow_service, pending_actions_check):
        self._trigger_proxy = trigger_proxy
        # NOTE(review): _action_service and _batch_client appear unused in this
        # class (PendingActionsCheck owns its own Batch client) -- confirm
        # before removing.
        self._action_service = action_service
        self._workflow_service = workflow_service
        self._trigger_type = zombie_check_trigger
        self._pending_actions_check = pending_actions_check
        self._batch_client = boto3.client('batch')

    def trigger_type(self):
        """Return the TriggerType handled by this processor (zombie_check)."""
        return self._trigger_type

    def initialize_trigger(self, trigger, trigger_service):
        # zombie-check triggers are never saved, thus never initialized
        pass

    def update_trigger(self, unmodified_trigger, modified_trigger):
        # no processor-specific state to reconcile; accept the modification as-is
        return modified_trigger

    def evaluate_message(self, workflow_msg, trigger_service):
        """ :type workflow_msg: dict
            :type trigger_service: dart.service.trigger.TriggerService """

        workflow_id = workflow_msg.get('workflow_id')
        self._pending_actions_check.find_pending_dart_actions(workflow_id, self._workflow_service)

        # return an empty list since this is not associated with a particular trigger instance
        return []

    def teardown_trigger(self, trigger, trigger_service):
        # nothing to clean up: zombie-check triggers are never persisted
        pass

    def send_evaluation_message(self, workflow_msg):
        # enqueue workflow_msg for asynchronous evaluation via the trigger proxy
        self._trigger_proxy.process_trigger(self._trigger_type, workflow_msg)


9 changes: 7 additions & 2 deletions src/python/dart/web/api/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,17 @@ def trigger_workflow(workflow):
if wf.data.state != WorkflowState.ACTIVE:
return {'results': 'ERROR', 'error_message': 'This workflow is not ACTIVE'}, 400, None

wf_uuid = uuid.uuid4().hex # to avoid uuid serialization issues
current_user_id = current_user.email if hasattr(current_user, 'email') else 'anonymous'

states = [WorkflowInstanceState.QUEUED, WorkflowInstanceState.RUNNING]
if workflow_service().find_workflow_instances_count(wf.id, states) >= wf.data.concurrency:
_logger.info("Checking for Batch 'stuck' workflows, workflow_id={workflow_id} for user={user_id} with uuid={wf_uuid}".
format(workflow_id=workflow.id, user_id=current_user_id, wf_uuid=wf_uuid))
trigger_service().check_zombie_workflows({'workflow_id': workflow.id,
'log_info': {'user_id': current_user_id, 'wf_uuid': wf_uuid}})
return {'results': 'ERROR', 'error_message': 'Max concurrency reached: %s' % wf.data.concurrency}, 400, None

wf_uuid = uuid.uuid4().hex # to avoid uuid serialization issues
current_user_id = current_user.email if hasattr(current_user, 'email') else 'anonymous'
_logger.info("Launching Workflow {workflow_id} for user={user_id} with uuid={wf_uuid}".
format(workflow_id=workflow.id, user_id=current_user_id, wf_uuid=wf_uuid))

Expand Down