Skip to content

Commit

Permalink
Updated StorageService constructor and initializer with functionToChe…
Browse files Browse the repository at this point in the history
…ckWhetherStoragePartitionShouldBeKeptOrNot
  • Loading branch information
kristyelee committed Sep 27, 2024
1 parent e09f7bf commit 6c50513
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ public DaVinciBackend(
storeRepository,
whetherToRestoreDataPartitions,
true,
functionToCheckWhetherStorageEngineShouldBeKeptOrNot(managedClients));
functionToCheckWhetherStorageEngineShouldBeKeptOrNot(managedClients),
se -> null);
storageService.start();
PubSubClientsFactory pubSubClientsFactory = configLoader.getVeniceServerConfig().getPubSubClientsFactory();
VeniceWriterFactory writerFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public InternalLocalBootstrappingVeniceChangelogConsumer(
storeRepository,
true,
true,
functionToCheckWhetherStorageEngineShouldBeKeptOrNot());
functionToCheckWhetherStorageEngineShouldBeKeptOrNot(),
functionToCheckWhetherStoragePartitionShouldBeKeptOrNot());
storageMetadataService =
new StorageEngineMetadataService(storageService.getStorageEngineRepository(), partitionStateSerializer);
}
Expand Down Expand Up @@ -176,6 +177,10 @@ private Function<String, Boolean> functionToCheckWhetherStorageEngineShouldBeKep
};
}

private Function<AbstractStorageEngine, Void> functionToCheckWhetherStoragePartitionShouldBeKeptOrNot() {
return storageEngine -> null;
}

@Override
protected boolean handleVersionSwapControlMessage(
ControlMessage controlMessage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class StorageService extends AbstractVeniceService {
* @param restoreDataPartitions indicates if store data needs to be restored.
* @param restoreMetadataPartitions indicates if meta data needs to be restored.
* @param checkWhetherStorageEngineShouldBeKeptOrNot check whether the local storage engine should be kept or not.
* @param checkWhetherStoragePartitionShouldBeKeptOrNot check whether the partition is assigned and thus should be kept or not.
*/
StorageService(
VeniceConfigLoader configLoader,
Expand All @@ -88,6 +89,7 @@ public class StorageService extends AbstractVeniceService {
boolean restoreDataPartitions,
boolean restoreMetadataPartitions,
Function<String, Boolean> checkWhetherStorageEngineShouldBeKeptOrNot,
Function<AbstractStorageEngine, Void> checkWhetherStoragePartitionShouldBeKeptOrNot,
Optional<Map<PersistenceType, StorageEngineFactory>> persistenceTypeToStorageEngineFactoryMapOptional) {
String dataPath = configLoader.getVeniceServerConfig().getDataBasePath();
if (!Utils.directoryExists(dataPath)) {
Expand Down Expand Up @@ -122,7 +124,8 @@ public class StorageService extends AbstractVeniceService {
configLoader,
restoreDataPartitions,
restoreMetadataPartitions,
checkWhetherStorageEngineShouldBeKeptOrNot);
checkWhetherStorageEngineShouldBeKeptOrNot,
checkWhetherStoragePartitionShouldBeKeptOrNot);
}
}

Expand All @@ -135,7 +138,8 @@ public StorageService(
ReadOnlyStoreRepository storeRepository,
boolean restoreDataPartitions,
boolean restoreMetadataPartitions,
Function<String, Boolean> checkWhetherStorageEngineShouldBeKeptOrNot) {
Function<String, Boolean> checkWhetherStorageEngineShouldBeKeptOrNot,
Function<AbstractStorageEngine, Void> checkWhetherStoragePartitionShouldBeKeptOrNot) {
this(
configLoader,
storageEngineStats,
Expand All @@ -146,6 +150,7 @@ public StorageService(
restoreDataPartitions,
restoreMetadataPartitions,
checkWhetherStorageEngineShouldBeKeptOrNot,
checkWhetherStoragePartitionShouldBeKeptOrNot,
Optional.empty());
}

Expand All @@ -167,7 +172,8 @@ public StorageService(
storeRepository,
restoreDataPartitions,
restoreMetadataPartitions,
s -> true);
s -> true,
se -> null);
}

/**
Expand Down Expand Up @@ -233,7 +239,8 @@ private void restoreAllStores(
VeniceConfigLoader configLoader,
boolean restoreDataPartitions,
boolean restoreMetadataPartitions,
Function<String, Boolean> checkWhetherStorageEngineShouldBeKeptOrNot) {
Function<String, Boolean> checkWhetherStorageEngineShouldBeKeptOrNot,
Function<AbstractStorageEngine, Void> checkWhetherStoragePartitionsShouldBeKeptOrNot) {
LOGGER.info("Start restoring all the stores persisted previously");
for (Map.Entry<PersistenceType, StorageEngineFactory> entry: persistenceTypeToStorageEngineFactoryMap.entrySet()) {
PersistenceType pType = entry.getKey();
Expand All @@ -254,6 +261,7 @@ private void restoreAllStores(
if (checkWhetherStorageEngineShouldBeKeptOrNot.apply(storeName)) {
try {
storageEngine = openStore(storeConfig, () -> null);
checkWhetherStoragePartitionsShouldBeKeptOrNot.apply(storageEngine);
} catch (Exception e) {
if (ExceptionUtils.recursiveClassEquals(e, RocksDBException.class)) {
LOGGER.warn("Encountered RocksDB error while opening store: {}", storeName, e);
Expand Down Expand Up @@ -467,7 +475,7 @@ public synchronized void closeStorageEngine(String kafkaTopic) {
public void cleanupAllStores(VeniceConfigLoader configLoader) {
// Load local storage and delete them safely.
// TODO Just clean the data dir in case loading and deleting is too slow.
restoreAllStores(configLoader, true, true, s -> true);
restoreAllStores(configLoader, true, true, s -> true, se -> null);
LOGGER.info("Start cleaning up all the stores persisted previously");
storageEngineRepository.getAllLocalStorageEngines().stream().forEach(storageEngine -> {
String storeName = storageEngine.getStoreVersionName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public void testGetStoreAndUserPartitionsMapping() {
true,
true,
(s) -> true,
(se) -> null,
Optional.of(persistenceTypeToStorageEngineFactoryMap));

Map<String, Set<Integer>> expectedMapping = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ private List<AbstractVeniceService> createServices() {
metadataRepo,
whetherToRestoreDataPartitions,
true,
functionToCheckWhetherStorageEngineShouldBeKeptOrNot());
functionToCheckWhetherStorageEngineShouldBeKeptOrNot(),
functionToCheckWhetherStoragePartitionShouldBeKeptOrNot());
storageEngineMetadataService =
new StorageEngineMetadataService(storageService.getStorageEngineRepository(), partitionStateSerializer);
services.add(storageEngineMetadataService);
Expand Down Expand Up @@ -711,19 +712,15 @@ protected final boolean isIsolatedIngestion() {
}

private Function<String, Boolean> functionToCheckWhetherStorageEngineShouldBeKeptOrNot() {
return storageEngineName -> {
String storeName = Version.parseStoreFromKafkaTopicName(storageEngineName);

StorageEngineRepository storageEngineRepository = new StorageEngineRepository();

AbstractStorageEngine storageEngine = storageEngineRepository.getLocalStorageEngine(storageEngineName);

if (storageEngine == null) {
return true;
}
return storageEngineName -> true;
}

private Function<AbstractStorageEngine, Void> functionToCheckWhetherStoragePartitionShouldBeKeptOrNot() {
return storageEngine -> {
String storageEngineName = storageEngine.toString();
String storeName = Version.parseStoreFromKafkaTopicName(storageEngineName);
PropertyKey.Builder propertyKeyBuilder =
new PropertyKey.Builder(this.veniceConfigLoader.getVeniceClusterConfig().getClusterName());
new PropertyKey.Builder(veniceConfigLoader.getVeniceClusterConfig().getClusterName());
IdealState idealState = getHelixParticipationService().getHelixManager()
.getHelixDataAccessor()
.getProperty(propertyKeyBuilder.idealStates(storeName));
Expand All @@ -740,8 +737,7 @@ private Function<String, Boolean> functionToCheckWhetherStorageEngineShouldBeKep
}
storageEngine.dropPartition(storageEnginePartitionId);
}

return true;
return null;
};
}

Expand Down

0 comments on commit 6c50513

Please sign in to comment.