Skip to content

Commit

Permalink
[GOBBLIN-2133] provide job future before calling SpecProducer::cancel…
Browse files Browse the repository at this point in the history
…Job (apache#4027)

* correct order in killing
* add more unit tests
  • Loading branch information
arjun4084346 authored Aug 15, 2024
1 parent ac5d6ee commit e857c00
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@
public class MockedSpecExecutor extends InMemorySpecExecutor {
private final SpecProducer<Spec> mockedSpecProducer;
private final Config config;
public static final String dummySerializedFuture = "12345";

public MockedSpecExecutor(Config config) {
super(config);
this.config = config;
this.mockedSpecProducer = Mockito.mock(SpecProducer.class);
when(mockedSpecProducer.addSpec(any())).thenReturn(new CompletedFuture(Boolean.TRUE, null));
when(mockedSpecProducer.serializeAddSpecResponse(any())).thenReturn("");
when(mockedSpecProducer.serializeAddSpecResponse(any())).thenReturn(dummySerializedFuture);
when(mockedSpecProducer.deserializeAddSpecResponse(any())).thenReturn(new CompletedFuture(Boolean.TRUE, null));
when(mockedSpecProducer.cancelJob(any(), any())).thenReturn(new CompletedFuture(Boolean.TRUE, null));
}
Expand Down
2 changes: 2 additions & 0 deletions gobblin-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ dependencies {
testCompile externalDependency.hamcrest
testCompile externalDependency.jhyde
testCompile externalDependency.mockitoInline
testCompile externalDependency.powerMockApi
testCompile externalDependency.powerMockModule
testCompile externalDependency.testContainers
testCompile externalDependency.testContainersMysql
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.JobSpec;
Expand Down Expand Up @@ -64,7 +65,7 @@ public class DagProcUtils {

/**
* If there is a single job to run next, it runs it. If there are multiple jobs to run, it creates a
* {@link org.apache.gobblin.service.modules.orchestration.DagActionStore.DagActionType#REEVALUATE} dag action for
* {@link DagActionStore.DagActionType#REEVALUATE} dag action for
* each of them and those jobs will be launched in respective {@link ReevaluateDagProc}.
*/
public static void submitNextNodes(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag,
Expand All @@ -85,7 +86,7 @@ public static void submitNextNodes(DagManagementStateStore dagManagementStateSto

/**
* - submits a {@link JobSpec} to a {@link SpecExecutor}
* - emits a {@link TimingEvent.LauncherTimings#JOB_ORCHESTRATED} {@link org.apache.gobblin.metrics.GobblinTrackingEvent}
* - emits a {@link TimingEvent.LauncherTimings#JOB_ORCHESTRATED} {@link GobblinTrackingEvent}
* that measures the time needed to submit the job to {@link SpecExecutor}
* - increment running jobs counter for the {@link Dag}, the proxy user that submitted the job and the {@link SpecExecutor} job was sent to
* - add updated dag node state to dagManagementStateStore
Expand Down Expand Up @@ -122,7 +123,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat

Future<?> addSpecFuture = producer.addSpec(jobSpec);
// todo - we should add future.get() instead of the complete future into the JobExecutionPlan
dagNode.getValue().setJobFuture(com.google.common.base.Optional.of(addSpecFuture));
dagNode.getValue().setJobFuture(Optional.of(addSpecFuture));
addSpecFuture.get();
jobExecutionPlan.setExecutionStatus(ExecutionStatus.ORCHESTRATED);
jobMetadata.put(TimingEvent.METADATA_MESSAGE, producer.getExecutionLink(addSpecFuture, specExecutorUri));
Expand Down Expand Up @@ -155,28 +156,32 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
}

public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException {
Properties props = new Properties();
Properties cancelJobArgs = new Properties();
DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
String serializedFuture = null;

if (dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
cancelJobArgs.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
}

try {
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), props).get();
if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
Future<?> future = dagNodeToCancel.getValue().getJobFuture().get();
serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
cancelJobArgs.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
} else {
log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getId());
}
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), cancelJobArgs).get();
// add back the dag node with updated states in the store
dagNodeToCancel.getValue().setExecutionStatus(CANCELLED);
dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId);
// send cancellation event after updating the state, because cancellation event triggers a ReevaluateDagAction
// that will delete the dag. Due to race condition between adding dag node and deleting dag, state store may get
// into inconsistent state.
if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
sendCancellationEvent(dagNodeToCancel, props);
log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(),
props.get(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE));
} else {
log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getId());
}
sendCancellationEvent(dagNodeToCancel);
log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(), serializedFuture);
} catch (Exception e) {
throw new IOException(e);
}
Expand All @@ -191,19 +196,14 @@ public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore
}
}

private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, Properties props)
throws ExecutionException, InterruptedException {
private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) {
JobExecutionPlan jobExecutionPlan = dagNodeToCancel.getValue();
Future<?> future = jobExecutionPlan.getJobFuture().get();
String serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
jobExecutionPlan.setExecutionStatus(CANCELLED);
}

/**
* Sets {@link Dag#flowEvent} and emits a {@link org.apache.gobblin.metrics.GobblinTrackingEvent} of the provided
* Sets {@link Dag#flowEvent} and emits a {@link GobblinTrackingEvent} of the provided
* flow event type.
*/
public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ static Pair<org.apache.gobblin.configuration.State, NewState> recalcJobStatus(or
NewState newState = newState(jobStatus, states);
String newStatus = jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
if (newState == NewState.FINISHED) {
log.info("Flow {}:{}:{}:{} reached a terminal state {}", flowGroup, flowName, flowExecutionId, jobName, newStatus);
log.info("Flow/Job {}:{}:{}:{} reached a terminal state {}", flowGroup, flowName, flowExecutionId, jobName, newStatus);
}
return ImmutablePair.of(jobStatus, newState);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,20 @@
import java.net.URISyntaxException;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.runner.RunWith;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.typesafe.config.ConfigFactory;
Expand All @@ -37,8 +43,12 @@
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
Expand All @@ -56,22 +66,38 @@
import org.apache.gobblin.service.monitoring.JobStatus;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.powermock.reflect.Whitebox.setInternalState;


@RunWith(PowerMockRunner.class)
@PrepareForTest(EventSubmitter.class)
public class KillDagProcTest {
private MySqlDagManagementStateStore dagManagementStateStore;
private ITestMetastoreDatabase testDb;
private DagProcessingEngineMetrics mockedDagProcEngineMetrics;
private MockedStatic<DagProc> dagProc;
private EventSubmitter mockedEventSubmitter;

@BeforeClass
public void setUp() throws Exception {
this.testDb = TestMetastoreDatabaseFactory.get();
this.dagManagementStateStore = spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testDb));
LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
this.mockedDagProcEngineMetrics = Mockito.mock(DagProcessingEngineMetrics.class);
this.dagProc = mockStatic(DagProc.class);
}

@BeforeMethod
public void resetMocks() {
this.mockedEventSubmitter = spy(new EventSubmitter.Builder(RootMetricContext.get(), "org.apache.gobblin.service").build());
setInternalState(DagProc.class, "eventSubmitter", this.mockedEventSubmitter);
}

@AfterClass(alwaysRun = true)
Expand All @@ -80,8 +106,11 @@ public void tearDown() throws Exception {
// `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
this.testDb.close();
}
this.dagProc.close();
}

// launches the flow, submits first job, and then kills the dag.
// all the jobs are killed and first job that was already launched is killed with the job future object.
@Test
public void killDag() throws IOException, URISyntaxException, InterruptedException {
long flowExecutionId = System.currentTimeMillis();
Expand Down Expand Up @@ -112,15 +141,35 @@ public void killDag() throws IOException, URISyntaxException, InterruptedExcepti
null, this.dagManagementStateStore, mockedDagProcEngineMetrics), ConfigFactory.empty());
killDagProc.process(this.dagManagementStateStore, this.mockedDagProcEngineMetrics);

long cancelJobCount = specProducers.stream()
int numOfLaunchedJobs = 1;
int numOfCancelledJobs = 5; // all jobs in the dag
int numOfCancelledFlows = 1;
int numOfCancelledJobsWithJobFuture = numOfLaunchedJobs;
long actualCancelJobCount = specProducers.stream()
.mapToLong(p -> Mockito.mockingDetails(p)
.getInvocations()
.stream()
.filter(a -> a.getMethod().getName().equals("cancelJob"))
.count())
.sum();

// kill dag procs kill only the launched jobs with parameters containing jobFuture
Mockito.verify(specProducers.get(0), Mockito.times(numOfCancelledJobsWithJobFuture)).cancelJob(any(), argThat(props ->
props.getProperty(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, "ABSENT").equals(MockedSpecExecutor.dummySerializedFuture)));

// job future object is not available for rest of the jobs cancel parameters
specProducers.stream()
.skip(numOfCancelledJobsWithJobFuture) // separately verified `specProducers.get(0)` above
.forEach(sp -> Mockito.verify(sp, Mockito.never()).cancelJob(any(), argThat(props ->
props.getProperty(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, "ABSENT").equals(MockedSpecExecutor.dummySerializedFuture))));

// kill dag proc tries to cancel all the dag nodes
Assert.assertEquals(cancelJobCount, 5);
Assert.assertEquals(actualCancelJobCount, numOfCancelledJobs);

Mockito.verify(this.mockedEventSubmitter, Mockito.times(numOfCancelledJobs))
.submit(eq(TimingEvent.LauncherTimings.JOB_CANCEL), anyMap());
Mockito.verify(this.mockedEventSubmitter, Mockito.times(numOfCancelledFlows))
.submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap());
}

@Test
Expand Down Expand Up @@ -159,14 +208,23 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedExc
null, this.dagManagementStateStore, this.mockedDagProcEngineMetrics), ConfigFactory.empty());
killDagProc.process(this.dagManagementStateStore, this.mockedDagProcEngineMetrics);

int numOfCancelledJobs = 1; // the only job that was cancelled
int numOfCancelledFlows = 1;
long cancelJobCount = specProducers.stream()
.mapToLong(p -> Mockito.mockingDetails(p)
.getInvocations()
.stream()
.filter(a -> a.getMethod().getName().equals("cancelJob"))
.filter(a -> ((Properties) a.getArgument(1))
.getProperty(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE).equals(MockedSpecExecutor.dummySerializedFuture))
.count())
.sum();
// kill dag proc tries to cancel only the exact dag node that was provided
Assert.assertEquals(cancelJobCount, 1);
Assert.assertEquals(cancelJobCount, numOfCancelledJobs);

Mockito.verify(this.mockedEventSubmitter, Mockito.times(numOfCancelledJobs))
.submit(eq(TimingEvent.LauncherTimings.JOB_CANCEL), anyMap());
Mockito.verify(this.mockedEventSubmitter, Mockito.times(numOfCancelledFlows))
.submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.junit.runner.RunWith;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
Expand All @@ -38,6 +42,9 @@
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
Expand Down Expand Up @@ -65,20 +72,23 @@
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;
import static org.powermock.reflect.Whitebox.setInternalState;


@RunWith(PowerMockRunner.class)
@PrepareForTest(EventSubmitter.class)
public class LaunchDagProcTest {
private ITestMetastoreDatabase testMetastoreDatabase;
private MySqlDagManagementStateStore dagManagementStateStore;
private DagProcessingEngineMetrics mockedDagProcEngineMetrics;
private MockedStatic<DagProc> dagProc;
private EventSubmitter mockedEventSubmitter;

@BeforeClass
public void setUp() throws Exception {
this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
this.dagProc = mockStatic(DagProc.class);
}

/**
Expand All @@ -89,12 +99,15 @@ public void resetDMSS() throws Exception {
this.dagManagementStateStore = spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
mockDMSSCommonBehavior(this.dagManagementStateStore);
this.mockedDagProcEngineMetrics = Mockito.mock(DagProcessingEngineMetrics.class);
this.mockedEventSubmitter = spy(new EventSubmitter.Builder(RootMetricContext.get(), "org.apache.gobblin.service").build());
setInternalState(DagProc.class, "eventSubmitter", this.mockedEventSubmitter);
}

@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
// `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
this.testMetastoreDatabase.close();
this.dagProc.close();
}

@Test
Expand Down Expand Up @@ -127,6 +140,10 @@ public void launchDag() throws IOException, InterruptedException, URISyntaxExcep

Mockito.verify(this.dagManagementStateStore, Mockito.times(numOfLaunchedJobs))
.addJobDagAction(any(), any(), anyLong(), eq(DagActionStore.NO_JOB_NAME_DEFAULT), eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));

// FLOW_RUNNING is emitted exactly once per flow during the execution of LaunchDagProc
Mockito.verify(this.mockedEventSubmitter, Mockito.times(1))
.submit(eq(TimingEvent.FlowTimings.FLOW_RUNNING), anyMap());
}

@Test
Expand All @@ -153,6 +170,10 @@ public void launchDagWithMultipleParallelJobs() throws IOException, InterruptedE
// parallel jobs are launched through reevaluate dag action
Mockito.verify(this.dagManagementStateStore, Mockito.times(numOfLaunchedJobs))
.addJobDagAction(eq(flowGroup), eq(flowName), eq(flowExecutionId), any(), eq(DagActionStore.DagActionType.REEVALUATE));

// FLOW_RUNNING is emitted exactly once per flow during the execution of LaunchDagProc
Mockito.verify(this.mockedEventSubmitter, Mockito.times(1))
.submit(eq(TimingEvent.FlowTimings.FLOW_RUNNING), anyMap());
}

// This creates a dag like this
Expand Down
2 changes: 2 additions & 0 deletions gradle/scripts/dependencyDefinitions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ ext.externalDependency = [
'parquetAvro': 'org.apache.parquet:parquet-avro:1.11.0',
'parquetProto': 'org.apache.parquet:parquet-protobuf:1.11.0',
'parquetHadoop': 'org.apache.parquet:parquet-hadoop-bundle:1.11.0',
'powerMockApi' : 'org.powermock:powermock-api-mockito2:2.0.9',
'powerMockModule' : 'org.powermock:powermock-module-junit4:2.0.9',
'reactivex': 'io.reactivex.rxjava2:rxjava:2.1.0',
"slf4j": [
"org.slf4j:slf4j-api:" + slf4jVersion,
Expand Down

0 comments on commit e857c00

Please sign in to comment.