Commit bb726fc

Debug for CI.
Hao Xu committed Sep 26, 2024
1 parent 9aeaaf8 commit bb726fc
Showing 2 changed files with 22 additions and 25 deletions.

File 1 of 2: KafkaConsumerService.java

@@ -25,8 +25,6 @@
 import com.linkedin.venice.utils.Utils;
 import com.linkedin.venice.utils.VeniceProperties;
 import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
-import com.linkedin.venice.utils.locks.AutoCloseableLock;
-import com.linkedin.venice.utils.locks.ResourceAutoClosableLockManager;
 import io.tehuti.metrics.MetricsRepository;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -37,7 +35,6 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.IntConsumer;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
@@ -82,8 +79,6 @@ public abstract class KafkaConsumerService extends AbstractKafkaConsumerService
   private RandomAccessDaemonThreadFactory threadFactory;
   private final Logger LOGGER;
   private final ExecutorService consumerExecutor;
-  private final ResourceAutoClosableLockManager<PubSubTopicPartition> topicPartitionLockManager =
-      new ResourceAutoClosableLockManager<>(ReentrantLock::new);
   private static final int SHUTDOWN_TIMEOUT_IN_SECOND = 1;
   private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER =
       RedundantExceptionFilter.getRedundantExceptionFilter();
@@ -227,7 +222,7 @@ public void unsubscribeAll(PubSubTopic versionTopic) {
     versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {
       if (topicPartitionToConsumerMap != null) {
         topicPartitionToConsumerMap.forEach((topicPartition, sharedConsumer) -> {
-          try (AutoCloseableLock ignored = topicPartitionLockManager.getLockForResource(topicPartition)) {
+          synchronized (sharedConsumer) {
             sharedConsumer.unSubscribe(topicPartition);
             removeTopicPartitionFromConsumptionTask(sharedConsumer, topicPartition);
           }
@@ -242,9 +237,9 @@ public void unsubscribeAll(PubSubTopic versionTopic) {
    */
   @Override
   public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) {
-    try (AutoCloseableLock ignored = topicPartitionLockManager.getLockForResource(pubSubTopicPartition)) {
-      PubSubConsumerAdapter consumer = getConsumerAssignedToVersionTopicPartition(versionTopic, pubSubTopicPartition);
-      if (consumer != null) {
+    PubSubConsumerAdapter consumer = getConsumerAssignedToVersionTopicPartition(versionTopic, pubSubTopicPartition);
+    if (consumer != null) {
+      synchronized (consumer) {
         consumer.unSubscribe(pubSubTopicPartition);
         removeTopicPartitionFromConsumptionTask(consumer, pubSubTopicPartition);
         versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {
@@ -395,13 +390,13 @@ public void startConsumptionIntoDataReceiver(
       ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> consumedDataReceiver) {
     PubSubTopic versionTopic = consumedDataReceiver.destinationIdentifier();
     PubSubTopicPartition topicPartition = partitionReplicaIngestionContext.getPubSubTopicPartition();
-    try (AutoCloseableLock ignored = topicPartitionLockManager.getLockForResource(topicPartition)) {
-      SharedKafkaConsumer consumer = assignConsumerFor(versionTopic, topicPartition);
-      if (consumer == null) {
-        // Defensive code. Shouldn't happen except in case of a regression.
-        throw new VeniceException(
-            "Shared consumer must exist for version topic: " + versionTopic + " in Kafka cluster: " + kafkaUrl);
-      }
+    SharedKafkaConsumer consumer = assignConsumerFor(versionTopic, topicPartition);
+    if (consumer == null) {
+      // Defensive code. Shouldn't happen except in case of a regression.
+      throw new VeniceException(
+          "Shared consumer must exist for version topic: " + versionTopic + " in Kafka cluster: " + kafkaUrl);
+    }
+    synchronized (consumer) {
       ConsumptionTask consumptionTask = consumerToConsumptionTask.get(consumer);
       if (consumptionTask == null) {
         // Defensive coding. Should never happen except in case of a regression.
@@ -562,10 +557,4 @@ public enum ConsumerAssignmentStrategy {
   public void setThreadFactory(RandomAccessDaemonThreadFactory threadFactory) {
     this.threadFactory = threadFactory;
   }
-
-  @Override
-  public synchronized void stop() throws Exception {
-    topicPartitionLockManager.removeAllLocks();
-    super.stop();
-  }
 }
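
The substance of this change: the subscribe/unsubscribe paths no longer take a per-topic-partition ReentrantLock handed out by ResourceAutoClosableLockManager; they synchronize on the SharedKafkaConsumer itself. Per-partition locks allow two threads to mutate the same consumer concurrently as long as they target different partitions; locking the consumer object closes that window at the cost of coarser contention. A minimal, self-contained sketch of the two schemes follows, with a hypothetical FakeSharedConsumer standing in for the real Venice classes:

// Sketch only: contrasts the removed per-partition locking with the
// per-consumer monitor this commit switches to. FakeSharedConsumer and all
// names here are illustrative, not the actual Venice implementation.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConsumerLockingSketch {
  // Before: one ReentrantLock per topic-partition, created on demand
  // (the ResourceAutoClosableLockManager pattern).
  private static final Map<String, Lock> PARTITION_LOCKS = new ConcurrentHashMap<>();

  static void unsubscribeWithPartitionLock(FakeSharedConsumer consumer, String topicPartition) {
    Lock lock = PARTITION_LOCKS.computeIfAbsent(topicPartition, tp -> new ReentrantLock());
    lock.lock();
    try {
      // Two threads holding locks for *different* partitions can still
      // reach the same consumer at the same time.
      consumer.unsubscribe(topicPartition);
    } finally {
      lock.unlock();
    }
  }

  // After: serialize on the consumer object itself, so all subscription
  // changes against one shared consumer are mutually exclusive.
  static void unsubscribeWithConsumerMonitor(FakeSharedConsumer consumer, String topicPartition) {
    synchronized (consumer) {
      consumer.unsubscribe(topicPartition);
    }
  }

  static class FakeSharedConsumer {
    void unsubscribe(String topicPartition) {
      System.out.println("unsubscribed " + topicPartition);
    }
  }

  public static void main(String[] args) {
    FakeSharedConsumer consumer = new FakeSharedConsumer();
    unsubscribeWithPartitionLock(consumer, "store_v1-0");
    unsubscribeWithConsumerMonitor(consumer, "store_v1-0");
  }
}

This also explains the removed stop() override: with no lock manager left to drain, there is nothing to clean up beyond super.stop().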

File 2 of 2: KafkaConsumerService test

@@ -33,8 +33,10 @@
 import io.tehuti.metrics.Sensor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -493,7 +495,8 @@ public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception
     MetricsRepository mockMetricsRepository = mock(MetricsRepository.class);
     final Sensor mockSensor = mock(Sensor.class);
     doReturn(mockSensor).when(mockMetricsRepository).sensor(anyString(), any());
-    int consumerNum = 6;
+
+    int versionNum = 5;
 
     PubSubMessageDeserializer pubSubDeserializer = new PubSubMessageDeserializer(
         new OptimizedKafkaValueSerializer(),
@@ -504,7 +507,7 @@ public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception
         factory,
         properties,
         1000l,
-        consumerNum,
+        versionNum + 1,
         mock(IngestionThrottler.class),
         mock(KafkaClusterBasedRecordThrottler.class),
         mockMetricsRepository,
@@ -534,7 +537,8 @@ public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception
         new PubSubTopicPartitionImpl(TOPIC_REPOSITORY.getTopic(Version.composeRealTimeTopic(storeName)), 0);
 
     CountDownLatch countDownLatch = new CountDownLatch(1);
-    for (int i = 0; i < consumerNum; i++) {
+    List<Thread> infiniteSubUnSubThreads = new ArrayList<>();
+    for (int i = 0; i < versionNum; i++) {
       PubSubTopic topicV1ForStoreName3 = TOPIC_REPOSITORY.getTopic(Version.composeKafkaTopic(storeName, i));
       StoreIngestionTask task = mock(StoreIngestionTask.class);
       when(task.getVersionTopic()).thenReturn(topicV1ForStoreName3);
@@ -555,11 +559,15 @@ public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception
           countDownLatch);
       Thread infiniteSubUnSubThread = new Thread(infiniteSubUnSub, "infiniteResubscribe: " + topicV1ForStoreName3);
       infiniteSubUnSubThread.start();
+      infiniteSubUnSubThreads.add(infiniteSubUnSubThread);
     }
 
     long currentTime = System.currentTimeMillis();
     Boolean raceConditionFound = countDownLatch.await(30, TimeUnit.SECONDS);
     long elapsedTime = System.currentTimeMillis() - currentTime;
+    for (Thread infiniteSubUnSubThread: infiniteSubUnSubThreads) {
+      infiniteSubUnSubThread.stop();
+    }
     Assert.assertFalse(
         raceConditionFound,
         "Found race condition in KafkaConsumerService with time passed in milliseconds: " + elapsedTime);
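
For context on how this test detects the race: the CountDownLatch acts as a failure signal. Each resubscription thread counts it down only when it observes an inconsistency, so await() returning true within the 30-second window means a race was caught, and the assertion expects false. The committed version tears the workers down with Thread.stop(), which has been deprecated since Java 1.2. Below is a minimal sketch of the same latch-as-detector pattern using interrupt() instead; checkInvariant() and every other name are hypothetical stand-ins, not Venice APIs:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchRaceDetectorSketch {
  public static void main(String[] args) throws InterruptedException {
    CountDownLatch raceDetected = new CountDownLatch(1);
    List<Thread> workers = new ArrayList<>();

    for (int i = 0; i < 5; i++) {
      Thread worker = new Thread(() -> {
        // Hammer the operation under test until told to stop.
        while (!Thread.currentThread().isInterrupted()) {
          if (!checkInvariant()) {
            raceDetected.countDown(); // signal: race observed
            return;
          }
        }
      }, "worker-" + i);
      worker.start();
      workers.add(worker);
    }

    // await() returns true only if some worker counted down in time,
    // i.e. a race was actually observed.
    boolean raceFound = raceDetected.await(30, TimeUnit.SECONDS);

    // Interrupt-based shutdown instead of the deprecated Thread.stop().
    for (Thread worker: workers) {
      worker.interrupt();
    }

    if (raceFound) {
      throw new AssertionError("Race condition observed");
    }
  }

  // Hypothetical stand-in for the real subscribe/unsubscribe consistency check.
  private static boolean checkInvariant() {
    return true; // no race in this sketch
  }
}

stop() needs no cooperation from the loop body, which makes it expedient in a test, but interrupt() plus a flag check is the conventional, safe replacement.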
