From b050979c68a25befd19000993536b8ca3e00cfe8 Mon Sep 17 00:00:00 2001 From: Xinyu Liu Date: Fri, 16 Aug 2024 09:06:28 -0700 Subject: [PATCH] Add logging for quorum count update --- .../org/apache/samza/operators/impl/WatermarkStates.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java index f5eca17033..11957711dc 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java @@ -58,6 +58,7 @@ private final static class WatermarkState { private final long createTime; private final LongSupplier systemTimeFunc; private volatile long watermarkTime = WATERMARK_NOT_EXIST; + private volatile int quorumCount = 0; WatermarkState( int expectedTotal, @@ -107,6 +108,13 @@ synchronized void update(long timestamp, String taskName) { // Active tasks must exceed the quorum size minWatermark = (updateCount >= quorumSize && min != Long.MAX_VALUE) ? min : WATERMARK_NOT_EXIST; + + // Log the current quorum count + if (this.quorumCount != updateCount) { + this.quorumCount = updateCount; + LOG.info("Current quorum count is {} for watermark aggregation, and the expected quorum size is {}", + this.quorumCount, this.quorumSize); + } } watermarkTime = Math.max(watermarkTime, minWatermark); }