diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 29922abdc8..c6b37d6347 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -198,7 +198,8 @@ public DaVinciBackend( storeRepository, whetherToRestoreDataPartitions, true, - functionToCheckWhetherStorageEngineShouldBeKeptOrNot(managedClients)); + functionToCheckWhetherStorageEngineShouldBeKeptOrNot(managedClients), + se -> null); storageService.start(); PubSubClientsFactory pubSubClientsFactory = configLoader.getVeniceServerConfig().getPubSubClientsFactory(); VeniceWriterFactory writerFactory = diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java index 1770c2de9c..34b4e31e47 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java @@ -130,7 +130,8 @@ public InternalLocalBootstrappingVeniceChangelogConsumer( storeRepository, true, true, - functionToCheckWhetherStorageEngineShouldBeKeptOrNot()); + functionToCheckWhetherStorageEngineShouldBeKeptOrNot(), + functionToCheckWhetherStoragePartitionShouldBeKeptOrNot()); storageMetadataService = new StorageEngineMetadataService(storageService.getStorageEngineRepository(), partitionStateSerializer); } @@ -176,6 +177,10 @@ private Function functionToCheckWhetherStorageEngineShouldBeKep }; } + private Function functionToCheckWhetherStoragePartitionShouldBeKeptOrNot() { + return storageEngine -> null; + } + @Override protected boolean handleVersionSwapControlMessage( ControlMessage controlMessage, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageService.java index f6a92a5add..adc2e9ca8b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageService.java @@ -77,6 +77,7 @@ public class StorageService extends AbstractVeniceService { * @param restoreDataPartitions indicates if store data needs to be restored. * @param restoreMetadataPartitions indicates if meta data needs to be restored. * @param checkWhetherStorageEngineShouldBeKeptOrNot check whether the local storage engine should be kept or not. + * @param checkWhetherStoragePartitionShouldBeKeptOrNot check whether the partition is assigned and thus should be kept or not. */ StorageService( VeniceConfigLoader configLoader, @@ -88,6 +89,7 @@ public class StorageService extends AbstractVeniceService { boolean restoreDataPartitions, boolean restoreMetadataPartitions, Function checkWhetherStorageEngineShouldBeKeptOrNot, + Function checkWhetherStoragePartitionShouldBeKeptOrNot, Optional> persistenceTypeToStorageEngineFactoryMapOptional) { String dataPath = configLoader.getVeniceServerConfig().getDataBasePath(); if (!Utils.directoryExists(dataPath)) { @@ -122,7 +124,8 @@ public class StorageService extends AbstractVeniceService { configLoader, restoreDataPartitions, restoreMetadataPartitions, - checkWhetherStorageEngineShouldBeKeptOrNot); + checkWhetherStorageEngineShouldBeKeptOrNot, + checkWhetherStoragePartitionShouldBeKeptOrNot); } } @@ -135,7 +138,8 @@ public StorageService( ReadOnlyStoreRepository storeRepository, boolean restoreDataPartitions, boolean restoreMetadataPartitions, - Function checkWhetherStorageEngineShouldBeKeptOrNot) { + Function checkWhetherStorageEngineShouldBeKeptOrNot, + Function checkWhetherStoragePartitionShouldBeKeptOrNot) { this( configLoader, storageEngineStats, @@ -146,6 +150,7 @@ public StorageService( restoreDataPartitions, restoreMetadataPartitions, checkWhetherStorageEngineShouldBeKeptOrNot, + checkWhetherStoragePartitionShouldBeKeptOrNot, Optional.empty()); } @@ -167,7 +172,8 @@ public StorageService( storeRepository, restoreDataPartitions, restoreMetadataPartitions, - s -> true); + s -> true, + se -> null); } /** @@ -233,7 +239,8 @@ private void restoreAllStores( VeniceConfigLoader configLoader, boolean restoreDataPartitions, boolean restoreMetadataPartitions, - Function checkWhetherStorageEngineShouldBeKeptOrNot) { + Function checkWhetherStorageEngineShouldBeKeptOrNot, + Function checkWhetherStoragePartitionsShouldBeKeptOrNot) { LOGGER.info("Start restoring all the stores persisted previously"); for (Map.Entry entry: persistenceTypeToStorageEngineFactoryMap.entrySet()) { PersistenceType pType = entry.getKey(); @@ -254,6 +261,7 @@ private void restoreAllStores( if (checkWhetherStorageEngineShouldBeKeptOrNot.apply(storeName)) { try { storageEngine = openStore(storeConfig, () -> null); + checkWhetherStoragePartitionsShouldBeKeptOrNot.apply(storageEngine); } catch (Exception e) { if (ExceptionUtils.recursiveClassEquals(e, RocksDBException.class)) { LOGGER.warn("Encountered RocksDB error while opening store: {}", storeName, e); @@ -467,7 +475,7 @@ public synchronized void closeStorageEngine(String kafkaTopic) { public void cleanupAllStores(VeniceConfigLoader configLoader) { // Load local storage and delete them safely. // TODO Just clean the data dir in case loading and deleting is too slow. - restoreAllStores(configLoader, true, true, s -> true); + restoreAllStores(configLoader, true, true, s -> true, se -> null); LOGGER.info("Start cleaning up all the stores persisted previously"); storageEngineRepository.getAllLocalStorageEngines().stream().forEach(storageEngine -> { String storeName = storageEngine.getStoreVersionName(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/StorageServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/StorageServiceTest.java index a86a4879be..d8dddce5fe 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/StorageServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/StorageServiceTest.java @@ -115,6 +115,7 @@ public void testGetStoreAndUserPartitionsMapping() { true, true, (s) -> true, + (se) -> null, Optional.of(persistenceTypeToStorageEngineFactoryMap)); Map> expectedMapping = new HashMap<>(); diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index 1c93211c40..220c872397 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -331,7 +331,8 @@ private List createServices() { metadataRepo, whetherToRestoreDataPartitions, true, - functionToCheckWhetherStorageEngineShouldBeKeptOrNot()); + functionToCheckWhetherStorageEngineShouldBeKeptOrNot(), + functionToCheckWhetherStoragePartitionShouldBeKeptOrNot()); storageEngineMetadataService = new StorageEngineMetadataService(storageService.getStorageEngineRepository(), partitionStateSerializer); services.add(storageEngineMetadataService); @@ -711,19 +712,15 @@ protected final boolean isIsolatedIngestion() { } private Function functionToCheckWhetherStorageEngineShouldBeKeptOrNot() { - return storageEngineName -> { - String storeName = Version.parseStoreFromKafkaTopicName(storageEngineName); - - StorageEngineRepository storageEngineRepository = new StorageEngineRepository(); - - AbstractStorageEngine storageEngine = storageEngineRepository.getLocalStorageEngine(storageEngineName); - - if (storageEngine == null) { - return true; - } + return storageEngineName -> true; + } + private Function functionToCheckWhetherStoragePartitionShouldBeKeptOrNot() { + return storageEngine -> { + String storageEngineName = storageEngine.toString(); + String storeName = Version.parseStoreFromKafkaTopicName(storageEngineName); PropertyKey.Builder propertyKeyBuilder = - new PropertyKey.Builder(this.veniceConfigLoader.getVeniceClusterConfig().getClusterName()); + new PropertyKey.Builder(veniceConfigLoader.getVeniceClusterConfig().getClusterName()); IdealState idealState = getHelixParticipationService().getHelixManager() .getHelixDataAccessor() .getProperty(propertyKeyBuilder.idealStates(storeName)); @@ -740,8 +737,7 @@ private Function functionToCheckWhetherStorageEngineShouldBeKep } storageEngine.dropPartition(storageEnginePartitionId); } - - return true; + return null; }; }