Skip to content

Commit

Permalink
Merge pull request #140 from anmolbabu/fixes
Browse files Browse the repository at this point in the history
Flow fixes
  • Loading branch information
r0h4n authored Jan 31, 2017
2 parents 26e6e4b + e375267 commit 2bb3ae3
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 19 deletions.
33 changes: 17 additions & 16 deletions tendrl/commons/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,28 +79,28 @@ def run(self):

if not ret_val:
msg = "Failed pre-run: %s for flow: %s" % \
(atom_fqn, self.job['run'])
(atom_fqn, self.help)
LOG.error(msg)
flow_utils.update_job_status(self.request_id, msg,
self.log,
'error', self.etcd_orm)
'error', tendrl_ns.etcd_orm)

raise AtomExecutionFailedError(
"Error executing pre run function: %s for flow: %s" %
(atom_fqn, self.job['run'])
(atom_fqn, self.help)
)
else:
msg = "Finished pre-run: %s for flow: %s" %\
(atom_fqn, self.job['run'])
(atom_fqn, self.help)
LOG.info(msg)
self.log['all'].append(msg)
self.log['info'].append(msg)

# Execute the atoms for the flow
msg = "Processing atoms for flow: %s" % self.job['run']
msg = "Processing atoms for flow: %s" % self.help
LOG.info(msg)
flow_utils.update_job_status(self.request_id, msg, self.log,
'info', self.etcd_orm)
'info', tendrl_ns.etcd_orm)

for atom_fqn in self.atoms:
msg = "Start atom : %s" % atom_fqn
Expand All @@ -112,27 +112,27 @@ def run(self):

if not ret_val:
msg = "Failed atom: %s on flow: %s" % \
(atom_fqn, self.job['run'])
(atom_fqn, self.help)
LOG.error(msg)
flow_utils.update_job_status(self.request_id, msg, self.log,
'error', self.etcd_orm)
'error', tendrl_ns.etcd_orm)

raise AtomExecutionFailedError(
"Error executing atom: %s on flow: %s" %
(atom_fqn, self.job['run'])
(atom_fqn, self.help)
)
else:
msg = 'Finished atom %s for flow: %s' %\
(atom_fqn, self.job['run'])
(atom_fqn, self.help)
LOG.info(msg)
self.log['all'].append(msg)
self.log['info'].append(msg)

# Execute the post runs for the flow
msg = "Processing post-runs for flow: %s" % self.job['run']
msg = "Processing post-runs for flow: %s" % self.help
LOG.info(msg)
flow_utils.update_job_status(self.request_id, msg, self.log,
'info', self.etcd_orm)
'info', tendrl_ns.etcd_orm)
if self.post_run is not None:
for atom_fqn in self.post_run:
msg = "Start post-run : %s" % atom_fqn
Expand All @@ -144,29 +144,30 @@ def run(self):

if not ret_val:
msg = "Failed post-run: %s for flow: %s" % \
(atom_fqn, self.job['run'])
(atom_fqn, self.help)
LOG.error(msg)
flow_utils.update_job_status(self.request_id, msg,
self.log,
'error', self.etcd_orm)
'error', tendrl_ns.etcd_orm)

raise AtomExecutionFailedError(
"Error executing post run function: %s" % atom_fqn
)
else:
msg = "Finished post-run: %s for flow: %s" %\
(atom_fqn, self.job['run'])
(atom_fqn, self.help)
LOG.info(msg)
flow_utils.update_job_status(self.request_id, msg,
self.log,
'info', self.etcd_orm)
'info', tendrl_ns.etcd_orm)

def execute_atom(self, atom_fqn):
# atom_fqn eg: tendrl.node_agent.objects.abc.atoms.xyz

if "tendrl" in atom_fqn and "atoms" in atom_fqn:
obj_name, atom_name = atom_fqn.split(".objects.")[-1].split(
".atoms.")
atom_name = atom_name.split(".")[-1]
atom = tendrl_ns.get_atom(obj_name, atom_name)
try:
ret_val = atom(
Expand Down
2 changes: 1 addition & 1 deletion tendrl/commons/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def _process_job(self, raw_job, job_key):
# Generate a request ID for tracking this job
# further by tendrl-api
req_id = str(uuid.uuid4())
if tendrl_ns.type == "node":
if tendrl_ns.type == "node" or tendrl_ns.type == "monitoring":
raw_job['request_id'] = "nodes/%s/_jobs/%s_%s" % (
tendrl_ns.node_context.node_id, raw_job['run'],
req_id)
Expand Down
4 changes: 2 additions & 2 deletions tendrl/commons/objects/atoms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def __init__(
help=None
):

obj_def = tendrl_ns.definitions.get_obj_defs(tendrl_ns.to_str,
obj_def = tendrl_ns.definitions.get_obj_definition(tendrl_ns.to_str,
self.obj.__name__)
atom_def = obj_def.atoms[self.__class__.__name__]

Expand All @@ -24,7 +24,7 @@ def __init__(
self.outputs = outputs or atom_def['outputs']
self.uuid = uuid or atom_def['uuid']
self.help = help or atom_def['help']
self.paramaters = parameters
self.parameters = parameters

@abc.abstractmethod
def run(self):
Expand Down

0 comments on commit 2bb3ae3

Please sign in to comment.