Skip to content

Commit

Permalink
DW-5997: locating pending workflows in dart.
Browse files Browse the repository at this point in the history
  • Loading branch information
aophir-rmn committed Aug 15, 2017
1 parent c1c13af commit 6aedf9c
Showing 1 changed file with 85 additions and 10 deletions.
95 changes: 85 additions & 10 deletions src/python/dart/trigger/zombie_check.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
from dart.context.locator import injectable
from dart.model.trigger import TriggerType
from dart.trigger.base import TriggerProcessor
from dart.model.workflow import WorkflowState, WorkflowInstanceState
from dart.model.action import ActionState

import logging
import boto3

_logger = logging.getLogger(__name__)

zombie_check_trigger = TriggerType(
name='zombie_check',
Expand All @@ -10,10 +16,12 @@

@injectable
class ZombieCheckTriggerProcessor(TriggerProcessor):
def __init__(self, trigger_proxy, workflow_service):
def __init__(self, trigger_proxy, action_service, workflow_service):
self._trigger_proxy = trigger_proxy
self._action_service = action_service
self._workflow_service = workflow_service
self._trigger_type = zombie_check_trigger
self._batch_client = boto3.client('batch')

def trigger_type(self):
return self._trigger_type
Expand All @@ -25,21 +33,88 @@ def initialize_trigger(self, trigger, trigger_service):
def update_trigger(self, unmodified_trigger, modified_trigger):
return modified_trigger

def evaluate_message(self, message, trigger_service):
def get_not_completed_workflow_instances(self, workflow_msg):
# get workflow associated with currently triggered workflow
workflow_id = workflow_msg.get('workflow_id')
wf = self._workflow_service.get_workflow(workflow_id, raise_when_missing=False)
if not wf:
_logger.info('Zombie Check: workflow (id={wf_id}) not found. log-info: {log_info}'.
format(wf_id=workflow_id, log_info=workflow_msg.get('log_info')))
return None

if wf.data.state != WorkflowState.ACTIVE:
_logger.info('Zombie Check: expected workflow (id={wf_id}) to be in ACTIVE state. log-info: {log_info}'.
format(wf_id=workflow_id, log_info=workflow_msg.get('log_info')))

# get all workflow_instances of current workflow:
NOT_COMPLETE_STATES = ['QUEUED', 'RUNNING']
all_wf_instances = self._workflow_service.find_workflow_instances(workflow_id)
current_wf_instances = [wf for wf in all_wf_instances if wf.data.state in NOT_COMPLETE_STATES]
_logger.info('Zombie Check: Found workflow instance ids (workflow_id={0}) instances = {1}'.format(workflow_id, current_wf_instances))

return current_wf_instances

def get_instance_actions(self, current_wf_instances):
# get all actions of nt completed workflow_instances and map ach action to its workflow_instance in action_2_wf_instance
incomplete_actions = []
action_2_wf_instance = {}
for wf_instance in current_wf_instances:
wf_instance_actions = self._action_service.find_actions(workflow_instance_id=wf_instance.id)
incomplete_actions.extend(wf_instance_actions)
for action in wf_instance_actions:
action_2_wf_instance[action.id] = wf_instance

jobs_2_actions = {}
for action in incomplete_actions:
if action.data.batch_job_id:
jobs_2_actions[action.data.batch_job_id] = action

return incomplete_actions, jobs_2_actions, action_2_wf_instance

def handle_done_batch_jobs_with_not_complete_wf_instances(self, batch_jobs, jobs_2_actions, action_2_wf_instance):
for job in batch_jobs.get('jobs'):
# jobs fail + action not-failed => fail workflow instance and action
action = jobs_2_actions[job.get('jobId')]
if action:
if job.get('status') == 'FAILED' and not (action.data.state in ['FAILED', 'COMPLETED']):
_logger.info("Zombie Check: Job {0} is failed but action {0} is not failed/completed. Updating action and workflow_instance to FAILED".format(job.get('jobId'), action.id))
self._action_service.update_action_state(action, ActionState.FAILED, action.data.error_message)
self._workflow_service.update_workflow_instance_state(action, WorkflowInstanceState.FAILED)

# Jobs complete + action not-failed => mark workflow instance as complete and mark actions as complete
if job.get('status') == 'COMPLETED' and not (action.data.state in ['FAILED', 'COMPLETED']):
_logger.info("Zombie Check: Job {0} is completed but action {0} is not failed/completed. Updating action to COMPLETED".format(job.get("jobId"), action.id))
self._action_service.update_action_state(action, ActionState.COMPLETED, action.data.error_message)
self._workflow_service.update_workflow_instance_state(action, WorkflowInstanceState.FAILED)

def evaluate_message(self, workflow_msg, trigger_service):
""" :type message: dict
:type trigger_service: dart.service.trigger.TriggerService """
# always trigger a manual message
##self._workflow_service.run_triggered_workflow(message, self._trigger_type)
## TODO - check batch vs dart here!
TODO = "Check batch state vs DART here."
# 1. retrieve running instance workflows (if any) actions.
#

current_wf_instances = self.get_not_completed_workflow_instances(workflow_msg)
if not current_wf_instances:
return []

# get all actions of not completed workflow_instances and map ach action to its workflow_instance in action_2_wf_instance
incomplete_actions, jobs_2_actions, action_2_wf_instance = self.get_instance_actions(current_wf_instances)
batch_job_ids = [job.data.batch_job_id for job in incomplete_actions]
_logger.info("Zombie Check: extract job_ids {0} form incomplete actions {1}".format(batch_job_ids, [act.id for act in incomplete_actions]))

try:
batch_jobs = self._batch_client.describe_jobs(jobs=batch_job_ids)
except Exception as err:
_logger.error("Zombie Check: failed to execute batch's describe_jobs. err = {0}".format(err))
return []

self.handle_done_batch_jobs_with_not_complete_wf_instances(batch_jobs, jobs_2_actions, action_2_wf_instance)

# return an empty list since this is not associated with a particular trigger instance
return []

def teardown_trigger(self, trigger, trigger_service):
pass

def send_evaluation_message(self, workflow_json):
self._trigger_proxy.process_trigger(self._trigger_type, workflow_json)
def send_evaluation_message(self, workflow_msg):
self._trigger_proxy.process_trigger(self._trigger_type, workflow_msg)


0 comments on commit 6aedf9c

Please sign in to comment.