From bb726fc3ee6b35a2cb9b872f2f024ab5e0e4acfb Mon Sep 17 00:00:00 2001 From: Hao Xu Date: Wed, 25 Sep 2024 16:10:31 -0700 Subject: [PATCH] Debug for CI. --- .../kafka/consumer/KafkaConsumerService.java | 33 +++++++------------ .../KafkaConsumerServiceDelegatorTest.java | 14 ++++++-- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java index 099e39e37f..42b6e88cc4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java @@ -25,8 +25,6 @@ import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; -import com.linkedin.venice.utils.locks.AutoCloseableLock; -import com.linkedin.venice.utils.locks.ResourceAutoClosableLockManager; import io.tehuti.metrics.MetricsRepository; import java.util.HashMap; import java.util.HashSet; @@ -37,7 +35,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.IntConsumer; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -82,8 +79,6 @@ public abstract class KafkaConsumerService extends AbstractKafkaConsumerService private RandomAccessDaemonThreadFactory threadFactory; private final Logger LOGGER; private final ExecutorService consumerExecutor; - private final ResourceAutoClosableLockManager topicPartitionLockManager = - new ResourceAutoClosableLockManager<>(ReentrantLock::new); private static final int SHUTDOWN_TIMEOUT_IN_SECOND = 1; private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER = RedundantExceptionFilter.getRedundantExceptionFilter(); @@ -227,7 +222,7 @@ public void unsubscribeAll(PubSubTopic versionTopic) { versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> { if (topicPartitionToConsumerMap != null) { topicPartitionToConsumerMap.forEach((topicPartition, sharedConsumer) -> { - try (AutoCloseableLock ignored = topicPartitionLockManager.getLockForResource(topicPartition)) { + synchronized (sharedConsumer) { sharedConsumer.unSubscribe(topicPartition); removeTopicPartitionFromConsumptionTask(sharedConsumer, topicPartition); } @@ -242,9 +237,9 @@ public void unsubscribeAll(PubSubTopic versionTopic) { */ @Override public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) { - try (AutoCloseableLock ignored = topicPartitionLockManager.getLockForResource(pubSubTopicPartition)) { - PubSubConsumerAdapter consumer = getConsumerAssignedToVersionTopicPartition(versionTopic, pubSubTopicPartition); - if (consumer != null) { + PubSubConsumerAdapter consumer = getConsumerAssignedToVersionTopicPartition(versionTopic, pubSubTopicPartition); + if (consumer != null) { + synchronized (consumer) { consumer.unSubscribe(pubSubTopicPartition); removeTopicPartitionFromConsumptionTask(consumer, pubSubTopicPartition); versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> { @@ -395,13 +390,13 @@ public void startConsumptionIntoDataReceiver( ConsumedDataReceiver>> consumedDataReceiver) { PubSubTopic versionTopic = consumedDataReceiver.destinationIdentifier(); PubSubTopicPartition topicPartition = partitionReplicaIngestionContext.getPubSubTopicPartition(); - try (AutoCloseableLock ignored = topicPartitionLockManager.getLockForResource(topicPartition)) { - SharedKafkaConsumer consumer = assignConsumerFor(versionTopic, topicPartition); - if (consumer == null) { - // Defensive code. Shouldn't happen except in case of a regression. - throw new VeniceException( - "Shared consumer must exist for version topic: " + versionTopic + " in Kafka cluster: " + kafkaUrl); - } + SharedKafkaConsumer consumer = assignConsumerFor(versionTopic, topicPartition); + if (consumer == null) { + // Defensive code. Shouldn't happen except in case of a regression. + throw new VeniceException( + "Shared consumer must exist for version topic: " + versionTopic + " in Kafka cluster: " + kafkaUrl); + } + synchronized (consumer) { ConsumptionTask consumptionTask = consumerToConsumptionTask.get(consumer); if (consumptionTask == null) { // Defensive coding. Should never happen except in case of a regression. @@ -562,10 +557,4 @@ public enum ConsumerAssignmentStrategy { public void setThreadFactory(RandomAccessDaemonThreadFactory threadFactory) { this.threadFactory = threadFactory; } - - @Override - public synchronized void stop() throws Exception { - topicPartitionLockManager.removeAllLocks(); - super.stop(); - } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java index b1c94762ce..5c9353228f 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java @@ -33,8 +33,10 @@ import io.tehuti.metrics.Sensor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -493,7 +495,8 @@ public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception MetricsRepository mockMetricsRepository = mock(MetricsRepository.class); final Sensor mockSensor = mock(Sensor.class); doReturn(mockSensor).when(mockMetricsRepository).sensor(anyString(), any()); - int consumerNum = 6; + + int versionNum = 5; PubSubMessageDeserializer pubSubDeserializer = new PubSubMessageDeserializer( new OptimizedKafkaValueSerializer(), @@ -504,7 +507,7 @@ public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception factory, properties, 1000l, - consumerNum, + versionNum + 1, mock(IngestionThrottler.class), mock(KafkaClusterBasedRecordThrottler.class), mockMetricsRepository, @@ -534,7 +537,8 @@ public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception new PubSubTopicPartitionImpl(TOPIC_REPOSITORY.getTopic(Version.composeRealTimeTopic(storeName)), 0); CountDownLatch countDownLatch = new CountDownLatch(1); - for (int i = 0; i < consumerNum; i++) { + List infiniteSubUnSubThreads = new ArrayList<>(); + for (int i = 0; i < versionNum; i++) { PubSubTopic topicV1ForStoreName3 = TOPIC_REPOSITORY.getTopic(Version.composeKafkaTopic(storeName, i)); StoreIngestionTask task = mock(StoreIngestionTask.class); when(task.getVersionTopic()).thenReturn(topicV1ForStoreName3); @@ -555,11 +559,15 @@ public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception countDownLatch); Thread infiniteSubUnSubThread = new Thread(infiniteSubUnSub, "infiniteResubscribe: " + topicV1ForStoreName3); infiniteSubUnSubThread.start(); + infiniteSubUnSubThreads.add(infiniteSubUnSubThread); } long currentTime = System.currentTimeMillis(); Boolean raceConditionFound = countDownLatch.await(30, TimeUnit.SECONDS); long elapsedTime = System.currentTimeMillis() - currentTime; + for (Thread infiniteSubUnSubThread: infiniteSubUnSubThreads) { + infiniteSubUnSubThread.stop(); + } Assert.assertFalse( raceConditionFound, "Found race condition in KafkaConsumerService with time passed in milliseconds: " + elapsedTime);