diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java index 5e8bb8a592..c9c0bc6411 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java @@ -222,6 +222,10 @@ public void unsubscribeAll(PubSubTopic versionTopic) { versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> { if (topicPartitionToConsumerMap != null) { topicPartitionToConsumerMap.forEach((topicPartition, sharedConsumer) -> { + /** + * Refer to {@link KafkaConsumerService#startConsumptionIntoDataReceiver} for how we avoid a race condition when + * setting the data receiver and un-subscribing concurrently for the same topic partition on a shared consumer. + */ synchronized (sharedConsumer) { sharedConsumer.unSubscribe(topicPartition); removeTopicPartitionFromConsumptionTask(sharedConsumer, topicPartition); @@ -239,6 +243,10 @@ public void unsubscribeAll(PubSubTopic versionTopic) { public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) { PubSubConsumerAdapter consumer = getConsumerAssignedToVersionTopicPartition(versionTopic, pubSubTopicPartition); if (consumer != null) { + /** + * Refer to {@link KafkaConsumerService#startConsumptionIntoDataReceiver} for how we avoid a race condition when + * setting the data receiver and un-subscribing concurrently for the same topic partition on a shared consumer. + */ synchronized (consumer) { consumer.unSubscribe(pubSubTopicPartition); removeTopicPartitionFromConsumptionTask(consumer, pubSubTopicPartition); @@ -269,20 +277,25 @@ public void batchUnsubscribe(PubSubTopic versionTopic, Set /** * Leverage {@link PubSubConsumerAdapter#batchUnsubscribe(Set)}. 
*/ - consumerUnSubTopicPartitionSet.forEach((c, tpSet) -> { - c.batchUnsubscribe(tpSet); - ConsumptionTask task = consumerToConsumptionTask.get(c); - tpSet.forEach(tp -> { - task.removeDataReceiver(tp); - versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> { - if (topicPartitionToConsumerMap != null) { - topicPartitionToConsumerMap.remove(tp); - return topicPartitionToConsumerMap.isEmpty() ? null : topicPartitionToConsumerMap; - } else { - return null; - } - }); - }); + consumerUnSubTopicPartitionSet.forEach((sharedConsumer, tpSet) -> { + ConsumptionTask task = consumerToConsumptionTask.get(sharedConsumer); + /** + * Refer to {@link KafkaConsumerService#startConsumptionIntoDataReceiver} for how we avoid a race condition when + * setting the data receiver and un-subscribing concurrently for the same topic partition on a shared consumer. + */ + synchronized (sharedConsumer) { + sharedConsumer.batchUnsubscribe(tpSet); + tpSet.forEach(task::removeDataReceiver); + } + tpSet.forEach( + tp -> versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> { + if (topicPartitionToConsumerMap != null) { + topicPartitionToConsumerMap.remove(tp); + return topicPartitionToConsumerMap.isEmpty() ? null : topicPartitionToConsumerMap; + } else { + return null; + } + })); }); } @@ -396,6 +409,12 @@ public void startConsumptionIntoDataReceiver( throw new VeniceException( "Shared consumer must exist for version topic: " + versionTopic + " in Kafka cluster: " + kafkaUrl); } + /** + * It is possible that one {@link StoreIngestionTask} thread has finished unsubscribing a topic partition but has + * not yet removed its data receiver, while another {@link StoreIngestionTask} thread is setting the data receiver + * for this topic partition before subscription. As {@link ConsumptionTask} does not allow two different data + * receivers for the same topic partition, it will throw an exception. 
+ */ synchronized (consumer) { ConsumptionTask consumptionTask = consumerToConsumptionTask.get(consumer); if (consumptionTask == null) { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java index a473eabb68..2b74895316 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java @@ -34,6 +34,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -596,10 +597,14 @@ private Runnable getResubscriptionRunnableFor( Version.parseVersionFromKafkaTopicName(partitionReplicaIngestionContext.getVersionTopic().getName()); // Here we did not consider batchUnsubscribe, as it is only for batch-only stores with version topic // partitions. - if (versionNum % 2 == 0) { + if (versionNum % 3 == 0) { consumerServiceDelegator.unSubscribe(versionTopic, pubSubTopicPartition); - } else { + } else if (versionNum % 3 == 1) { consumerServiceDelegator.unsubscribeAll(partitionReplicaIngestionContext.getVersionTopic()); + } else { + consumerServiceDelegator.batchUnsubscribe( + partitionReplicaIngestionContext.getVersionTopic(), + Collections.singleton(partitionReplicaIngestionContext.getPubSubTopicPartition())); } } } catch (Exception e) {