diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 8acc5d3535..beb84ca31c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -79,6 +79,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -446,7 +447,8 @@ private synchronized void bootstrap() { storageMetadataService, ingestionService, storageService, - blobTransferManager) + blobTransferManager, + this::getVeniceCurrentVersionNumber) : new DefaultIngestionBackend( storageMetadataService, ingestionService, @@ -656,6 +658,11 @@ Version getVeniceCurrentVersion(String storeName) { } } + int getVeniceCurrentVersionNumber(String storeName) { + Version currentVersion = getVeniceCurrentVersion(storeName); + return currentVersion == null ? -1 : currentVersion.getNumber(); + } + private Version getVeniceLatestNonFaultyVersion(Store store, Set faultyVersions) { Version latestNonFaultyVersion = null; for (Version version: store.getVersions()) { @@ -818,4 +825,30 @@ static ExecutionStatus getDaVinciErrorStatus(Exception e, boolean useDaVinciSpec } return status; } + + public boolean hasCurrentVersionBootstrapping() { + return ingestionBackend.hasCurrentVersionBootstrapping(); + } + + static class BootstrappingAwareCompletableFuture { + private ScheduledExecutorService scheduledExecutor = + Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("DaVinci_Bootstrapping_Check_Executor")); + public final CompletableFuture bootstrappingFuture = new CompletableFuture<>(); + + public BootstrappingAwareCompletableFuture(DaVinciBackend backend) { + scheduledExecutor.scheduleAtFixedRate(() -> { + if (bootstrappingFuture.isDone()) { + return; + } + if (!backend.hasCurrentVersionBootstrapping()) { + bootstrappingFuture.complete(null); + } + }, 0, 3, TimeUnit.SECONDS); + bootstrappingFuture.whenComplete((ignored1, ignored2) -> scheduledExecutor.shutdown()); + } + + public CompletableFuture getBootstrappingFuture() { + return bootstrappingFuture; + } + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java index e3b0cc59e8..b609f32b4f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java @@ -193,7 +193,7 @@ synchronized void tryStartHeartbeat() { if (isReportingPushStatus() && heartbeat == null) { heartbeat = backend.getExecutor().scheduleAtFixedRate(() -> { try { - backend.getPushStatusStoreWriter().writeHeartbeat(version.getStoreName()); + sendOutHeartbeat(backend, version); } catch (Throwable t) { LOGGER.error("Unable to send heartbeat for {}", this); } @@ -201,6 +201,22 @@ synchronized void tryStartHeartbeat() { } } + protected static void sendOutHeartbeat(DaVinciBackend backend, Version version) { + if (backend.hasCurrentVersionBootstrapping()) { + LOGGER.info( + "DaVinci is still bootstrapping, so it will send heart-beat message with a special timestamp" + " for store: {} to avoid delaying the new push job", version.getStoreName()); + 
/** + * Tell backend that the report from the bootstrapping instance doesn't count to avoid + * delaying new pushes. + */ + backend.getPushStatusStoreWriter().writeHeartbeatForBootstrappingInstance(version.getStoreName()); + } else { + backend.getPushStatusStoreWriter().writeHeartbeat(version.getStoreName()); + } + } + synchronized void tryStopHeartbeat() { if (heartbeat != null && partitionFutures.values().stream().allMatch(CompletableFuture::isDone)) { heartbeat.cancel(true); @@ -359,9 +375,40 @@ synchronized CompletableFuture subscribe(ComplementSet partitions futures.add(partitionFutures.get(partition)); } - return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((v, e) -> { + CompletableFuture bootstrappingAwareSubscriptionFuture = new CompletableFuture<>(); + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((v, e) -> { storeBackendStats.recordSubscribeDuration(Duration.between(startTime, Instant.now())); + if (e != null) { + bootstrappingAwareSubscriptionFuture.completeExceptionally(e); + LOGGER.warn("Bootstrapping store: {}, version: {} failed", version.getStoreName(), version.getNumber(), e); + } else { + LOGGER.info("Bootstrapping store: {}, version: {} is completed", version.getStoreName(), version.getNumber()); + /** + * It is important to start polling the bootstrapping status after the version ingestion is completed to + * make sure the bootstrapping status polling is valid (not doing polling without any past/active ingestion tasks). + */ + new DaVinciBackend.BootstrappingAwareCompletableFuture(backend).getBootstrappingFuture() + .whenComplete((ignored, ee) -> { + if (ee != null) { + bootstrappingAwareSubscriptionFuture.completeExceptionally(ee); + LOGGER.warn( + "Bootstrapping aware subscription to store: {}, version: {} failed", + version.getStoreName(), + version.getNumber(), + ee); + } else { + bootstrappingAwareSubscriptionFuture.complete(null); + LOGGER.info( + "Bootstrapping aware subscription to store: {}, version: {} is completed", + version.getStoreName(), + version.getNumber()); + } + }); + } }); + + return bootstrappingAwareSubscriptionFuture; } synchronized void unsubscribe(ComplementSet partitions) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java index 7f57536048..efe31697bd 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java @@ -188,6 +188,11 @@ public void setStorageEngineReference( } } + @Override + public boolean hasCurrentVersionBootstrapping() { + return getStoreIngestionService().hasCurrentVersionBootstrapping(); + } + @Override public KafkaStoreIngestionService getStoreIngestionService() { return storeIngestionService; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IngestionBackend.java index f8d6127bfb..97094e0752 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IngestionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IngestionBackend.java @@ -54,4 +54,9 @@ void dropStoragePartitionGracefully( // setStorageEngineReference is used by Da Vinci exclusively to speed up storage 
engine retrieval for read path. void setStorageEngineReference(String topicName, AtomicReference storageEngineReference); + + /** + * Check whether there are any current version bootstrapping or not. + */ + boolean hasCurrentVersionBootstrapping(); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackend.java index f9716f6a52..500374d06b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackend.java @@ -11,6 +11,7 @@ import com.linkedin.davinci.ingestion.main.MainIngestionRequestClient; import com.linkedin.davinci.ingestion.main.MainIngestionStorageMetadataService; import com.linkedin.davinci.ingestion.main.MainPartitionIngestionStatus; +import com.linkedin.davinci.ingestion.main.MainTopicIngestionStatus; import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService; import com.linkedin.davinci.notifier.RelayNotifier; import com.linkedin.davinci.notifier.VeniceNotifier; @@ -19,12 +20,15 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.ingestion.protocol.enums.IngestionCommandType; import com.linkedin.venice.ingestion.protocol.enums.IngestionComponentType; +import com.linkedin.venice.meta.Version; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import io.tehuti.metrics.MetricsRepository; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Function; import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -50,6 +54,7 @@ public class IsolatedIngestionBackend extends DefaultIngestionBackend implements private final MainIngestionMonitorService mainIngestionMonitorService; private final VeniceConfigLoader configLoader; private final ExecutorService completionReportHandlingExecutor = Executors.newFixedThreadPool(10); + private final Function currentVersionSupplier; private Process isolatedIngestionServiceProcess; public IsolatedIngestionBackend( @@ -58,7 +63,8 @@ public IsolatedIngestionBackend( StorageMetadataService storageMetadataService, KafkaStoreIngestionService storeIngestionService, StorageService storageService, - BlobTransferManager blobTransferManager) { + BlobTransferManager blobTransferManager, + Function currentVersionSupplier) { super( storageMetadataService, storeIngestionService, @@ -68,6 +74,7 @@ public IsolatedIngestionBackend( int servicePort = configLoader.getVeniceServerConfig().getIngestionServicePort(); int listenerPort = configLoader.getVeniceServerConfig().getIngestionApplicationPort(); this.configLoader = configLoader; + this.currentVersionSupplier = currentVersionSupplier; // Create the ingestion request client. mainIngestionRequestClient = new MainIngestionRequestClient(configLoader); // Create the forked isolated ingestion process. 
@@ -192,6 +199,10 @@ public MainIngestionMonitorService getMainIngestionMonitorService() { return mainIngestionMonitorService; } + Function getCurrentVersionSupplier() { + return currentVersionSupplier; + } + public MainIngestionRequestClient getMainIngestionRequestClient() { return mainIngestionRequestClient; } @@ -218,6 +229,31 @@ public void close() { } } + public boolean hasCurrentVersionBootstrapping() { + if (super.hasCurrentVersionBootstrapping()) { + return true; + } + + Map topicIngestionStatusMap = + getMainIngestionMonitorService().getTopicIngestionStatusMap(); + for (Map.Entry entry: topicIngestionStatusMap.entrySet()) { + String topicName = entry.getKey(); + MainTopicIngestionStatus ingestionStatus = entry.getValue(); + String storeName = Version.parseStoreFromKafkaTopicName(topicName); + int version = Version.parseVersionFromKafkaTopicName(topicName); + /** + * If the current version is still being ingested by isolated process, it means the bootstrapping hasn't finished + * yet as the ingestion task should be handed over to main process if all partitions complete ingestion. + */ + if (getCurrentVersionSupplier().apply(storeName) == version + && ingestionStatus.hasPartitionIngestingInIsolatedProcess()) { + return true; + } + } + + return false; + } + boolean isTopicPartitionHostedInMainProcess(String topicName, int partition) { return getMainIngestionMonitorService().getTopicPartitionIngestionStatus(topicName, partition) .equals(MainPartitionIngestionStatus.MAIN); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/main/MainTopicIngestionStatus.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/main/MainTopicIngestionStatus.java index 91214aa7d5..270dfc6f53 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/main/MainTopicIngestionStatus.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/main/MainTopicIngestionStatus.java @@ -43,4 +43,13 @@ public long getIngestingPartitionCount() { public String getTopicName() { return topicName; } + + public boolean hasPartitionIngestingInIsolatedProcess() { + for (Map.Entry entry: ingestionStatusMap.entrySet()) { + if (entry.getValue().equals(MainPartitionIngestionStatus.ISOLATED)) { + return true; + } + } + return false; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index e073b20b72..ac4cbc0e65 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -557,6 +557,20 @@ private static void shutdownExecutorService(ExecutorService executor, String nam } } + public boolean hasCurrentVersionBootstrapping() { + return hasCurrentVersionBootstrapping(topicNameToIngestionTaskMap); + } + + public static boolean hasCurrentVersionBootstrapping(Map ingestionTaskMap) { + for (Map.Entry entry: ingestionTaskMap.entrySet()) { + StoreIngestionTask task = entry.getValue(); + if (task.isCurrentVersion() && !task.hasAllPartitionReportedCompleted()) { + return true; + } + } + return false; + } + /** * Stops all the Kafka consumption tasks. * Closes all the Kafka clients. 
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java index 50b36b157a..2bab2878d8 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java @@ -4,6 +4,10 @@ import static com.linkedin.venice.pushmonitor.ExecutionStatus.ERROR; import static com.linkedin.venice.utils.DataProviderUtils.BOOLEAN; import static com.linkedin.venice.utils.DataProviderUtils.allPermutationGenerator; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; @@ -11,6 +15,9 @@ import com.linkedin.venice.exceptions.MemoryLimitExhaustedException; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.pushmonitor.ExecutionStatus; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -119,4 +126,17 @@ public void testGetDaVinciErrorStatusWithInvalidCases( } } + @Test + public void testBootstrappingAwareCompletableFuture() + throws ExecutionException, InterruptedException, TimeoutException { + DaVinciBackend backend = mock(DaVinciBackend.class); + + when(backend.hasCurrentVersionBootstrapping()).thenReturn(true).thenReturn(false); + + DaVinciBackend.BootstrappingAwareCompletableFuture future = + new DaVinciBackend.BootstrappingAwareCompletableFuture(backend); + future.getBootstrappingFuture().get(10, TimeUnit.SECONDS); + verify(backend, times(2)).hasCurrentVersionBootstrapping(); + } + } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java index b1f636edb2..e11dc14412 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java @@ -37,6 +37,7 @@ import com.linkedin.venice.utils.ComplementSet; import com.linkedin.venice.utils.PropertyBuilder; import com.linkedin.venice.utils.ReferenceCounted; +import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; import io.tehuti.Metric; @@ -98,6 +99,7 @@ void setUp() { when(backend.getVeniceCurrentVersion(anyString())).thenCallRealMethod(); when(backend.getIngestionBackend()).thenReturn(ingestionBackend); when(backend.getCompressorFactory()).thenReturn(compressorFactory); + when(backend.hasCurrentVersionBootstrapping()).thenReturn(false); doCallRealMethod().when(backend).handleStoreChanged(any()); store = new ZKStore( @@ -146,7 +148,7 @@ void testSubscribeCurrentVersion() throws Exception { CompletableFuture subscribeResult = storeBackend.subscribe(ComplementSet.of(partition)); TimeUnit.MILLISECONDS.sleep(v1SubscribeDurationMs); versionMap.get(version1.kafkaTopicName()).completePartition(partition); - subscribeResult.get(0, TimeUnit.SECONDS); + subscribeResult.get(3, TimeUnit.SECONDS); // Verify that subscribe selected the current version by default. 
try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); @@ -220,7 +222,7 @@ void testSubscribeWithoutCurrentVersion() throws Exception { // Expecting to subscribe to the latest version (version2). CompletableFuture subscribeResult = storeBackend.subscribe(ComplementSet.of(partition)); versionMap.get(version2.kafkaTopicName()).completePartition(partition); - subscribeResult.get(0, TimeUnit.SECONDS); + subscribeResult.get(3, TimeUnit.SECONDS); // Verify that subscribe selected the latest version as current. try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { assertEquals(versionRef.get().getVersion().getNumber(), version2.getNumber()); @@ -238,7 +240,7 @@ void testSubscribeBootstrapVersion() throws Exception { // Expecting to subscribe to the specified version (version1), which is neither current nor latest. CompletableFuture subscribeResult = storeBackend.subscribe(ComplementSet.of(partition), Optional.of(version1)); versionMap.get(version1.kafkaTopicName()).completePartition(partition); - subscribeResult.get(0, TimeUnit.SECONDS); + subscribeResult.get(3, TimeUnit.SECONDS); // Verify that subscribe selected the specified version as current. try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); @@ -247,9 +249,11 @@ void testSubscribeBootstrapVersion() throws Exception { // Simulate future version ingestion is complete. versionMap.get(version2.kafkaTopicName()).completePartition(partition); // Verify that future version became current once ingestion is complete. - try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { - assertEquals(versionRef.get().getVersion().getNumber(), version2.getNumber()); - } + TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> { + try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { + assertEquals(versionRef.get().getVersion().getNumber(), version2.getNumber()); + } + }); } @Test @@ -258,7 +262,7 @@ void testFutureVersionFailure() throws Exception { // Expecting to subscribe to version1 and that version2 is a future version. CompletableFuture subscribeResult = storeBackend.subscribe(ComplementSet.of(partition)); versionMap.get(version1.kafkaTopicName()).completePartition(partition); - subscribeResult.get(0, TimeUnit.SECONDS); + subscribeResult.get(3, TimeUnit.SECONDS); assertTrue(versionMap.containsKey(version2.kafkaTopicName())); // Simulate future version kill and removal from Venice. @@ -280,23 +284,29 @@ void testFutureVersionFailure() throws Exception { versionMap.get(version3.kafkaTopicName()).completePartitionExceptionally(partition, new Exception()); // Verify that neither of the bad versions became current. - try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { - assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); - } + TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> { + try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { + assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); + } + }); versionMap.get(version4.kafkaTopicName()).completePartition(partition); // Verify that version 4 did not become current even if ingestion is complete. 
- try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { - assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); - } + TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> { + try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { + assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); + } + }); // Mark the version 4 as current. store.setCurrentVersion(version4.getNumber()); backend.handleStoreChanged(storeBackend); // Verify that successfully ingested version became current. - try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { - assertEquals(versionRef.get().getVersion().getNumber(), version4.getNumber()); - } + TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> { + try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { + assertEquals(versionRef.get().getVersion().getNumber(), version4.getNumber()); + } + }); // Simulate new version push and subsequent ingestion failure. Version version5 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 30); @@ -320,16 +330,18 @@ void testSubscribeUnsubscribe() throws Exception { assertFalse(subscribeResult.isDone()); storeBackend.unsubscribe(ComplementSet.of(1)); // Verify that unsubscribe completed pending subscribe without failing it. - subscribeResult.get(0, TimeUnit.SECONDS); - try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { - assertTrue(versionRef.get().isPartitionReadyToServe(0)); - assertFalse(versionRef.get().isPartitionReadyToServe(1)); - } + subscribeResult.get(3, TimeUnit.SECONDS); + TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> { + try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { + assertTrue(versionRef.get().isPartitionReadyToServe(0)); + assertFalse(versionRef.get().isPartitionReadyToServe(1)); + } + }); // Simulate unsubscribe from all partitions while future version ingestion is pending. subscribeResult = storeBackend.subscribe(ComplementSet.universalSet()); storeBackend.unsubscribe(ComplementSet.universalSet()); - subscribeResult.get(0, TimeUnit.SECONDS); + subscribeResult.get(3, TimeUnit.SECONDS); // Verify that all versions were deleted because subscription set became empty. assertTrue(versionMap.isEmpty()); assertEquals(FileUtils.sizeOfDirectory(baseDataPath), 0); @@ -377,24 +389,30 @@ void testRollbackAndRollForward() { versionMap.get(version3.kafkaTopicName()).completePartition(partition); store.setCurrentVersion(version3.getNumber()); backend.handleStoreChanged(storeBackend); - try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { - assertEquals(versionRef.get().getVersion().getNumber(), version3.getNumber()); - } + TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> { + try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { + assertEquals(versionRef.get().getVersion().getNumber(), version3.getNumber()); + } + }); // Rollback happens here, expecting Da Vinci to switch back to v1. 
store.setCurrentVersion(1); backend.handleStoreChanged(storeBackend); versionMap.get(version1.kafkaTopicName()).completePartition(partition); - try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { - assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); - } + TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> { + try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { + assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); + } + }); versionMap.get(version2.kafkaTopicName()).completePartition(partition); store.setCurrentVersion(3); backend.handleStoreChanged(storeBackend); versionMap.get(version3.kafkaTopicName()).completePartition(partition); - try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { - assertEquals(versionRef.get().getVersion().getNumber(), version3.getNumber()); - } + TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> { + try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { + assertEquals(versionRef.get().getVersion().getNumber(), version3.getNumber()); + } + }); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/VersionBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/VersionBackendTest.java index ef19d58989..0c22d61d94 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/VersionBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/VersionBackendTest.java @@ -6,12 +6,14 @@ import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionImpl; import com.linkedin.venice.pushmonitor.ExecutionStatus; +import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import java.util.ArrayList; import java.util.Collections; @@ -89,4 +91,32 @@ public void testMaybeReportBatchEOIPStatus() { verify(mockConsumer, times(1)).accept("inc_49"); Assert.assertFalse(partitionToBatchReportEOIPEnabled.get(0)); } + + @Test + public void testSendOutHeartBeat() { + String storeName = "test_store"; + DaVinciBackend backend = mock(DaVinciBackend.class); + doReturn(true).when(backend).hasCurrentVersionBootstrapping(); + PushStatusStoreWriter mockWriter = mock(PushStatusStoreWriter.class); + doReturn(mockWriter).when(backend).getPushStatusStoreWriter(); + + Version currentVersion = mock(Version.class); + doReturn(storeName).when(currentVersion).getStoreName(); + doReturn(1).when(currentVersion).getNumber(); + Version futureVersion = mock(Version.class); + doReturn(storeName).when(futureVersion).getStoreName(); + doReturn(2).when(futureVersion).getNumber(); + + VersionBackend.sendOutHeartbeat(backend, currentVersion); + VersionBackend.sendOutHeartbeat(backend, futureVersion); + + verify(mockWriter, times(2)).writeHeartbeatForBootstrappingInstance(storeName); + verify(mockWriter, never()).writeHeartbeat(storeName); + + doReturn(false).when(backend).hasCurrentVersionBootstrapping(); + VersionBackend.sendOutHeartbeat(backend, currentVersion); + VersionBackend.sendOutHeartbeat(backend, futureVersion); + + verify(mockWriter, times(2)).writeHeartbeat(storeName); + } } diff --git 
a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java index a5dcf48fd9..bc60ed7d77 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java @@ -4,10 +4,14 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; import com.linkedin.davinci.blobtransfer.BlobTransferManager; import com.linkedin.davinci.config.VeniceServerConfig; @@ -26,7 +30,6 @@ import java.util.concurrent.CompletableFuture; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -108,6 +111,18 @@ public void testStartConsumptionWithBlobTransferWhenNoPeerFound() { CompletableFuture future = ingestionBackend.bootstrapFromBlobs(store, VERSION_NUMBER, PARTITION).toCompletableFuture(); - Assert.assertTrue(future.isDone()); + assertTrue(future.isDone()); + } + + @Test + public void testHasCurrentVersionBootstrapping() { + KafkaStoreIngestionService mockIngestionService = mock(KafkaStoreIngestionService.class); + DefaultIngestionBackend ingestionBackend = + new DefaultIngestionBackend(null, mockIngestionService, null, null, null); + doReturn(true).when(mockIngestionService).hasCurrentVersionBootstrapping(); + assertTrue(ingestionBackend.hasCurrentVersionBootstrapping()); + + doReturn(false).when(mockIngestionService).hasCurrentVersionBootstrapping(); + assertFalse(ingestionBackend.hasCurrentVersionBootstrapping()); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackendTest.java index c9b88f9b0e..5f52d4e578 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/IsolatedIngestionBackendTest.java @@ -10,11 +10,14 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.config.VeniceStoreVersionConfig; @@ -31,11 +34,13 @@ import com.linkedin.venice.utils.Pair; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.function.Supplier; import org.testng.Assert; import org.testng.annotations.Test; @@ -269,4 +274,49 @@ public void testBackendCanMaintainMetadataCorrectlyForDroppingPartition() { Assert.assertEquals(topicIngestionStatusMap.get(topic).getPartitionIngestionStatus(partition), NOT_EXIST); } } + + @Test + public void testHasCurrentVersionBootstrapping() { + IsolatedIngestionBackend mockBackend = mock(IsolatedIngestionBackend.class); + MainIngestionMonitorService mockMonitorService = mock(MainIngestionMonitorService.class); + + Map ingestionStatusMap = new HashMap<>(); + MainTopicIngestionStatus store1V1IngestionStatus = new MainTopicIngestionStatus("store1_v1"); + MainTopicIngestionStatus store1V2IngestionStatus = new MainTopicIngestionStatus("store1_v2"); + MainTopicIngestionStatus store2V2IngestionStatus = new MainTopicIngestionStatus("store2_v2"); + ingestionStatusMap.put("store1_v1", store1V1IngestionStatus); + ingestionStatusMap.put("store1_v2", store1V2IngestionStatus); + ingestionStatusMap.put("store2_v2", store2V2IngestionStatus); + + doReturn(ingestionStatusMap).when(mockMonitorService).getTopicIngestionStatusMap(); + store1V1IngestionStatus.setPartitionIngestionStatusToIsolatedIngestion(1); + store1V1IngestionStatus.setPartitionIngestionStatusToLocalIngestion(2); + store1V2IngestionStatus.setPartitionIngestionStatusToLocalIngestion(1); + store1V2IngestionStatus.setPartitionIngestionStatusToLocalIngestion(2); + store2V2IngestionStatus.setPartitionIngestionStatusToLocalIngestion(1); + + Function currentVersionSupplier = s -> { + if (s.equals("store1")) { + return 1; + } + if (s.equals("store2")) { + return 2; + } + return 3; + }; + doReturn(currentVersionSupplier).when(mockBackend).getCurrentVersionSupplier(); + doReturn(mockMonitorService).when(mockBackend).getMainIngestionMonitorService(); + + KafkaStoreIngestionService mockIngestionService = mock(KafkaStoreIngestionService.class); + doReturn(false).when(mockIngestionService).hasCurrentVersionBootstrapping(); + doReturn(mockIngestionService).when(mockBackend).getStoreIngestionService(); + + doCallRealMethod().when(mockBackend).hasCurrentVersionBootstrapping(); + + assertTrue(mockBackend.hasCurrentVersionBootstrapping()); + + // Move current version ingestion to main process + store1V1IngestionStatus.setPartitionIngestionStatusToLocalIngestion(1); + assertFalse(mockBackend.hasCurrentVersionBootstrapping()); + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/main/MainTopicIngestionStatusTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/main/MainTopicIngestionStatusTest.java new file mode 100644 index 0000000000..23b08673e4 --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/main/MainTopicIngestionStatusTest.java @@ -0,0 +1,24 @@ +package com.linkedin.davinci.ingestion.main; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import org.testng.annotations.Test; + + +public class MainTopicIngestionStatusTest { + @Test + public void testHasPartitionIngestingInIsolatedProcess() { + String topicName = "test_store_v1"; + MainTopicIngestionStatus ingestionStatus = new MainTopicIngestionStatus(topicName); + ingestionStatus.setPartitionIngestionStatusToLocalIngestion(1); + 
ingestionStatus.setPartitionIngestionStatusToLocalIngestion(2); + assertFalse(ingestionStatus.hasPartitionIngestingInIsolatedProcess()); + + ingestionStatus.setPartitionIngestionStatusToIsolatedIngestion(3); + assertTrue(ingestionStatus.hasPartitionIngestingInIsolatedProcess()); + + ingestionStatus.removePartitionIngestionStatus(3); + assertFalse(ingestionStatus.hasPartitionIngestingInIsolatedProcess()); + } +} diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java index b537fa8291..37c664f45b 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java @@ -12,6 +12,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory; @@ -53,6 +54,8 @@ import io.tehuti.metrics.MetricsRepository; import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; import it.unimi.dsi.fastutil.objects.Object2IntMaps; +import java.util.HashMap; +import java.util.Map; import java.util.NavigableMap; import java.util.Optional; import java.util.Properties; @@ -449,4 +452,41 @@ public void testStoreIngestionTaskShutdownLastPartition(boolean isIsolatedIngest kafkaStoreIngestionService.getTopicPartitionIngestionContext(topicName, topicName, 0); verify(kafkaConsumerService, atMostOnce()).getIngestionInfoFromConsumer(pubSubTopic, pubSubTopicPartition); } + + @Test + public void testHasCurrentVersionBootstrapping() { + StoreIngestionTask nonCurrentVersionBootstrappingTask = mock(StoreIngestionTask.class); + doReturn(false).when(nonCurrentVersionBootstrappingTask).isCurrentVersion(); + doReturn(false).when(nonCurrentVersionBootstrappingTask).hasAllPartitionReportedCompleted(); + + StoreIngestionTask nonCurrentVersionCompletedTask = mock(StoreIngestionTask.class); + doReturn(false).when(nonCurrentVersionCompletedTask).isCurrentVersion(); + doReturn(true).when(nonCurrentVersionCompletedTask).hasAllPartitionReportedCompleted(); + + StoreIngestionTask currentVersionBootstrappingTask = mock(StoreIngestionTask.class); + doReturn(true).when(currentVersionBootstrappingTask).isCurrentVersion(); + doReturn(false).when(currentVersionBootstrappingTask).hasAllPartitionReportedCompleted(); + + StoreIngestionTask currentVersionCompletedTask = mock(StoreIngestionTask.class); + doReturn(true).when(currentVersionCompletedTask).isCurrentVersion(); + doReturn(true).when(currentVersionCompletedTask).hasAllPartitionReportedCompleted(); + + Map mapContainsAllCompletedTask = new HashMap<>(); + mapContainsAllCompletedTask.put("non_current_version_completed", nonCurrentVersionCompletedTask); + mapContainsAllCompletedTask.put("current_version_completed", currentVersionCompletedTask); + + assertFalse(KafkaStoreIngestionService.hasCurrentVersionBootstrapping(mapContainsAllCompletedTask)); + + Map mapContainsNonCurrentBootstrappingTask = new HashMap<>(); + mapContainsNonCurrentBootstrappingTask.put("non_current_version_bootstrapping", nonCurrentVersionBootstrappingTask); + mapContainsNonCurrentBootstrappingTask.put("current_version_completed", currentVersionCompletedTask); + 
+ assertFalse(KafkaStoreIngestionService.hasCurrentVersionBootstrapping(mapContainsNonCurrentBootstrappingTask)); + + Map mapContainsCurrentBootstrappingTask = new HashMap<>(); + mapContainsCurrentBootstrappingTask.put("non_current_version_bootstrapping", nonCurrentVersionBootstrappingTask); + mapContainsCurrentBootstrappingTask.put("current_version_bootstrapping", currentVersionBootstrappingTask); + + assertTrue(KafkaStoreIngestionService.hasCurrentVersionBootstrapping(mapContainsCurrentBootstrappingTask)); + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReader.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReader.java index 0070b222bf..73cefd9518 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReader.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReader.java @@ -34,6 +34,10 @@ * This class is a helper class for Venice controller to read PushStatus / Heartbeat messages. */ public class PushStatusStoreReader implements Closeable { + public enum InstanceStatus { + ALIVE, DEAD, BOOTSTRAPPING + } + private static final Logger LOGGER = LogManager.getLogger(PushStatusStoreReader.class); private static final int DEFAULT_HEARTBEAT_READ_TIMEOUT_SECONDS = 3; private final Map> veniceClients = @@ -226,11 +230,23 @@ public long getHeartbeat(String storeName, String instanceName) { } public boolean isInstanceAlive(String storeName, String instanceName) { - long lastReportTimeStamp = getHeartbeat(storeName, instanceName); + return isInstanceAlive(getHeartbeat(storeName, instanceName)); + } + + boolean isInstanceAlive(long lastReportTimeStamp) { return System.currentTimeMillis() - lastReportTimeStamp <= TimeUnit.SECONDS .toMillis(heartbeatExpirationTimeInSeconds); } + public InstanceStatus getInstanceStatus(String storeName, String instanceName) { + long lastReportTimeStamp = getHeartbeat(storeName, instanceName); + if (lastReportTimeStamp < 0) { + return InstanceStatus.BOOTSTRAPPING; + } + + return isInstanceAlive(lastReportTimeStamp) ? InstanceStatus.ALIVE : InstanceStatus.DEAD; + } + public Map getSupposedlyOngoingIncrementalPushVersions(String storeName, int storeVersion) { AvroSpecificStoreClient storeClient = getVeniceClient(storeName); PushStatusKey pushStatusKey = PushStatusStoreUtils.getOngoingIncrementalPushStatusesKey(storeVersion); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java index 1e6c759743..9a8a2ce6b7 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java @@ -67,6 +67,15 @@ public void writeHeartbeat(String storeName) { writeHeartbeat(storeName, System.currentTimeMillis()); } + /** + * This function will write `-1` to indicate the node is bootstrapping and Controller + * should ignore all the reports from this instance. 
+ * @param storeName + */ + public void writeHeartbeatForBootstrappingInstance(String storeName) { + writeHeartbeat(storeName, -1); + } + public void writeHeartbeat(String storeName, long heartbeat) { VeniceWriter writer = veniceWriterCache.prepareVeniceWriter(storeName); PushStatusKey pushStatusKey = PushStatusStoreUtils.getHeartbeatKey(instanceName); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReaderTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReaderTest.java index ebd133c10f..763fdc2b09 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReaderTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreReaderTest.java @@ -6,6 +6,7 @@ import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anySet; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; @@ -13,6 +14,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEqualsDeep; import static org.testng.Assert.assertNotEquals; @@ -365,4 +367,22 @@ public void testNullResponseWhenVersionLevelKeyIsNotWritten() // Test that push status store reader will also return null instead of empty map in this case Assert.assertNull(storeReaderSpy.getVersionStatus(storeName, storeVersion)); } + + @Test + public void testGetInstanceStatus() { + PushStatusStoreReader mockReader = mock(PushStatusStoreReader.class); + doCallRealMethod().when(mockReader).getInstanceStatus(any(), any()); + + doReturn(-1l).when(mockReader).getHeartbeat("store_1", "instance_1"); + assertEquals( + mockReader.getInstanceStatus("store_1", "instance_1"), + PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING); + + doReturn(1000l).when(mockReader).getHeartbeat("store_1", "instance_1"); + doReturn(true).when(mockReader).isInstanceAlive(anyLong()); + assertEquals(mockReader.getInstanceStatus("store_1", "instance_1"), PushStatusStoreReader.InstanceStatus.ALIVE); + + doReturn(false).when(mockReader).isInstanceAlive(anyLong()); + assertEquals(mockReader.getInstanceStatus("store_1", "instance_1"), PushStatusStoreReader.InstanceStatus.DEAD); + } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriterTest.java index 1a569166d5..d6c6bdf33e 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriterTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriterTest.java @@ -144,4 +144,13 @@ public void testWriteHeartbeat() { verify(veniceWriterMock) .update(eq(statusKey), eq(getHeartbeatRecord(heartbeat)), eq(valueSchemaId), eq(derivedSchemaId), eq(null)); } + + @Test + public void testWriteHeartbeatForBootstrappingInstance() { + PushStatusKey statusKey = PushStatusStoreUtils.getHeartbeatKey(instanceName); + pushStatusStoreWriter.writeHeartbeatForBootstrappingInstance(storeName); + verify(veniceWriterCacheMock).prepareVeniceWriter(storeName); + verify(veniceWriterMock) + .update(eq(statusKey), 
eq(getHeartbeatRecord(-1)), eq(valueSchemaId), eq(derivedSchemaId), eq(null)); + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 40e6a543f3..6e30c0f70f 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -11,8 +11,10 @@ import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS; import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS; import static com.linkedin.venice.ConfigKeys.DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED; +import static com.linkedin.venice.ConfigKeys.KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND; import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE; import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED; +import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_HEARTBEAT_INTERVAL_IN_SECONDS; import static com.linkedin.venice.ConfigKeys.SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER; import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE; @@ -70,6 +72,7 @@ import com.linkedin.venice.integration.utils.VeniceClusterWrapper; import com.linkedin.venice.integration.utils.VeniceRouterWrapper; import com.linkedin.venice.meta.IngestionMetadataUpdateType; +import com.linkedin.venice.meta.IngestionMode; import com.linkedin.venice.meta.Version; import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.partitioner.ConstantVenicePartitioner; @@ -148,6 +151,8 @@ public void setUp() { inputDirPath = "file://" + inputDir.getAbsolutePath(); Properties clusterConfig = new Properties(); clusterConfig.put(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, 1L); + clusterConfig.put(PUSH_STATUS_STORE_ENABLED, true); + clusterConfig.put(DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS, 3); cluster = ServiceFactory.getVeniceCluster(1, 2, 1, 2, 100, false, false, clusterConfig); d2Client = new D2ClientBuilder().setZkHosts(cluster.getZk().getAddress()) .setZkSessionTimeout(3, TimeUnit.SECONDS) @@ -775,7 +780,7 @@ public void testHybridStore() throws Exception { // Isolated clients should not be able to unsubscribe partitions of other clients. client3.unsubscribeAll(); - client3.subscribe(Collections.singleton(partition)).get(0, TimeUnit.SECONDS); + client3.subscribe(Collections.singleton(partition)).get(10, TimeUnit.SECONDS); for (int i = 0; i < KEY_COUNT; i++) { final int key = i; // Both client2 & client4 are not subscribed to any partition. 
But client2 is not-isolated so it can @@ -788,6 +793,58 @@ public void testHybridStore() throws Exception { } } + @Test(timeOut = TEST_TIMEOUT, dataProvider = "Isolated-Ingestion", dataProviderClass = DataProviderUtils.class) + public void testStatusReportDuringBoostrap(IngestionMode ingestionMode) throws Exception { + int keyCnt = 1000; + String storeName = createStoreWithMetaSystemStoreAndPushStatusSystemStore(keyCnt); + String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath(); + Map extraBackendProp = new HashMap<>(); + extraBackendProp.put(DATA_BASE_PATH, baseDataPath); + extraBackendProp.put(PUSH_STATUS_STORE_HEARTBEAT_INTERVAL_IN_SECONDS, "5"); + extraBackendProp.put(KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND, "5"); + extraBackendProp.put(PUSH_STATUS_STORE_ENABLED, "true"); + DaVinciTestContext daVinciTestContext = + ServiceFactory.getGenericAvroDaVinciFactoryAndClientWithRetries( + d2Client, + new MetricsRepository(), + Optional.empty(), + cluster.getZk().getAddress(), + storeName, + new DaVinciConfig().setIsolated(ingestionMode.equals(IngestionMode.ISOLATED)), + extraBackendProp); + try (DaVinciClient client = daVinciTestContext.getDaVinciClient()) { + CompletableFuture subscribeFuture = client.subscribeAll(); + + /** + * Create a new version while bootstrapping. + */ + VersionCreationResponse newVersion = cluster.getNewVersion(storeName); + String topic = newVersion.getKafkaTopic(); + VeniceWriterFactory vwFactory = IntegrationTestPushUtils + .getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory); + VeniceKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(DEFAULT_KEY_SCHEMA); + VeniceKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(DEFAULT_VALUE_SCHEMA); + int valueSchemaId = HelixReadOnlySchemaRepository.VALUE_SCHEMA_STARTING_ID; + + try (VeniceWriter batchProducer = vwFactory.createVeniceWriter( + new VeniceWriterOptions.Builder(topic).setKeySerializer(keySerializer) + .setValueSerializer(valueSerializer) + .build())) { + batchProducer.broadcastStartOfPush(Collections.emptyMap()); + int keyCntForSecondVersion = 100; + Future[] writerFutures = new Future[keyCntForSecondVersion]; + for (int i = 0; i < keyCntForSecondVersion; i++) { + writerFutures[i] = batchProducer.put(i, i, valueSchemaId); + } + for (int i = 0; i < keyCntForSecondVersion; i++) { + writerFutures[i].get(); + } + batchProducer.broadcastEndOfPush(Collections.emptyMap()); + } + subscribeFuture.get(); + } + } + @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception { String storeName = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorUtils.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorUtils.java index d66b4a69cd..0eb52dd8f2 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorUtils.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorUtils.java @@ -89,6 +89,15 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails( Set incompleteInstanceList = new HashSet<>(); ExecutionStatus errorStatus = ExecutionStatus.ERROR; for (Map.Entry entry: instances.entrySet()) { + PushStatusStoreReader.InstanceStatus instanceStatus = + reader.getInstanceStatus(storeName, 
entry.getKey().toString()); + if (instanceStatus.equals(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING)) { + LOGGER.info( + "Skipping ingestion status report from bootstrapping instance: {} for topic: {}", + entry.getKey().toString(), + topicName); + continue; + } ExecutionStatus status = ExecutionStatus.valueOf(entry.getValue()); // We will skip completed instances, as they have stopped emitting heartbeats and will not be counted as live // instances. @@ -96,8 +105,7 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails( completedInstanceCount++; continue; } - boolean isInstanceAlive = reader.isInstanceAlive(storeName, entry.getKey().toString()); - if (!isInstanceAlive) { + if (instanceStatus.equals(PushStatusStoreReader.InstanceStatus.DEAD)) { offlineInstanceCount++; // Keep at most 5 offline instances for logging purpose. if (offlineInstanceList.size() < 5) { @@ -234,13 +242,27 @@ public static ExecutionStatusWithDetails getDaVinciPartitionLevelPushStatusAndDe /** * This cache is used to reduce the duplicate calls for liveness check as one host can host multiple partitions. */ - Map instanceLivenessCache = new HashMap<>(); + Map instanceLivenessCache = new HashMap<>(); for (int partitionId = 0; partitionId < partitionCount; partitionId++) { Map instances = reader.getPartitionStatus(storeName, version, partitionId, incrementalPushVersion); boolean allInstancesCompleted = true; totalReplicaCount += instances.size(); for (Map.Entry entry: instances.entrySet()) { + String instanceName = entry.getKey().toString(); + PushStatusStoreReader.InstanceStatus instanceStatus = instanceLivenessCache + .computeIfAbsent(instanceName, ignored -> reader.getInstanceStatus(storeName, instanceName)); + if (instanceStatus.equals(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING)) { + // Don't count bootstrapping instance status report. + totalReplicaCount--; + LOGGER.info( + "Skipping ingestion status report from bootstrapping node: {} for topic: {}, partition: {}", + entry.getKey().toString(), + topicName, + partitionId); + continue; + } + ExecutionStatus status = ExecutionStatus.valueOf(entry.getValue()); // We will skip completed replicas, as they have stopped emitting heartbeats and will not be counted as live // replicas. @@ -248,10 +270,7 @@ public static ExecutionStatusWithDetails getDaVinciPartitionLevelPushStatusAndDe completedReplicaCount++; continue; } - String instanceName = entry.getKey().toString(); - boolean isInstanceAlive = instanceLivenessCache - .computeIfAbsent(instanceName, ignored -> reader.isInstanceAlive(storeName, instanceName)); - if (!isInstanceAlive) { + if (instanceStatus.equals(PushStatusStoreReader.InstanceStatus.DEAD)) { // Keep at most 5 offline instances for logging purpose. 
if (offlineInstanceList.size() < 5) { offlineInstanceList.add(entry.getKey().toString()); @@ -357,4 +376,9 @@ public static ExecutionStatusWithDetails getDaVinciPartitionLevelPushStatusAndDe static void setDaVinciErrorInstanceWaitTime(int time) { daVinciErrorInstanceWaitTime = time; } + + // For testing purpose + static void setDVCDeadInstanceTime(String topicName, long timestamp) { + storeVersionToDVCDeadInstanceTimeMap.put(topicName, timestamp); + } } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushMonitorUtilsTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushMonitorUtilsTest.java index 6d3df2d559..334dc51cf6 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushMonitorUtilsTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushMonitorUtilsTest.java @@ -21,22 +21,32 @@ public class PushMonitorUtilsTest { @Test public void testCompleteStatusCanBeReportedWithOfflineInstancesBelowFailFastThreshold() { PushMonitorUtils.setDaVinciErrorInstanceWaitTime(0); + PushMonitorUtils.setDVCDeadInstanceTime("store_v1", System.currentTimeMillis()); PushStatusStoreReader reader = mock(PushStatusStoreReader.class); /** * Instance a is offline and its push status is not completed. * Instance b,c,d are online and their push status is completed. * In this case, the overall DaVinci push status can be COMPLETED as long as 1 is below the fail fast threshold. */ - doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("a")); - doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("b")); - doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("c")); - doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("d")); + doReturn(PushStatusStoreReader.InstanceStatus.DEAD).when(reader).getInstanceStatus(eq("store"), eq("a")); + doReturn(PushStatusStoreReader.InstanceStatus.ALIVE).when(reader).getInstanceStatus(eq("store"), eq("b")); + doReturn(PushStatusStoreReader.InstanceStatus.ALIVE).when(reader).getInstanceStatus(eq("store"), eq("c")); + doReturn(PushStatusStoreReader.InstanceStatus.ALIVE).when(reader).getInstanceStatus(eq("store"), eq("d")); + // Bootstrapping nodes should be ignored + doReturn(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING).when(reader).getInstanceStatus(eq("store"), eq("e")); + doReturn(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING).when(reader).getInstanceStatus(eq("store"), eq("f")); + doReturn(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING).when(reader).getInstanceStatus(eq("store"), eq("g")); + doReturn(PushStatusStoreReader.InstanceStatus.BOOTSTRAPPING).when(reader).getInstanceStatus(eq("store"), eq("h")); Map map = new HashMap<>(); map.put("a", 2); map.put("b", 10); map.put("c", 10); map.put("d", 10); + map.put("e", 2); + map.put("f", 2); + map.put("g", 2); + map.put("h", 2); // Test partition level key first doReturn(null).when(reader).getVersionStatus("store", 1); @@ -56,10 +66,10 @@ public void testCompleteStatusCanBeReportedWithOfflineInstancesBelowFailFastThre public void testDaVinciPushStatusScan(boolean useDaVinciSpecificExecutionStatusForError) { PushMonitorUtils.setDaVinciErrorInstanceWaitTime(0); PushStatusStoreReader reader = mock(PushStatusStoreReader.class); - doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("a")); - doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("b")); - doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("c")); - 
doReturn(false).when(reader).isInstanceAlive(eq("store"), eq("d")); + doReturn(PushStatusStoreReader.InstanceStatus.ALIVE).when(reader).getInstanceStatus(eq("store"), eq("a")); + doReturn(PushStatusStoreReader.InstanceStatus.DEAD).when(reader).getInstanceStatus(eq("store"), eq("b")); + doReturn(PushStatusStoreReader.InstanceStatus.DEAD).when(reader).getInstanceStatus(eq("store"), eq("c")); + doReturn(PushStatusStoreReader.InstanceStatus.DEAD).when(reader).getInstanceStatus(eq("store"), eq("d")); Map map = new HashMap<>(); map.put("a", 3); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushStatusCollectorTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushStatusCollectorTest.java index 1f3edb5be3..16bcb20a0e 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushStatusCollectorTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushStatusCollectorTest.java @@ -109,7 +109,8 @@ public void testPushStatusCollector() { .thenReturn(startedInstancePushStatus, dvcTooManyDeadInstancesErrorInstancePushStatus); when(pushStatusStoreReader.getPartitionStatus(daVinciStoreName, 11, 0, Optional.empty())) .thenReturn(startedInstancePushStatus, dvcOtherErrorInstancePushStatus); - when(pushStatusStoreReader.isInstanceAlive(daVinciStoreName, "instance")).thenReturn(true); + when(pushStatusStoreReader.getInstanceStatus(daVinciStoreName, "instance")) + .thenReturn(PushStatusStoreReader.InstanceStatus.ALIVE); pushStatusCollector.subscribeTopic(daVinciStoreTopicV1, 1); Assert.assertFalse(pushStatusCollector.getTopicToPushStatusMap().containsKey(daVinciStoreTopicV1)); @@ -241,7 +242,8 @@ public void testPushStatusCollectorDaVinciStatusPollingRetry() { .thenReturn(Collections.emptyMap(), startedInstancePushStatus, dvcTooManyDeadInstancesErrorInstancePushStatus); when(pushStatusStoreReader.getPartitionStatus(daVinciStoreName, 6, 0, Optional.empty())) .thenReturn(Collections.emptyMap(), startedInstancePushStatus, dvcOtherErrorInstancePushStatus); - when(pushStatusStoreReader.isInstanceAlive(daVinciStoreName, "instance")).thenReturn(true); + when(pushStatusStoreReader.getInstanceStatus(daVinciStoreName, "instance")) + .thenReturn(PushStatusStoreReader.InstanceStatus.ALIVE); pushStatusCollector.subscribeTopic(daVinciStoreTopicV1, 1); Assert.assertFalse(pushStatusCollector.getTopicToPushStatusMap().containsKey(daVinciStoreTopicV1)); @@ -330,7 +332,8 @@ public void testPushStatusCollectorDaVinciStatusPollingRetryWhenEmptyResultUntil when(pushStatusStoreReader.getPartitionStatus(daVinciStoreName, 2, 0, Optional.empty())) .thenReturn(Collections.emptyMap()); - when(pushStatusStoreReader.isInstanceAlive(daVinciStoreName, "instance")).thenReturn(true); + when(pushStatusStoreReader.getInstanceStatus(daVinciStoreName, "instance")) + .thenReturn(PushStatusStoreReader.InstanceStatus.ALIVE); pushStatusCollector.subscribeTopic(daVinciStoreTopicV1, 1); Assert.assertFalse(pushStatusCollector.getTopicToPushStatusMap().containsKey(daVinciStoreTopicV1));
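Note on the end-to-end contract introduced by this change: a Da Vinci instance that is still bootstrapping its current version writes a heartbeat timestamp of -1 via writeHeartbeatForBootstrappingInstance(), the controller-side PushStatusStoreReader maps any negative timestamp to InstanceStatus.BOOTSTRAPPING so that PushMonitorUtils skips that instance's reports, and on the client side VersionBackend.subscribe() only completes after BootstrappingAwareCompletableFuture observes hasCurrentVersionBootstrapping() returning false. The snippet below is a minimal, self-contained Java sketch of that contract for illustration only; the class name, the 5-minute expiration constant, and the helper signatures are assumptions and are not part of the Venice API.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Illustrative sketch only: class and constant names are assumptions, not Venice classes.
public class BootstrappingHeartbeatSketch {
  enum InstanceStatus {
    ALIVE, DEAD, BOOTSTRAPPING
  }

  // Assumed expiration window; the real reader derives it from heartbeatExpirationTimeInSeconds.
  private static final long HEARTBEAT_EXPIRATION_MS = TimeUnit.MINUTES.toMillis(5);

  // Reader-side contract: a negative heartbeat (the -1 written by a bootstrapping instance)
  // means BOOTSTRAPPING, a recent timestamp means ALIVE, anything stale means DEAD.
  // Reports from BOOTSTRAPPING instances are skipped when aggregating push status.
  static InstanceStatus classify(long lastReportTimestampMs) {
    if (lastReportTimestampMs < 0) {
      return InstanceStatus.BOOTSTRAPPING;
    }
    boolean alive = System.currentTimeMillis() - lastReportTimestampMs <= HEARTBEAT_EXPIRATION_MS;
    return alive ? InstanceStatus.ALIVE : InstanceStatus.DEAD;
  }

  // Client-side pattern: poll the bootstrapping flag on a fixed schedule, complete the future
  // once no current version is bootstrapping, and then stop the scheduler.
  static CompletableFuture<Void> whenBootstrapFinished(BooleanSupplier hasCurrentVersionBootstrapping) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    CompletableFuture<Void> future = new CompletableFuture<>();
    executor.scheduleAtFixedRate(() -> {
      if (!future.isDone() && !hasCurrentVersionBootstrapping.getAsBoolean()) {
        future.complete(null);
      }
    }, 0, 3, TimeUnit.SECONDS);
    future.whenComplete((ignored, throwable) -> executor.shutdown());
    return future;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(classify(-1L)); // BOOTSTRAPPING
    System.out.println(classify(System.currentTimeMillis())); // ALIVE
    System.out.println(classify(0L)); // DEAD (heartbeat far in the past)
    whenBootstrapFinished(() -> false).get(10, TimeUnit.SECONDS); // completes on the first poll
  }
}

The 3-second polling cadence in the sketch mirrors the scheduleAtFixedRate interval used by BootstrappingAwareCompletableFuture, which is also why the unit tests in this patch no longer expect subscribe futures to complete instantly (get(0, TimeUnit.SECONDS)) and instead allow up to 3 seconds.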