Skip to content

Commit

Permalink
[GOBBLIN-2142] fix bug in determining if a dag is finished or not (ap…
Browse files Browse the repository at this point in the history
…ache#4037)

* fix bug in determining if a dag is finished or not
* address review comments
  • Loading branch information
arjun4084346 authored Aug 26, 2024
1 parent 7c8127c commit b28b468
Show file tree
Hide file tree
Showing 10 changed files with 464 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,6 @@ Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> getDagNodeWit
*/
Optional<JobStatus> getJobStatus(DagNodeId dagNodeId);

/**
 * Returns true if the {@link Dag} identified by the given {@link org.apache.gobblin.service.modules.orchestration.DagManager.DagId}
 * has any running job, false otherwise.
 *
 * @param dagId identifier of the dag to inspect
 * @return {@code true} if the dag has any running job, {@code false} otherwise
 * @throws IOException declared by the contract; NOTE(review): the exact failure conditions depend on the
 *         implementing store and are not visible here - confirm against the implementation
 */
boolean hasRunningJobs(DagManager.DagId dagId) throws IOException;

/**
* Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name.
* @param flowGroup flow group for the dag action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ public static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag)
switch (failureOption) {
case FINISH_RUNNING:
return new HashSet<>();
// todo - FINISH_ALL_POSSIBLE should probably `continue` not `break`
case FINISH_ALL_POSSIBLE:
default:
break;
Expand All @@ -228,7 +229,7 @@ public static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag)
return nextNodesToExecute;
}

static FailureOption getFailureOption(Dag<JobExecutionPlan> dag) {
public static FailureOption getFailureOption(Dag<JobExecutionPlan> dag) {
if (dag.isEmpty()) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
Expand Down Expand Up @@ -209,13 +208,6 @@ public Optional<JobStatus> getJobStatus(DagNodeId dagNodeId) {
}
}


/**
 * Reports whether at least one node of the dag identified by {@code dagId} has an execution status whose name
 * is not listed in {@link FlowStatusGenerator#FINISHED_STATUSES}.
 *
 * @param dagId identifier of the dag whose nodes are inspected
 * @return {@code true} if any node's status is not a finished status, {@code false} otherwise (including when
 *         the dag has no nodes)
 * @throws IOException propagated from the dag node lookup
 */
@Override
public boolean hasRunningJobs(DagManager.DagId dagId) throws IOException {
  // "some node is not finished" is the negation of "every node is finished"
  return !getDagNodes(dagId).stream()
      .map(dagNode -> dagNode.getValue().getExecutionStatus().name())
      .allMatch(statusName -> FlowStatusGenerator.FINISHED_STATUSES.contains(statusName));
}

@Override
public boolean existsJobDagAction(String flowGroup, String flowName, long flowExecutionId, String jobName,
DagActionStore.DagActionType dagActionType) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -54,7 +55,7 @@
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;

import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
import static org.apache.gobblin.service.ExecutionStatus.*;


/**
Expand Down Expand Up @@ -175,7 +176,7 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
}
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), cancelJobArgs).get();
// add back the dag node with updated states in the store
dagNodeToCancel.getValue().setExecutionStatus(CANCELLED);
dagNodeToCancel.getValue().setExecutionStatus(ExecutionStatus.CANCELLED);
dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId);
// send cancellation event after updating the state, because cancellation event triggers a ReevaluateDagAction
// that will delete the dag. Due to race condition between adding dag node and deleting dag, state store may get
Expand Down Expand Up @@ -291,4 +292,76 @@ public static void removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
log.warn("Failed to delete dag action {}", enforceFlowFinishDeadlineDagAction);
}
}

/**
 * Returns true if all dag nodes are finished, and it is not possible to run any new dag node.
 * If failure option is {@link org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption#FINISH_RUNNING},
 * no new jobs should be orchestrated, so even if some job can run, dag should be considered finished.
 *
 * @param dag the dag whose nodes' execution statuses are inspected
 * @return {@code true} if the dag is considered finished, {@code false} if some node is still running or can
 *         still be run
 * @throws RuntimeException if a node carries a status this method does not handle, or if some node has
 *         failed/been cancelled and the dag's failure option is neither FINISH_ALL_POSSIBLE nor FINISH_RUNNING
 */
public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
  /*
  The algo for this method is that it adds all the dag nodes into a set `canRun` that signifies all the nodes that can
  run in this dag. This also includes all the jobs that are completed. It scans all the nodes and if the node is
  completed it adds it to the `completed` set; if the node is failed/cancelled it removes all its dependant nodes from
  `canRun` set. In the end if there are more nodes that "canRun" than "completed", dag is not finished.
  For FINISH_RUNNING failure option, there is an additional condition that all the remaining `canRun` jobs should already
  be running/orchestrated/pending_retry/pending_resume. Basically they should already be out of PENDING/COMPILED state,
  in order for dag to be considered "NOT FINISHED".
   */
  List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
  Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
  Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
  boolean anyFailure = false;

  for (Dag.DagNode<JobExecutionPlan> node : nodes) {
    if (!canRun.contains(node)) {
      // already ruled out because one of its ancestors failed, was cancelled, or cannot run
      continue;
    }
    ExecutionStatus status = node.getValue().getExecutionStatus();
    if (status == ExecutionStatus.FAILED || status == ExecutionStatus.CANCELLED) {
      anyFailure = true;
      removeDescendantsFromCanRun(node, dag, canRun);
      completed.add(node);
    } else if (status == ExecutionStatus.COMPLETE) {
      completed.add(node);
    } else if (status == ExecutionStatus.PENDING) {
      // Remove PENDING node if its parents are not in canRun, this means remove the pending nodes also from canRun set
      // if its parents cannot run
      if (!areAllParentsInCanRun(node, canRun)) {
        canRun.remove(node);
      }
    } else if (!(status == ExecutionStatus.COMPILED || status == ExecutionStatus.PENDING_RESUME
        || status == ExecutionStatus.PENDING_RETRY || status == ExecutionStatus.ORCHESTRATED
        || status == ExecutionStatus.RUNNING)) {
      throw new RuntimeException("Unexpected status " + status + " for dag node " + node);
    }
  }

  // The node list is not guaranteed to be visited ancestors-first. A node recorded in `completed` may afterwards be
  // pruned from `canRun` when a failed/cancelled ancestor is visited later; drop such stale entries so that the
  // comparisons below do not depend on iteration order.
  completed.retainAll(canRun);

  assert canRun.size() >= completed.size();

  DagManager.FailureOption failureOption = DagManagerUtils.getFailureOption(dag);

  if (!anyFailure || failureOption == DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
    // In the end, check if there are more nodes in canRun than completed
    return canRun.size() == completed.size();
  } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
    // if all the remaining jobs are pending/compiled (basically not started yet) return true
    canRun.removeAll(completed);
    return canRun.stream().allMatch(node -> (node.getValue().getExecutionStatus() == ExecutionStatus.PENDING
        || node.getValue().getExecutionStatus() == ExecutionStatus.COMPILED));
  } else {
    throw new RuntimeException("Unexpected failure option " + failureOption);
  }
}

/**
 * Removes every descendant (children, grandchildren, ...) of {@code node} from {@code canRun}.
 * {@code node} itself is left untouched.
 *
 * @param node the node whose descendants can no longer run
 * @param dag the dag used to look up each node's children
 * @param canRun the set to prune; modified in place
 */
private static void removeDescendantsFromCanRun(Dag.DagNode<JobExecutionPlan> node, Dag<JobExecutionPlan> dag,
    Set<Dag.DagNode<JobExecutionPlan>> canRun) {
  removeDescendantsFromCanRun(node, dag, canRun, new HashSet<>());
}

/**
 * Recursive worker for the three-argument overload. {@code visited} records the descendants already handled, so
 * that a node reachable through several paths (diamond-shaped dependencies) is traversed only once instead of
 * once per path.
 */
private static void removeDescendantsFromCanRun(Dag.DagNode<JobExecutionPlan> node, Dag<JobExecutionPlan> dag,
    Set<Dag.DagNode<JobExecutionPlan>> canRun, Set<Dag.DagNode<JobExecutionPlan>> visited) {
  for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
    if (visited.add(child)) { // first time this descendant is seen
      canRun.remove(child);
      removeDescendantsFromCanRun(child, dag, canRun, visited); // Recursively remove all descendants
    }
  }
}

/**
 * Tells whether every parent of {@code node} is present in {@code canRun}.
 *
 * @param node the node whose parents are checked
 * @param canRun the set of nodes currently considered able to run
 * @return {@code true} if the node's parent list is {@code null} or all of its parents are contained in
 *         {@code canRun}; {@code false} otherwise
 */
private static boolean areAllParentsInCanRun(Dag.DagNode<JobExecutionPlan> node,
    Set<Dag.DagNode<JobExecutionPlan>> canRun) {
  if (node.getParentNodes() == null) {
    // no parent list at all: nothing can block this node
    return true;
  }
  return canRun.containsAll(node.getParentNodes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
// set to PASS, which would be incorrect.
dag.setFlowEvent(null);
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, getDagId());
} else if (!dagManagementStateStore.hasRunningJobs(getDagId())) {
} else if (DagProcUtils.isDagFinished(dag)) {
if (dag.getFlowEvent() == null) {
// If the dag flow event is not set and there are no more jobs running, then it is successful
// also note that `onJobFinish` method does whatever is required to do after job finish, determining a Dag's
Expand Down Expand Up @@ -160,7 +160,9 @@ private void onJobFinish(DagManagementStateStore dagManagementStateStore, Dag.Da
break;
case COMPLETE:
dagManagementStateStore.getDagManagerMetrics().incrementExecutorSuccess(dagNode);
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, getDagId());
if (!DagProcUtils.isDagFinished(dag)) { // this may fail when dag failure option is finish_running and some dag node has failed
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, getDagId());
}
break;
default:
log.warn("It should not reach here. Job status {} is unexpected.", executionStatus);
Expand Down
Loading

0 comments on commit b28b468

Please sign in to comment.