From b28b468091e26eb632292ee46fd8aa1e97bbb15e Mon Sep 17 00:00:00 2001 From: Arjun Singh Bora Date: Mon, 26 Aug 2024 14:43:59 -0700 Subject: [PATCH] [GOBBLIN-2142] fix bug in determining if a dag is finished or not (#4037) * fix bug in determining if a dag is finished or not * address review comments --- .../DagManagementStateStore.java | 6 - .../orchestration/DagManagerUtils.java | 3 +- .../MySqlDagManagementStateStore.java | 8 - .../orchestration/proc/DagProcUtils.java | 77 +++- .../orchestration/proc/ReevaluateDagProc.java | 6 +- .../orchestration/DagManagerUtilsTest.java | 358 ++++++++++++++++++ .../orchestration/proc/KillDagProcTest.java | 1 + .../orchestration/proc/LaunchDagProcTest.java | 8 + .../proc/ReevaluateDagProcTest.java | 18 +- .../orchestration/proc/ResumeDagProcTest.java | 3 + 10 files changed, 464 insertions(+), 24 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java index 139082545b2..0a8514f2740 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java @@ -143,12 +143,6 @@ Pair>, Optional> getDagNodeWit */ Optional getJobStatus(DagNodeId dagNodeId); - /** - * Returns true if the {@link Dag} identified by the given {@link org.apache.gobblin.service.modules.orchestration.DagManager.DagId} - * has any running job, false otherwise. - */ - boolean hasRunningJobs(DagManager.DagId dagId) throws IOException; - /** * Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name. * @param flowGroup flow group for the dag action diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java index 45ad84f91ac..a6f6d527bfe 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java @@ -219,6 +219,7 @@ public static Set> getNext(Dag dag) switch (failureOption) { case FINISH_RUNNING: return new HashSet<>(); + // todo - FINISH_ALL_POSSIBLE should probably `continue` not `break` case FINISH_ALL_POSSIBLE: default: break; @@ -228,7 +229,7 @@ public static Set> getNext(Dag dag) return nextNodesToExecute; } - static FailureOption getFailureOption(Dag dag) { + public static FailureOption getFailureOption(Dag dag) { if (dag.isEmpty()) { return null; } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index c0984f835b2..45ee013c7dc 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -43,7 +43,6 @@ import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.flowgraph.DagNodeId; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; -import org.apache.gobblin.service.monitoring.FlowStatusGenerator; import org.apache.gobblin.service.monitoring.JobStatus; import org.apache.gobblin.service.monitoring.JobStatusRetriever; import org.apache.gobblin.util.ConfigUtils; @@ -209,13 +208,6 @@ public Optional getJobStatus(DagNodeId dagNodeId) { } } - - @Override - public boolean hasRunningJobs(DagManager.DagId dagId) throws IOException { - return getDagNodes(dagId).stream() - .anyMatch(node -> !FlowStatusGenerator.FINISHED_STATUSES.contains(node.getValue().getExecutionStatus().name())); - } - @Override public boolean existsJobDagAction(String flowGroup, String flowName, long flowExecutionId, String jobName, DagActionStore.DagActionType dagActionType) throws IOException { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java index 289454502f6..0a9f6dcd67b 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.sql.SQLIntegrityConstraintViolationException; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -54,7 +55,7 @@ import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PropertiesUtils; -import static org.apache.gobblin.service.ExecutionStatus.CANCELLED; +import static org.apache.gobblin.service.ExecutionStatus.*; /** @@ -175,7 +176,7 @@ public static void cancelDagNode(Dag.DagNode dagNodeToCancel, } DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), cancelJobArgs).get(); // add back the dag node with updated states in the store - dagNodeToCancel.getValue().setExecutionStatus(CANCELLED); + dagNodeToCancel.getValue().setExecutionStatus(ExecutionStatus.CANCELLED); dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId); // send cancellation event after updating the state, because cancellation event triggers a ReevaluateDagAction // that will delete the dag. Due to race condition between adding dag node and deleting dag, state store may get @@ -291,4 +292,76 @@ public static void removeFlowFinishDeadlineDagAction(DagManagementStateStore dag log.warn("Failed to delete dag action {}", enforceFlowFinishDeadlineDagAction); } } + + /** + * Returns true if all dag nodes are finished, and it is not possible to run any new dag node. + * If failure option is {@link org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption#FINISH_RUNNING}, + * no new jobs should be orchestrated, so even if some job can run, dag should be considered finished. + */ + public static boolean isDagFinished(Dag dag) { + /* + The algo for this method is that it adds all the dag nodes into a set `canRun` that signifies all the nodes that can + run in this dag. This also includes all the jobs that are completed. It scans all the nodes and if the node is + completed it adds it to the `completed` set; if the node is failed/cancelled it removes all its dependant nodes from + `canRun` set. In the end if there are more nodes that "canRun" than "completed", dag is not finished. + For FINISH_RUNNING failure option, there is an additional condition that all the remaining `canRun` jobs should already + be running/orchestrated/pending_retry/pending_resume. Basically they should already be out of PENDING state, in order + for dag to be considered "NOT FINISHED". + */ + List> nodes = dag.getNodes(); + Set> canRun = new HashSet<>(nodes); + Set> completed = new HashSet<>(); + boolean anyFailure = false; + + for (Dag.DagNode node : nodes) { + if (!canRun.contains(node)) { + continue; + } + ExecutionStatus status = node.getValue().getExecutionStatus(); + if (status == ExecutionStatus.FAILED || status == ExecutionStatus.CANCELLED) { + anyFailure = true; + removeDescendantsFromCanRun(node, dag, canRun); + completed.add(node); + } else if (status == ExecutionStatus.COMPLETE) { + completed.add(node); + } else if (status == ExecutionStatus.PENDING) { + // Remove PENDING node if its parents are not in canRun, this means remove the pending nodes also from canRun set + // if its parents cannot run + if (!areAllParentsInCanRun(node, canRun)) { + canRun.remove(node); + } + } else if (!(status == COMPILED || status == PENDING_RESUME || status == PENDING_RETRY || status == ORCHESTRATED || + status == RUNNING)) { + throw new RuntimeException("Unexpected status " + status + " for dag node " + node); + } + } + + assert canRun.size() >= completed.size(); + + DagManager.FailureOption failureOption = DagManagerUtils.getFailureOption(dag); + + if (!anyFailure || failureOption == DagManager.FailureOption.FINISH_ALL_POSSIBLE) { + // In the end, check if there are more nodes in canRun than completed + return canRun.size() == completed.size(); + } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) { + // if all the remaining jobs are pending/compiled (basically not started yet) return true + canRun.removeAll(completed); + return canRun.stream().allMatch(node -> (node.getValue().getExecutionStatus() == PENDING || node.getValue().getExecutionStatus() == COMPILED)); + } else { + throw new RuntimeException("Unexpected failure option " + failureOption); + } + } + + private static void removeDescendantsFromCanRun(Dag.DagNode node, Dag dag, + Set> canRun) { + for (Dag.DagNode child : dag.getChildren(node)) { + canRun.remove(child); + removeDescendantsFromCanRun(child, dag, canRun); // Recursively remove all descendants + } + } + + private static boolean areAllParentsInCanRun(Dag.DagNode node, + Set> canRun) { + return node.getParentNodes() == null || canRun.containsAll(node.getParentNodes()); + } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java index d55d5a425e1..ef554f64189 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java @@ -104,7 +104,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair dag = + DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 1, proxyUser, additionalConfig); + + setJobStatuses(dag, Collections.singletonList(COMPLETE)); + Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Collections.singletonList(FAILED)); + Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Collections.singletonList(CANCELLED)); + Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Collections.singletonList(PENDING)); + Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Collections.singletonList(PENDING_RETRY)); + Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Collections.singletonList(PENDING_RESUME)); + Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Collections.singletonList(ORCHESTRATED)); + Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Collections.singletonList(RUNNING)); + Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + } + + @Test + public void testIsDagFinishedTwoNodes() throws URISyntaxException { + Dag dag = + DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 2, proxyUser, additionalConfig); + + setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING)); + Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED)); + Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Arrays.asList(FAILED, PENDING)); + Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING)); + Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + } + + @Test + public void testIsDagFinishedThreeNodes() throws URISyntaxException { + Dag dag = buildComplexDag3(); + + setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING)); + Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED, PENDING)); + Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, PENDING)); + Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + } + + @Test + public void testIsDagFinishedFourNodes() throws URISyntaxException { + Dag dag = buildLinearDagOf4Nodes(); + + setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING, PENDING)); + Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Arrays.asList(FAILED, PENDING, PENDING, PENDING)); + Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING, PENDING, PENDING)); + Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Arrays.asList(PENDING, PENDING, PENDING, PENDING)); + Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + } + + @Test + public void testIsDagFinishedMultiNodes() throws URISyntaxException { + Dag dag = buildComplexDag1(); + setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE)); + Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + Collections.shuffle(dag.getNodes()); + Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + + Dag dag2 = buildComplexDag1(); + setJobStatuses(dag2, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING)); + Assert.assertFalse(DagProcUtils.isDagFinished(dag2)); + Collections.shuffle(dag2.getNodes()); + Assert.assertFalse(DagProcUtils.isDagFinished(dag2)); + + Dag dag3 = buildComplexDag1(); + setJobStatuses(dag3, Arrays.asList(FAILED, COMPLETE, COMPLETE, COMPLETE, PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING)); + Assert.assertTrue(DagProcUtils.isDagFinished(dag3)); + Collections.shuffle(dag3.getNodes()); + Assert.assertTrue(DagProcUtils.isDagFinished(dag3)); + + Dag dag4 = buildComplexDag1(); + setJobStatuses(dag4, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, CANCELLED, COMPLETE, PENDING, PENDING, PENDING)); + Assert.assertFalse(DagProcUtils.isDagFinished(dag4)); + Collections.shuffle(dag4.getNodes()); + Assert.assertFalse(DagProcUtils.isDagFinished(dag4)); + + Dag dag5 = buildComplexDag1(); + setJobStatuses(dag5, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, CANCELLED, COMPLETE, COMPLETE, PENDING, PENDING)); + Assert.assertTrue(DagProcUtils.isDagFinished(dag5)); + Collections.shuffle(dag5.getNodes()); + Assert.assertTrue(DagProcUtils.isDagFinished(dag5)); + + Dag dag6 = buildComplexDag1(); + setJobStatuses(dag6, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, PENDING_RESUME, COMPLETE, COMPLETE, PENDING, PENDING)); + Assert.assertFalse(DagProcUtils.isDagFinished(dag6)); + Collections.shuffle(dag6.getNodes()); + Assert.assertFalse(DagProcUtils.isDagFinished(dag6)); + + Dag dag7 = buildComplexDag1(); + setJobStatuses(dag7, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, PENDING_RETRY, COMPLETE, COMPLETE, PENDING, PENDING)); + Assert.assertFalse(DagProcUtils.isDagFinished(dag7)); + Collections.shuffle(dag7.getNodes()); + Assert.assertFalse(DagProcUtils.isDagFinished(dag7)); + + Dag dag8 = buildComplexDag1(); + setJobStatuses(dag8, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, RUNNING, COMPLETE, COMPLETE, PENDING, PENDING)); + Assert.assertFalse(DagProcUtils.isDagFinished(dag8)); + Collections.shuffle(dag8.getNodes()); + Assert.assertFalse(DagProcUtils.isDagFinished(dag8)); + + Dag dag9 = buildComplexDag1(); + setJobStatuses(dag9, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, FAILED, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING, COMPLETE)); + Assert.assertFalse(DagProcUtils.isDagFinished(dag9)); + Collections.shuffle(dag9.getNodes()); + Assert.assertFalse(DagProcUtils.isDagFinished(dag9)); + } + + @Test + public void testIsDagFinishedWithFinishRunningFailureOptionTwoNodes() throws URISyntaxException { + Dag dag = + DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 2, proxyUser, additionalConfig); + + setJobStatuses(dag, Arrays.asList(FAILED, PENDING)); + Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING)); + Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + } + + @Test + public void testIsDagFinishedWithFinishRunningFailureOptionMultiNodes() throws URISyntaxException { + Dag dag = buildComplexDagWithFinishRunningFailureOption(); + + setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, PENDING, PENDING)); + Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + + setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, RUNNING, PENDING)); + Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + } + + private void setJobStatuses(Dag dag, List statuses) { + int i=0; + for (ExecutionStatus status : statuses) { + dag.getNodes().get(i++).getValue().setExecutionStatus(status); + } + } + + // This creates a dag like this + // D0 D1 D2 D3 + // | | | \ | + // D4 D5 | D6 + // | | \| + // D7 | D8 + // \ | / + // D9 + + public static Dag buildComplexDag1() throws URISyntaxException { + List jobExecutionPlans = new ArrayList<>(); + String id = "1"; + String flowGroup = "fg"; + String flowName = "fn"; + long flowExecutionId = 12345L; + String flowFailureOption = DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(); + String proxyUser = "user5"; + Config additionalConfig = ConfigFactory.empty() + .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)) + .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName)) + .withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)) + .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef( + MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)); + + for (int i = 0; i < 10; i++) { + String suffix = Integer.toString(i); + Config jobConfig = ConfigBuilder.create(). + addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id). + addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id). + addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId). + addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id). + addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix). + addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, flowFailureOption). + addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build(); + jobConfig = additionalConfig.withFallback(jobConfig); + if (i == 4) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job0")); + } else if (i == 5) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job1")); + } if (i == 6) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job2,job3")); + } else if (i == 7) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job4")); + } else if (i == 8) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job5,job2")); + } else if (i == 9) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job7,job5,job8")); + } + JobSpec js = JobSpec.builder("test_job" + suffix).withVersion(suffix).withConfig(jobConfig). + withTemplate(new URI("job" + suffix)).build(); + SpecExecutor specExecutor = MockedSpecExecutor.createDummySpecExecutor(new URI( + ConfigUtils.getString(additionalConfig, ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i))); + JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor); + jobExecutionPlans.add(jobExecutionPlan); + } + return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans); + } + + // This creates a dag like this + // D0 -> D1 -> D2 -> D3 + public static Dag buildLinearDagOf4Nodes() throws URISyntaxException { + List jobExecutionPlans = new ArrayList<>(); + + for (int i = 0; i < 4; i++) { + String suffix = Integer.toString(i); + Config jobConfig = ConfigBuilder.create(). + addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id). + addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id). + addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId). + addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id). + addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix). + addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, flowFailureOption). + addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build(); + jobConfig = additionalConfig.withFallback(jobConfig); + if (i == 1) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job0")); + } else if (i == 2) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job1")); + } if (i == 3) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job2")); + } + JobSpec js = JobSpec.builder("test_job" + suffix).withVersion(suffix).withConfig(jobConfig). + withTemplate(new URI("job" + suffix)).build(); + SpecExecutor specExecutor = MockedSpecExecutor.createDummySpecExecutor(new URI( + ConfigUtils.getString(additionalConfig, ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i))); + JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor); + jobExecutionPlans.add(jobExecutionPlan); + } + return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans); + } + + // This creates a dag like this + // D0 D1 + // \/ + // D2 + public static Dag buildComplexDag3() throws URISyntaxException { + List jobExecutionPlans = new ArrayList<>(); + + for (int i = 0; i < 3; i++) { + String suffix = Integer.toString(i); + Config jobConfig = ConfigBuilder.create(). + addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id). + addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id). + addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId). + addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id). + addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix). + addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, flowFailureOption). + addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build(); + jobConfig = additionalConfig.withFallback(jobConfig); + if (i == 2) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job0,job1")); + } + JobSpec js = JobSpec.builder("test_job" + suffix).withVersion(suffix).withConfig(jobConfig). + withTemplate(new URI("job" + suffix)).build(); + SpecExecutor specExecutor = MockedSpecExecutor.createDummySpecExecutor(new URI( + ConfigUtils.getString(additionalConfig, ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i))); + JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor); + jobExecutionPlans.add(jobExecutionPlan); + } + return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans); + } + + // This creates a dag like this + // D0 + // / \ + // D1 D2 + // / \ + // D3 D4 + public static Dag buildComplexDagWithFinishRunningFailureOption() throws URISyntaxException { + List jobExecutionPlans = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + String suffix = Integer.toString(i); + Config jobConfig = ConfigBuilder.create(). + addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id). + addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id). + addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId). + addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id). + addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix). + addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.FailureOption.FINISH_RUNNING.name()). + addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build(); + jobConfig = additionalConfig.withFallback(jobConfig); + if (i == 1) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job0")); + } else if (i == 2) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job0")); + } else if (i == 3 || i == 4) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job2")); + } + JobSpec js = JobSpec.builder("test_job" + suffix).withVersion(suffix).withConfig(jobConfig). + withTemplate(new URI("job" + suffix)).build(); + SpecExecutor specExecutor = MockedSpecExecutor.createDummySpecExecutor(new URI( + ConfigUtils.getString(additionalConfig, ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i))); + JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor); + jobExecutionPlans.add(jobExecutionPlan); + } + return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans); + } } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java index cd34b47cb9f..33dc8f49e22 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java @@ -180,6 +180,7 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedExc .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef("fg")) .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef( MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))); + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); FlowCompilationValidationHelper flowCompilationValidationHelper = mock(FlowCompilationValidationHelper.class); JobStatus jobStatus = JobStatus.builder().flowName("job0").flowGroup("fg").jobGroup("fg").jobName("job0").flowExecutionId(flowExecutionId). diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java index 99caa9a0055..ab426dedf18 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java @@ -29,6 +29,7 @@ import org.mockito.Mockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -59,6 +60,7 @@ import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; import org.apache.gobblin.service.modules.orchestration.DagManager; import org.apache.gobblin.service.modules.orchestration.DagManagerTest; +import org.apache.gobblin.service.modules.orchestration.DagManagerUtils; import org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore; import org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest; import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics; @@ -121,6 +123,7 @@ public void launchDag() throws IOException, InterruptedException, URISyntaxExcep .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName)) .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef( MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))); + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); FlowCompilationValidationHelper flowCompilationValidationHelper = mock(FlowCompilationValidationHelper.class); doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any()); List> specProducers = ReevaluateDagProcTest.getDagSpecProducers(dag); @@ -144,6 +147,8 @@ public void launchDag() throws IOException, InterruptedException, URISyntaxExcep // FLOW_RUNNING is emitted exactly once per flow during the execution of LaunchDagProc Mockito.verify(this.mockedEventSubmitter, Mockito.times(1)) .submit(eq(TimingEvent.FlowTimings.FLOW_RUNNING), anyMap()); + + Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get())); } @Test @@ -157,6 +162,7 @@ public void launchDagWithMultipleParallelJobs() throws IOException, InterruptedE .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName)) .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef( MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))); + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); FlowCompilationValidationHelper flowCompilationValidationHelper = mock(FlowCompilationValidationHelper.class); doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any()); LaunchDagProc launchDagProc = new LaunchDagProc( @@ -174,6 +180,8 @@ public void launchDagWithMultipleParallelJobs() throws IOException, InterruptedE // FLOW_RUNNING is emitted exactly once per flow during the execution of LaunchDagProc Mockito.verify(this.mockedEventSubmitter, Mockito.times(1)) .submit(eq(TimingEvent.FlowTimings.FLOW_RUNNING), anyMap()); + + Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get())); } // This creates a dag like this diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java index bcf16c7e407..651cc441dfe 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java @@ -128,6 +128,9 @@ public void testOneNextJobToRun() throws Exception { // assert that the first job is completed Assert.assertEquals(ExecutionStatus.COMPLETE, this.dagManagementStateStore.getDag(dagId).get().getStartNodes().get(0).getValue().getExecutionStatus()); + + // note that only assertFalse can be tested on DagProcUtils.isDagFinished, because if it had returned true, dag must have been cleaned + Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get())); } // test when there does not exist a next job in the dag when the current job's reevaluate dag action is processed @@ -161,7 +164,7 @@ public void testNoNextJobToRun() throws Exception { doReturn(new ImmutablePair<>(Optional.of(mockedDag.getNodes().get(0)), Optional.of(jobStatus))) .when(dagManagementStateStore).getDagNodeWithJobStatus(any()); - Assert.assertTrue(dagManagementStateStore.hasRunningJobs(dagId)); + Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get())); List> specProducers = getDagSpecProducers(dag); @@ -180,8 +183,6 @@ public void testNoNextJobToRun() throws Exception { Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream() .filter(a -> a.getMethod().getName().equals("deleteDagAction")).count(), 1); - - Assert.assertFalse(dagManagementStateStore.hasRunningJobs(dagId)); } @Test @@ -195,6 +196,7 @@ public void testCurrentJobToRun() throws Exception { .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef( MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)) ); + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); List> specProducers = getDagSpecProducers(dag); dagManagementStateStore.addDag(dag); doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)), Optional.empty())) @@ -216,6 +218,8 @@ public void testCurrentJobToRun() throws Exception { Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDagAction(any()); Mockito.verify(dagManagementStateStore, Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE)); + + Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get())); } @Test @@ -229,6 +233,7 @@ public void testMultipleNextJobToRun() throws Exception { .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef( MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)) ); + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); JobStatus jobStatus = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup) .jobName("job3").flowExecutionId(flowExecutionId).message("Test message").eventName(ExecutionStatus.COMPLETE.name()) .startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); @@ -260,6 +265,8 @@ public void testMultipleNextJobToRun() throws Exception { // when there are parallel jobs to launch, they are not directly sent to spec producers, instead reevaluate dag action is created specProducers.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any())); + + Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get())); } @Test @@ -271,8 +278,8 @@ public void testRetryCurrentFailedJob() throws Exception { .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName)) .withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)) .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef( - MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)) - ); + MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))); + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); List> specProducers = getDagSpecProducers(dag); dagManagementStateStore.addDag(dag); // a job status with shouldRetry=true, it should have execution status = PENDING_RETRY @@ -294,6 +301,7 @@ public void testRetryCurrentFailedJob() throws Exception { specProducers.stream().skip(numOfLaunchedJobs) // separately verified `specProducers.get(0)` .forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any())); + Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get())); Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDagAction(any()); Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any()); Mockito.verify(dagManagementStateStore, Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(), diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java index 20e47145b6c..150bed10e66 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java @@ -21,6 +21,7 @@ import java.net.URISyntaxException; import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -109,5 +110,7 @@ different methods create different spec executors (e.g. MockedSpecExecutor.creat the result will be that after serializing/deserializing the test dag, the spec executor (and producer) type may change */ Mockito.verify(this.dagManagementStateStore, Mockito.times(expectedNumOfResumedJobs)).addDagNodeState(any(), any()); + + Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get())); } }