fix race condition between reevaluate dag proc and deadline dag proc
arjun4084346 committed Aug 10, 2024
1 parent ca7ad90 commit 5ddd05e
Showing 5 changed files with 30 additions and 22 deletions.
Changed file 1 of 5:
@@ -28,11 +28,7 @@
 import java.util.Map;
 import java.util.Properties;
 
-import javax.inject.Named;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.util.InjectionNames;
-import org.apache.gobblin.util.ExponentialBackoff;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,9 +40,11 @@
 
 import javax.annotation.Nonnull;
 import javax.inject.Inject;
+import javax.inject.Named;
 import javax.inject.Singleton;
 import lombok.Getter;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
@@ -62,9 +60,11 @@
 import org.apache.gobblin.runtime.api.SpecStore;
 import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
 import org.apache.gobblin.runtime.spec_store.FSSpecStore;
+import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExponentialBackoff;
 import org.apache.gobblin.util.callbacks.CallbackResult;
 import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
 
Changed file 2 of 5:
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -162,18 +163,16 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
     }
 
     try {
-      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
-        Future<?> future = dagNodeToCancel.getValue().getJobFuture().get();
-        String serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
-        props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
-        sendCancellationEvent(dagNodeToCancel.getValue());
-      } else {
-        log.warn("No Job future when canceling DAG node (hence, not sending cancellation event) - {}",
-            dagNodeToCancel.getValue().getJobSpec().getUri());
+      if (!dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+        log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getJobSpec().getUri());
       }
       DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), props).get();
       // add back the dag node with updated states in the store
       dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId);
+      // send cancellation event after updating the state, because cancellation event triggers a ReevaluateDagAction
+      // that will delete the dag. Due to race condition between adding dag node and deleting dag, state store may get
+      // into inconsistent state.
+      sendCancellationEvent(dagNodeToCancel, props);
     } catch (Exception e) {
       throw new IOException(e);
     }
@@ -188,7 +187,12 @@ public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore
     }
   }
 
-  public static void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
+  private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, Properties props)
+      throws ExecutionException, InterruptedException {
+    JobExecutionPlan jobExecutionPlan = dagNodeToCancel.getValue();
+    Future<?> future = jobExecutionPlan.getJobFuture().get();
+    String serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+    props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
     Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
     DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
     jobExecutionPlan.setExecutionStatus(CANCELLED);
Changed file 3 of 5:
@@ -64,8 +64,8 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
       dag.setMessage("Flow killed due to exceeding SLA of " + flowFinishDeadline + " ms");
       dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
     } else {
-      dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
-      log.error("EnforceFlowFinishDeadline dagAction received before due time. flowStartTime {}, flowFinishDeadline {} ", flowStartTime, flowFinishDeadline);
+      dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
+      throw new RuntimeException(String.format("EnforceFlowFinishDeadline dagAction received before due time. flowStartTime %s, flowFinishDeadline %s ", flowStartTime, flowFinishDeadline));
     }
   }
 }
Changed file 4 of 5:
@@ -614,10 +614,11 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
    * Remove a flowSpec from schedule
    * Unlike onDeleteSpec, we want to avoid deleting the flowSpec on the executor
    * and we still want to unschedule if we cannot connect to zookeeper as the current node cannot be the master
+   * returns true if unschedule is successful, false otherwise
    * @param specURI
    * @param specVersion
    */
-  private void unscheduleSpec(URI specURI, String specVersion) throws JobException {
+  private boolean unscheduleSpec(URI specURI, String specVersion) throws JobException {
     if (this.scheduledFlowSpecs.containsKey(specURI.toString())) {
       _log.info("Unscheduling flowSpec " + specURI + "/" + specVersion);
       this.scheduledFlowSpecs.remove(specURI.toString());
@@ -630,10 +631,12 @@ private void unscheduleSpec(URI specURI, String specVersion) throws JobException
       } catch (SpecNotFoundException e) {
         _log.warn("Unable to retrieve spec for URI {}", specURI);
       }
+      return true;
     } else {
-      throw new JobException(String.format(
+      _log.info(String.format(
           "Spec with URI: %s was not found in cache. May be it was cleaned, if not please clean it manually",
           specURI));
+      return false;
     }
   }
 
@@ -657,10 +660,11 @@ public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properti
       return;
     }
 
+    Spec deletedSpec = this.scheduledFlowSpecs.get(deletedSpecURI.toString());
     try {
-      Spec deletedSpec = this.scheduledFlowSpecs.get(deletedSpecURI.toString());
-      unscheduleSpec(deletedSpecURI, deletedSpecVersion);
-      this.orchestrator.remove(deletedSpec, headers);
+      if (unscheduleSpec(deletedSpecURI, deletedSpecVersion)) {
+        this.orchestrator.remove(deletedSpec, headers);
+      }
     } catch (JobException | IOException e) {
       _log.warn(String.format("Spec with URI: %s was not unscheduled cleaning", deletedSpecURI), e);
     }
Changed file 5 of 5:
@@ -175,7 +175,7 @@ protected void processMessage(DecodeableKafkaRecord message) {
         this.failedAddedSpecs.mark();
       }
     } else if (operation.equals("DELETE")) {
-      log.info("Deleting spec {} after receiving spec store change event", specAsUri);
+      log.info("Deleting spec {} from scheduler after receiving spec store change event", specAsUri);
       scheduler.onDeleteSpec(specAsUri, FlowSpec.Builder.DEFAULT_VERSION);
       this.deletedSpecs.mark();
     } else {
@@ -186,7 +186,7 @@ protected void processMessage(DecodeableKafkaRecord message) {
         return;
       }
     } catch (Exception e) {
-      log.warn("Ran into unexpected error processing SpecStore changes. Reexamine scheduler. Error: {}", e);
+      log.warn("Ran into unexpected error processing SpecStore changes. Reexamine scheduler. Error: ", e);
       this.unexpectedErrors.mark();
       return;
     }
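The comment added in cancelDagNode states the ordering that removes the race: persist the updated DAG node state first, then send the cancellation event that can trigger a ReevaluateDagAction and delete the DAG. Below is a minimal, self-contained sketch of that ordering; it uses hypothetical stand-in types (StateStore, EventBus, CancelOrderingSketch), not Gobblin's actual DagManagementStateStore or event-submission API.

// Hypothetical, simplified types for illustration only -- not Gobblin's actual API.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

public class CancelOrderingSketch {

  /** Toy stand-in for the DAG management state store. */
  static class StateStore {
    private final ConcurrentMap<String, String> nodeStates = new ConcurrentHashMap<>();
    void addDagNodeState(String dagId, String state) { nodeStates.put(dagId, state); }
    void deleteDag(String dagId) { nodeStates.remove(dagId); }
  }

  /** Toy event channel; the handler models the ReevaluateDagAction that may delete the DAG. */
  static class EventBus {
    private final Consumer<String> cancellationHandler;
    EventBus(Consumer<String> cancellationHandler) { this.cancellationHandler = cancellationHandler; }
    void sendCancellationEvent(String dagId) { cancellationHandler.accept(dagId); }
  }

  /**
   * Cancels a DAG node. The state-store update happens before the cancellation event is
   * sent, because the event handler may delete the whole DAG; writing node state after
   * that deletion would leave the store with entries for a DAG that no longer exists.
   */
  static void cancelDagNode(String dagId, StateStore store, EventBus eventBus) {
    store.addDagNodeState(dagId, "CANCELLED");  // 1. persist the updated node state
    eventBus.sendCancellationEvent(dagId);      // 2. only then emit the event that may delete the DAG
  }

  public static void main(String[] args) {
    StateStore store = new StateStore();
    EventBus eventBus = new EventBus(store::deleteDag);
    cancelDagNode("dag-1", store, eventBus);
  }
}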
