From 6057b1f21073c4e7d1c8809e31045a2b66713d31 Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Mon, 30 Sep 2024 13:53:18 -0700 Subject: [PATCH] [server] tweak heartbeat reporting to use completion check (#1207) --- .../kafka/consumer/LeaderFollowerStoreIngestionTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 96f0030577..f0594664b0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -2146,7 +2146,7 @@ protected void recordHeartbeatReceived( partitionConsumptionState.getPartition(), serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl), consumerRecord.getValue().producerMetadata.messageTimestamp, - partitionConsumptionState.isWaitingForReplicationLag()); + partitionConsumptionState.isComplete()); } else { heartbeatMonitoringService.recordFollowerHeartbeat( storeName, @@ -2154,7 +2154,7 @@ protected void recordHeartbeatReceived( partitionConsumptionState.getPartition(), serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl), consumerRecord.getValue().producerMetadata.messageTimestamp, - partitionConsumptionState.isWaitingForReplicationLag()); + partitionConsumptionState.isComplete()); } }