Skip to content

Commit

Permalink
Add java doc.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hao Xu committed Sep 30, 2024
1 parent 0ad40ea commit adade07
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ public void unsubscribeAll(PubSubTopic versionTopic) {
versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {
if (topicPartitionToConsumerMap != null) {
topicPartitionToConsumerMap.forEach((topicPartition, sharedConsumer) -> {
/**
 * Refer to {@link KafkaConsumerService#startConsumptionIntoDataReceiver} to avoid a race condition when a data
 * receiver is being set and an un-subscription happens concurrently for the same topic partition on a shared
 * consumer.
 */
synchronized (sharedConsumer) {
sharedConsumer.unSubscribe(topicPartition);
removeTopicPartitionFromConsumptionTask(sharedConsumer, topicPartition);
Expand All @@ -239,6 +243,10 @@ public void unsubscribeAll(PubSubTopic versionTopic) {
public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) {
PubSubConsumerAdapter consumer = getConsumerAssignedToVersionTopicPartition(versionTopic, pubSubTopicPartition);
if (consumer != null) {
/**
 * Refer to {@link KafkaConsumerService#startConsumptionIntoDataReceiver} to avoid a race condition when a data
 * receiver is being set and an un-subscription happens concurrently for the same topic partition on a shared
 * consumer.
 */
synchronized (consumer) {
consumer.unSubscribe(pubSubTopicPartition);
removeTopicPartitionFromConsumptionTask(consumer, pubSubTopicPartition);
Expand Down Expand Up @@ -269,20 +277,25 @@ public void batchUnsubscribe(PubSubTopic versionTopic, Set<PubSubTopicPartition>
/**
* Leverage {@link PubSubConsumerAdapter#batchUnsubscribe(Set)}.
*/
consumerUnSubTopicPartitionSet.forEach((c, tpSet) -> {
c.batchUnsubscribe(tpSet);
ConsumptionTask task = consumerToConsumptionTask.get(c);
tpSet.forEach(tp -> {
task.removeDataReceiver(tp);
versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {
if (topicPartitionToConsumerMap != null) {
topicPartitionToConsumerMap.remove(tp);
return topicPartitionToConsumerMap.isEmpty() ? null : topicPartitionToConsumerMap;
} else {
return null;
}
});
});
consumerUnSubTopicPartitionSet.forEach((sharedConsumer, tpSet) -> {
ConsumptionTask task = consumerToConsumptionTask.get(sharedConsumer);
/**
 * Refer to {@link KafkaConsumerService#startConsumptionIntoDataReceiver} to avoid a race condition when a data
 * receiver is being set and an un-subscription happens concurrently for the same topic partition on a shared
 * consumer.
 */
synchronized (sharedConsumer) {
sharedConsumer.batchUnsubscribe(tpSet);
tpSet.forEach(task::removeDataReceiver);
}
tpSet.forEach(
tp -> versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {
if (topicPartitionToConsumerMap != null) {
topicPartitionToConsumerMap.remove(tp);
return topicPartitionToConsumerMap.isEmpty() ? null : topicPartitionToConsumerMap;
} else {
return null;
}
}));
});
}

Expand Down Expand Up @@ -396,6 +409,12 @@ public void startConsumptionIntoDataReceiver(
throw new VeniceException(
"Shared consumer must exist for version topic: " + versionTopic + " in Kafka cluster: " + kafkaUrl);
}
/**
 * It is possible that one {@link StoreIngestionTask} thread has finished unsubscribing a topic partition but has
 * not yet removed its data receiver, while another {@link StoreIngestionTask} thread is setting a data receiver
 * for the same topic partition before subscribing. As {@link ConsumptionTask} does not allow 2 different data
 * receivers for the same topic partition, it will throw an exception in that case.
 */
synchronized (consumer) {
ConsumptionTask consumptionTask = consumerToConsumptionTask.get(consumer);
if (consumptionTask == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -596,10 +597,14 @@ private Runnable getResubscriptionRunnableFor(
Version.parseVersionFromKafkaTopicName(partitionReplicaIngestionContext.getVersionTopic().getName());
// Here we did not consider batchUnsubscribe, as it is only for batch-only stores with version topic
// partitions.
if (versionNum % 2 == 0) {
if (versionNum % 3 == 0) {
consumerServiceDelegator.unSubscribe(versionTopic, pubSubTopicPartition);
} else {
} else if (versionNum % 3 == 1) {
consumerServiceDelegator.unsubscribeAll(partitionReplicaIngestionContext.getVersionTopic());
} else {
consumerServiceDelegator.batchUnsubscribe(
partitionReplicaIngestionContext.getVersionTopic(),
Collections.singleton(partitionReplicaIngestionContext.getPubSubTopicPartition()));
}
}
} catch (Exception e) {
Expand Down

0 comments on commit adade07

Please sign in to comment.