Skip to content

Commit

Permalink
[Java] Expose a counter on the Cluster Backup to track the number of …
Browse files Browse the repository at this point in the history
…snapshots downloaded.
  • Loading branch information
mikeb01 committed Sep 20, 2024
1 parent 0d595be commit 5fab012
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 1 deletion.
6 changes: 6 additions & 0 deletions aeron-client/src/main/java/io/aeron/AeronCounters.java
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,12 @@ public final class AeronCounters
@AeronCounter
public static final int CLUSTER_BACKUP_ERROR_COUNT_TYPE_ID = 211;

// NOTE(review): the increment site visible in this change bumps the counter once per completed retrieval that
// yielded at least one snapshot, not once per individual snapshot - confirm which meaning is intended.
/**
* The type id of the {@link Counter} used by Cluster Backup for tracking the number of snapshots downloaded.
*/
@AeronCounter
public static final int CLUSTER_BACKUP_SNAPSHOT_RETRIEVE_COUNT_TYPE_ID = 240;

/**
* Counter type id for the consensus module error count.
*/
Expand Down
34 changes: 34 additions & 0 deletions aeron-cluster/src/main/java/io/aeron/cluster/ClusterBackup.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static io.aeron.AeronCounters.CLUSTER_BACKUP_SNAPSHOT_RETRIEVE_COUNT_TYPE_ID;
import static io.aeron.CommonContext.ENDPOINT_PARAM_NAME;
import static io.aeron.cluster.ConsensusModule.Configuration.SERVICE_ID;
import static io.aeron.cluster.service.ClusteredServiceContainer.Configuration.LIVENESS_TIMEOUT_MS;
Expand Down Expand Up @@ -614,6 +615,7 @@ public static class Context implements Cloneable
private Counter stateCounter;
private Counter liveLogPositionCounter;
private Counter nextQueryDeadlineMsCounter;
private Counter snapshotRetrieveCounter;

private AeronArchive.Context archiveContext;
private AeronArchive.Context clusterArchiveContext;
Expand Down Expand Up @@ -792,6 +794,16 @@ public void conclude()
aeron, buffer, "ClusterBackup next query deadline in ms", QUERY_DEADLINE_TYPE_ID, clusterId);
}

if (null == snapshotRetrieveCounter)
{
snapshotRetrieveCounter = ClusterCounters.allocate(
aeron,
buffer,
"ClusterBackup snapshots retrieved",
CLUSTER_BACKUP_SNAPSHOT_RETRIEVE_COUNT_TYPE_ID,
clusterId);
}

if (null == threadFactory)
{
threadFactory = Thread::new;
Expand Down Expand Up @@ -1710,6 +1722,28 @@ public Context liveLogPositionCounter(final Counter liveLogPositionCounter)
return this;
}

/**
* Set the counter for the number of snapshots retrieved by the backup from the cluster.
* <p>
* Supplying a counter is optional: when none has been set, {@link #conclude()} allocates one labelled
* "ClusterBackup snapshots retrieved" for the configured cluster id.
*
* @param snapshotRetrieveCounter the counter to use for snapshots retrieved.
* @return this for a fluent API.
*/
public Context snapshotRetrieveCounter(final Counter snapshotRetrieveCounter)
{
this.snapshotRetrieveCounter = snapshotRetrieveCounter;
return this;
}

/**
* Get the counter for the number of snapshots retrieved by the backup from the cluster.
* <p>
* The value is {@code null} until a counter has either been supplied via
* {@link #snapshotRetrieveCounter(Counter)} or allocated by {@link #conclude()}.
*
* @return counter for the number of snapshots retrieved by the backup from the cluster.
*/
public Counter snapshotRetrieveCounter()
{
return snapshotRetrieveCounter;
}

/**
* Get the counter for the next query deadline ms.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,11 @@ private int snapshotRetrieve(final long nowMs)
{
snapshotsRetrieved.addAll(snapshotReplication.snapshotsRetrieved());

if (!snapshotsRetrieved.isEmpty())
{
ctx.snapshotRetrieveCounter().incrementOrdered();
}

snapshotReplication.close();
snapshotReplication = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ void shouldBackupClusterNoSnapshotsAndThenSendMessages()
}

@Test
@InterruptAfter(30)
@InterruptAfter(20)
void shouldBackupClusterWithSnapshot()
{
final TestCluster cluster = aCluster().withStaticNodes(3).useResponseChannels(true).start();
Expand All @@ -155,6 +155,7 @@ void shouldBackupClusterWithSnapshot()

cluster.awaitBackupState(ClusterBackup.State.BACKING_UP);
cluster.awaitBackupLiveLogPosition(logPosition);
cluster.awaitBackupSnapshotRetrievedCount(1);
cluster.stopAllNodes();

final TestNode node = cluster.startStaticNodeFromBackup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ long liveLogPosition()
return counter.get();
}

/**
* Read the current value of the snapshot retrieve counter held by the cluster backup context.
*
* @return the value of the counter at the time of the call.
*/
public long snapshotRetrieveCount()
{
return context.clusterBackupContext.snapshotRetrieveCounter().get();
}

public EpochClock epochClock()
{
return context.clusterBackupContext.epochClock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,24 @@ public void awaitBackupLiveLogPosition(final long position)
}
}

/**
 * Spin until the backup node reports that at least the requested number of snapshot retrievals have happened.
 * <p>
 * The wait is delegated to {@code Tests.yieldingIdle}, which is handed a lazily evaluated progress message
 * showing the expected and the currently observed count.
 *
 * @param snapshotCount minimum value the backup node's snapshot retrieve count must reach.
 * @throws IllegalStateException if no backup node is present.
 */
public void awaitBackupSnapshotRetrievedCount(final long snapshotCount)
{
    if (null == backupNode)
    {
        throw new IllegalStateException("no backup node present");
    }

    // Built lazily so the "actual" value is sampled only when the message is really needed.
    @SuppressWarnings("indentation")
    final Supplier<String> progressMessage = () ->
        "Snapshot retrieve count not found expected=" + snapshotCount +
        " actual=" + backupNode.snapshotRetrieveCount();

    long retrieved = backupNode.snapshotRetrieveCount();
    while (retrieved < snapshotCount)
    {
        Tests.yieldingIdle(progressMessage);
        retrieved = backupNode.snapshotRetrieveCount();
    }
}

public TestNode node(final int index)
{
return nodes[index];
Expand Down

0 comments on commit 5fab012

Please sign in to comment.