Skip to content

Commit

Permalink
Improve the lock and add explaination.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hao Xu committed Sep 26, 2024
1 parent bb726fc commit b3b7174
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,15 @@ public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTop
synchronized (consumer) {
consumer.unSubscribe(pubSubTopicPartition);
removeTopicPartitionFromConsumptionTask(consumer, pubSubTopicPartition);
versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {
if (topicPartitionToConsumerMap != null) {
topicPartitionToConsumerMap.remove(pubSubTopicPartition);
return topicPartitionToConsumerMap.isEmpty() ? null : topicPartitionToConsumerMap;
} else {
return null;
}
});
}
versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {
if (topicPartitionToConsumerMap != null) {
topicPartitionToConsumerMap.remove(pubSubTopicPartition);
return topicPartitionToConsumerMap.isEmpty() ? null : topicPartitionToConsumerMap;
} else {
return null;
}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,11 @@ private void verifyConsumerServiceStartConsumptionIntoDataReceiver(
partitionReplicaIngestionContext.getPubSubTopicPartition());
}

/**
* This test is to simulate multiple threads resubscribing to the same real-time topic partition for different store
* versions and verify if the lock will protect the handoff for {@link ConsumptionTask} and {@link ConsumedDataReceiver}
* during the re-subscription.
*/
@Test
public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception {
ApacheKafkaConsumerAdapter consumer1 = mock(ApacheKafkaConsumerAdapter.class);
Expand All @@ -507,7 +512,7 @@ public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception
factory,
properties,
1000l,
versionNum + 1,
versionNum + 1, // Plus 1 to guarantee the consumer pool size is larger than the # of versions.
mock(IngestionThrottler.class),
mock(KafkaClusterBasedRecordThrottler.class),
mockMetricsRepository,
Expand Down Expand Up @@ -568,6 +573,7 @@ public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception
for (Thread infiniteSubUnSubThread: infiniteSubUnSubThreads) {
infiniteSubUnSubThread.stop();
}
consumerService.close();
Assert.assertFalse(
raceConditionFound,
"Found race condition in KafkaConsumerService with time passed in milliseconds: " + elapsedTime);
Expand Down

0 comments on commit b3b7174

Please sign in to comment.