Skip to content

Commit

Permalink
[Java] Don't enter election on log recording counter going unavailabl…
Browse files Browse the repository at this point in the history
…e. Instead, wait on EOS. Issue #1472.
  • Loading branch information
mjpt777 committed Jun 27, 2023
1 parent 3f6c5e1 commit c6f921b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import static io.aeron.cluster.client.AeronCluster.Configuration.PROTOCOL_SEMANTIC_VERSION;
import static io.aeron.cluster.service.ClusteredServiceContainer.Configuration.MARK_FILE_UPDATE_INTERVAL_NS;
import static io.aeron.exceptions.AeronException.Category.WARN;
import static java.lang.Math.min;

final class ConsensusModuleAgent implements Agent, TimerService.TimerHandler, ConsensusModuleSnapshotListener
{
Expand Down Expand Up @@ -155,11 +154,10 @@ final class ConsensusModuleAgent implements Agent, TimerService.TimerHandler, Co
private ClusterTermination clusterTermination;
private long logSubscriptionId = NULL_VALUE;
private long logRecordingId = NULL_VALUE;
private long logRecordedPosition = NULL_POSITION;
private long logRecordingStopPosition = 0;
private String liveLogDestination;
private String catchupLogDestination;
private String ingressEndpoints;
private boolean isElectionRequired;

ConsensusModuleAgent(final ConsensusModule.Context ctx)
{
Expand Down Expand Up @@ -1789,6 +1787,7 @@ void truncateLogEntry(final long leadershipTermId, final long logPosition)
recordingLog.commitLogPosition(leadershipTermId, logPosition);
}
logAdapter.disconnect(ctx.countedErrorHandler(), logPosition);
logRecordingStopPosition = logPosition;
}

boolean appendNewLeadershipTermEvent(final long nowNs)
Expand Down Expand Up @@ -2081,7 +2080,12 @@ else if (RecordingSignalEventDecoder.TEMPLATE_ID == templateId)

if (RecordingSignal.STOP == signal && recordingId == logRecordingId)
{
this.logRecordedPosition = position;
logRecordingStopPosition = position;

if (Cluster.Role.LEADER == role && null == election)
{
enterElection(false);
}
}

if (null != election)
Expand Down Expand Up @@ -2327,14 +2331,6 @@ else if (ConsensusModule.State.CLOSED == state)
{
unexpectedTermination();
}
else if (isElectionRequired)
{
if (null == election)
{
enterElection(logAdapter.isLogEndOfStream());
}
isElectionRequired = false;
}

if (nowNs >= markFileUpdateDeadlineNs)
{
Expand Down Expand Up @@ -2429,13 +2425,13 @@ private int consensusWork(final long timestamp, final long nowNs)
}
else
{
final long limit = null != appendPosition ? appendPosition.get() : logRecordedPosition;
final int count = logAdapter.poll(min(notifiedCommitPosition, limit));
final long limit = null != appendPosition ? appendPosition.get() : logRecordingStopPosition;
final int count = logAdapter.poll(Math.min(notifiedCommitPosition, limit));
if (0 == count && logAdapter.isImageClosed())
{
final boolean isEos = logAdapter.isLogEndOfStream();
ctx.countedErrorHandler().onError(new ClusterEvent(
"log disconnected from leader: eos=" + isEos));
final String message = "log disconnected from leader: eos=" + isEos;
ctx.countedErrorHandler().onError(new ClusterEvent(message));
enterElection(isEos);
return 1;
}
Expand Down Expand Up @@ -2870,14 +2866,13 @@ private boolean tryCreateAppendPosition(final int logSessionId)

logRecordingId(recordingId);
appendPosition = new ReadableCounter(counters, registrationId, counterId);
logRecordedPosition = NULL_POSITION;

return true;
}

private int updateFollowerPosition(final long nowNs)
{
final long recordedPosition = null != appendPosition ? appendPosition.get() : logRecordedPosition;
final long recordedPosition = null != appendPosition ? appendPosition.get() : logRecordingStopPosition;
return updateFollowerPosition(
leaderMember.publication(), nowNs, this.leadershipTermId, recordedPosition, APPEND_POSITION_FLAG_NONE);
}
Expand Down Expand Up @@ -3065,7 +3060,7 @@ long timeOfLastLeaderUpdateNs()
int updateLeaderPosition(final long nowNs, final long position)
{
thisMember.logPosition(position).timeOfLastAppendPositionNs(nowNs);
final long commitPosition = min(quorumPosition(), position);
final long commitPosition = Math.min(quorumPosition(), position);

if (commitPosition > this.commitPosition.getWeak() ||
nowNs >= (timeOfLastLogUpdateNs + leaderHeartbeatIntervalNs))
Expand Down Expand Up @@ -3232,14 +3227,16 @@ private void enterElection(final boolean isLogEndOfStream)
final RecordingLog.Entry termEntry = recordingLog.findTermEntry(leadershipTermId);
final long termBaseLogPosition = null != termEntry ?
termEntry.termBaseLogPosition : recoveryPlan.lastTermBaseLogPosition;
final long appendedPosition = null != appendPosition ?
appendPosition.get() : Math.max(recoveryPlan.appendedLogPosition, logRecordingStopPosition);

election = new Election(
false,
isLogEndOfStream ? leaderMember.id() : NULL_VALUE,
leadershipTermId,
termBaseLogPosition,
commitPosition.getWeak(),
null != appendPosition ? appendPosition.get() : recoveryPlan.appendedLogPosition,
appendedPosition,
activeMembers,
clusterMemberByIdMap,
thisMember,
Expand Down Expand Up @@ -3445,22 +3442,9 @@ private void onUnavailableCounter(final CountersReader counters, final long regi

if (null != appendPosition && appendPosition.registrationId() == registrationId)
{
appendPosition.close();
appendPosition = null;
logSubscriptionId = NULL_VALUE;

if (null != election)
{
election.handleError(clusterClock.timeNanos(), new ClusterEvent(
"log recording ended unexpectedly (null != election)"));
}
else if (NULL_POSITION == terminationPosition)
{
final String msg =
"log recording ended unexpectedly (NULL_POSITION == terminationPosition) logEos=" +
logAdapter.isLogEndOfStream();
ctx.countedErrorHandler().onError(new ClusterEvent(msg));
isElectionRequired = true;
}
}
}
}
Expand Down Expand Up @@ -3820,8 +3804,7 @@ private RecordingLog.RecoveryPlan recoverFromBootstrapState()
offset + MessageHeaderDecoder.ENCODED_LENGTH,
messageHeaderDecoder.blockLength(),
messageHeaderDecoder.version());
onLoadPendingMessage(
sessionMessageHeaderDecoder.clusterSessionId(), buffer, offset, length);
onLoadPendingMessage(sessionMessageHeaderDecoder.clusterSessionId(), buffer, offset, length);
return true;
},
Integer.MAX_VALUE);
Expand All @@ -3836,6 +3819,7 @@ private RecordingLog.RecoveryPlan recoverFromBootstrapState()

captureServiceClientIds();
++serviceAckId;

return recoveryPlan;
}

Expand Down
59 changes: 11 additions & 48 deletions aeron-system-tests/src/test/java/io/aeron/cluster/ClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
@ExtendWith({ EventLogExtension.class, InterruptingTestCallback.class })
class ClusterTest
{
private static final String EMPTY_MSG = "";
@RegisterExtension
final SystemTestWatcher systemTestWatcher = new SystemTestWatcher();

Expand Down Expand Up @@ -349,7 +350,7 @@ void shouldElectSameLeaderAfterLoosingQuorum()
}

@Test
@InterruptAfter(20)
@InterruptAfter(10)
void shouldElectNewLeaderAfterGracefulLeaderClose()
{
cluster = aCluster().withStaticNodes(3).start();
Expand All @@ -359,17 +360,14 @@ void shouldElectNewLeaderAfterGracefulLeaderClose()
cluster.connectClient();

final int messageCount = 10;
cluster.sendAndAwaitMessages(messageCount);
cluster.sendMessages(messageCount);
cluster.awaitResponseMessageCount(messageCount);

leader.gracefulClose();

final TestNode newLeader = cluster.awaitLeader();
cluster.awaitNewLeadershipEvent(1);
assertNotEquals(newLeader.index(), leader.index());

cluster.stopNode(leader);

cluster.sendAndAwaitMessages(messageCount, messageCount * 2);
}

@Test
Expand Down Expand Up @@ -807,7 +805,7 @@ void shouldTerminateLeaderWhenServiceStops()

@Test
@InterruptAfter(30)
void shouldEnterElectionWhenRecordingStopsOnLeader()
void shouldEnterElectionWhenRecordingStopsUnexpectedlyOnLeader()
{
cluster = aCluster().withStaticNodes(3).start();
systemTestWatcher.cluster(cluster);
Expand All @@ -833,36 +831,6 @@ void shouldEnterElectionWhenRecordingStopsOnLeader()
cluster.followers(2);
}

@Test
@InterruptAfter(30)
void shouldRecoverFollowerWhenRecordingStops()
{
cluster = aCluster().withStaticNodes(3).start();
systemTestWatcher.cluster(cluster);

cluster.awaitLeader();

final TestNode follower = cluster.followers().get(0);
final AeronArchive.Context archiveCtx = new AeronArchive.Context()
.controlRequestChannel(follower.archive().context().localControlChannel())
.controlResponseChannel(follower.archive().context().localControlChannel())
.controlRequestStreamId(follower.archive().context().localControlStreamId())
.aeronDirectoryName(follower.mediaDriver().aeronDirectoryName());

try (AeronArchive archive = AeronArchive.connect(archiveCtx))
{
final int firstRecordingIdIsTheClusterLog = 0;
assertTrue(archive.tryStopRecordingByIdentity(firstRecordingIdIsTheClusterLog));
}

final int messageCount = 10;
cluster.connectClient();
cluster.sendMessages(messageCount);
cluster.awaitResponseMessageCount(messageCount);

cluster.awaitServiceMessageCount(follower, messageCount);
}

@Test
@InterruptAfter(30)
void shouldCloseClientOnTimeout()
Expand Down Expand Up @@ -1918,7 +1886,7 @@ void shouldTakeASnapshotAfterReceivingAdminRequestOfTypeSnapshot()

final long requestCorrelationId = System.nanoTime();
final MutableBoolean hasResponse = injectAdminResponseEgressListener(
requestCorrelationId, AdminRequestType.SNAPSHOT, AdminResponseCode.OK, "");
requestCorrelationId, AdminRequestType.SNAPSHOT, AdminResponseCode.OK, EMPTY_MSG);

final AeronCluster client = cluster.connectClient();
while (!client.sendAdminRequestToTakeASnapshot(requestCorrelationId))
Expand Down Expand Up @@ -1956,8 +1924,7 @@ void shouldTakeASnapshotAfterReceivingAdminRequestOfTypeSnapshotAndNotifyViaCont
final TestNode leader = cluster.awaitLeader();

final long requestCorrelationId = System.nanoTime();
final MutableBoolean hasResponse = injectAdminRequestControlledEgressListener(
requestCorrelationId, AdminRequestType.SNAPSHOT, AdminResponseCode.OK, "");
final MutableBoolean hasResponse = injectAdminRequestControlledEgressListener(requestCorrelationId);

final AeronCluster client = cluster.connectClient();
while (!client.sendAdminRequestToTakeASnapshot(requestCorrelationId))
Expand Down Expand Up @@ -2116,11 +2083,7 @@ public void onAdminResponse(
return hasResponse;
}

private MutableBoolean injectAdminRequestControlledEgressListener(
final long expectedCorrelationId,
final AdminRequestType expectedRequestType,
final AdminResponseCode expectedResponseCode,
final String expectedMessage)
private MutableBoolean injectAdminRequestControlledEgressListener(final long expectedCorrelationId)
{
final MutableBoolean hasResponse = new MutableBoolean();

Expand Down Expand Up @@ -2150,9 +2113,9 @@ public void onAdminResponse(
{
hasResponse.set(true);
assertEquals(expectedCorrelationId, correlationId);
assertEquals(expectedRequestType, requestType);
assertEquals(expectedResponseCode, responseCode);
assertEquals(expectedMessage, message);
assertEquals(AdminRequestType.SNAPSHOT, requestType);
assertEquals(AdminResponseCode.OK, responseCode);
assertEquals(EMPTY_MSG, message);
assertNotNull(payload);
final int minPayloadOffset =
MessageHeaderEncoder.ENCODED_LENGTH +
Expand Down

0 comments on commit c6f921b

Please sign in to comment.