From 01534ffe93caaf805e356002990cbbc66058bc5a Mon Sep 17 00:00:00 2001 From: Shekhar Sharma Date: Fri, 5 Apr 2024 13:59:59 -0700 Subject: [PATCH] Create store directory paths in CSM constructor for disk space monitor (#1697) (#43) * Create store directory paths in CSM constructor to be able to monitor the disk usage of the store directories * Fix stylecheck issues * Refactor - init all store paths together and do not mutate the storeDirPaths. Added test * Remove ununsed method * Remove ununsed method * Stylecheck, Remove ununsed import Co-authored-by: Shekhar Sharma <72765053+shekhars-li@users.noreply.github.com> --- .../storage/ContainerStorageManager.java | 20 ++++--- .../storage/ContainerStorageManagerUtil.java | 56 ++++++++++++++----- .../samza/storage/SideInputsManager.java | 5 +- .../storage/TestContainerStorageManager.java | 34 +++++++++++ 4 files changed, 87 insertions(+), 28 deletions(-) diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java index 278f428b14..4c0caed04b 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java @@ -197,15 +197,19 @@ public ContainerStorageManager( this.context = new ContextImpl(jobContext, containerContext, Optional.empty(), Optional.empty(), Optional.empty(), externalContextOptional); - this.storeDirectoryPaths = new HashSet<>(); - // Setting the init thread pool size equal to the number of taskInstances this.parallelInitThreadPoolSize = containerModel.getTasks().size(); + // Note: The store directory paths are used by SamzaContainer to add a metric to watch the disk space usage + // of the store directories. The stores itself does not need to be created but the store directory paths need to be + // set to be able to monitor them, once they're created and in use. + this.storeDirectoryPaths = ContainerStorageManagerUtil.getStoreDirPaths(config, storageEngineFactories, + activeTaskChangelogSystemStreams, sideInputStoreNames, containerModel, storageManagerUtil, + loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory); + this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores( - activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames, - storeDirectoryPaths, containerModel, jobContext, containerContext, - samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil, + activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames, containerModel, jobContext, + containerContext, taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config); this.daVinciStores = ContainerStorageManagerUtil.createDaVinciStores( @@ -259,8 +263,7 @@ public Map start() throws SamzaException, InterruptedExcep this.sideInputsManager = new SideInputsManager( sideInputSystemStreams, systemFactories, changelogSystemStreams, activeTaskChangelogSystemStreams, - storageEngineFactories, storeDirectoryPaths, - containerModel, jobContext, containerContext, + storageEngineFactories, containerModel, jobContext, containerContext, samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, streamMetadataCache, systemAdmins, serdeManager, serdes, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock); @@ -372,8 +375,7 @@ private Map restoreStores() throws InterruptedException { .filter(s -> !inMemoryStoreNames.contains(s)).collect(Collectors.toSet()); this.taskStores = ContainerStorageManagerUtil.createTaskStores( storesToCreate, this.storageEngineFactories, this.sideInputStoreNames, - this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths, - this.containerModel, this.jobContext, this.containerContext, + this.activeTaskChangelogSystemStreams, this.containerModel, this.jobContext, this.containerContext, this.serdes, this.samzaContainerMetrics, this.taskInstanceMetrics, this.taskInstanceCollectors, this.storageManagerUtil, this.loggedStoreBaseDirectory, this.nonLoggedStoreBaseDirectory, this.config); diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java index ced54ea11e..1e0faa3ad6 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java @@ -72,7 +72,6 @@ public static Map> createTaskStores(Set> storageEngineFactories, Set sideInputStoreNames, Map activeTaskChangelogSystemStreams, - Set storeDirectoryPaths, ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext, Map> serdes, SamzaContainerMetrics samzaContainerMetrics, @@ -82,7 +81,6 @@ public static Map> createTaskStores(Set> taskStores = new HashMap<>(); - StorageConfig storageConfig = new StorageConfig(config); // iterate over each task and each storeName for (Map.Entry task : containerModel.getTasks().entrySet()) { @@ -93,16 +91,8 @@ public static Map> createTaskStores(Set storeBackupManagers = storageConfig.getStoreBackupFactories(storeName); - // A store is considered durable if it is backed by a changelog or another backupManager factory - boolean isDurable = activeTaskChangelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty(); - boolean isSideInput = sideInputStoreNames.contains(storeName); - // Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir - // for non logged stores - File storeBaseDir = isDurable || isSideInput ? loggedStoreBaseDirectory : nonLoggedStoreBaseDirectory; - File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName, - taskModel.getTaskMode()); - storeDirectoryPaths.add(storeDirectory.toPath()); + File storeDirectory = getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams, + sideInputStoreNames, taskName, taskModel, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory); // if taskInstanceMetrics are specified use those for store metrics, // otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap @@ -170,7 +160,6 @@ public static Map> createInMemoryStores( Map activeTaskChangelogSystemStreams, Map> storageEngineFactories, Set sideInputStoreNames, - Set storeDirectoryPaths, ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext, SamzaContainerMetrics samzaContainerMetrics, Map taskInstanceMetrics, @@ -182,8 +171,7 @@ public static Map> createInMemoryStores( Set inMemoryStoreNames = getInMemoryStoreNames(storageEngineFactories, config); return ContainerStorageManagerUtil.createTaskStores( inMemoryStoreNames, storageEngineFactories, sideInputStoreNames, - activeTaskChangelogSystemStreams, storeDirectoryPaths, - containerModel, jobContext, containerContext, serdes, + activeTaskChangelogSystemStreams, containerModel, jobContext, containerContext, serdes, samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config); } @@ -448,4 +436,42 @@ public static Set getSideInputStoreNames( } return sideInputStores; } + + public static Set getStoreDirPaths(Config config, Map> storageEngineFactories, + Map activeTaskChangelogSystemStreams, Set sideInputStoreNames, + ContainerModel containerModel, StorageManagerUtil storageManagerUtil, File loggedStoreBaseDirectory, + File nonLoggedStoreBaseDirectory) { + Set storeDirectoryPaths = new HashSet<>(); + StorageConfig storageConfig = new StorageConfig(config); + Set storeNames = new HashSet<>(); + // Add all side input and regular stores + storeNames.addAll(storageConfig.getStoreNames()); + // Add all in-memory store names + storeNames.addAll(getInMemoryStoreNames(storageEngineFactories, config)); + + for (String storeName : storeNames) { + for (Map.Entry task : containerModel.getTasks().entrySet()) { + File storeDirPath = + getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams, sideInputStoreNames, task.getKey(), + task.getValue(), storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory); + storeDirectoryPaths.add(storeDirPath.toPath()); + } + } + return storeDirectoryPaths; + } + public static File getStoreDirPath(String storeName, Config config, Map activeTaskChangelogSystemStreams, + Set sideInputStoreNames, TaskName taskName, TaskModel taskModel, StorageManagerUtil storageManagerUtil, + File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory) { + StorageConfig storageConfig = new StorageConfig(config); + List storeBackupManagers = storageConfig.getStoreBackupFactories(storeName); + // A store is considered durable if it is backed by a changelog or another backupManager factory + boolean isDurable = activeTaskChangelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty(); + boolean isSideInput = sideInputStoreNames.contains(storeName); + // Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir + // for non logged stores + File storeBaseDir = isDurable || isSideInput ? loggedStoreBaseDirectory : nonLoggedStoreBaseDirectory; + File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName, + taskModel.getTaskMode()); + return storeDirectory; + } } diff --git a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java index 1fc8fd6cd8..ad1db93762 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java @@ -26,7 +26,6 @@ import scala.collection.JavaConversions; import java.io.File; -import java.nio.file.Path; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -119,7 +118,6 @@ public SideInputsManager(Map> sideInputSystemStreams, Map changelogSystemStreams, Map activeTaskChangelogSystemStreams, Map> storageEngineFactories, - Set storeDirectoryPaths, ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext, SamzaContainerMetrics samzaContainerMetrics, Map taskInstanceMetrics, @@ -147,8 +145,7 @@ public SideInputsManager(Map> sideInputSystemStreams, // create side input taskStores for all tasks in the containerModel and each store in storageEngineFactories this.sideInputStores = ContainerStorageManagerUtil.createTaskStores( sideInputStoreNames, storageEngineFactories, sideInputStoreNames, - activeTaskChangelogSystemStreams, storeDirectoryPaths, - containerModel, jobContext, containerContext, serdes, + activeTaskChangelogSystemStreams, containerModel, jobContext, containerContext, serdes, samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config); diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java index 50ec34d0a3..517c9397d2 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet; import java.io.ByteArrayOutputStream; import java.io.File; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -1081,6 +1082,39 @@ public void testRestoreRecoversFromDeletedException() throws Exception { inOrder.verify(blobStoreManager).close(); // close called on blobStoreManager passed to taskRestoreManager } + @Test + public void testStoreDirectoriesInitialized() { + String sideInputStore = "sideInputStore"; + String inMemoryStore = "inMemoryStore"; + String regularStore = "regularStore"; + Map storeFactories = new HashMap<>(); + storeFactories.put(String.format("stores.%s.side.inputs.processor.factory", sideInputStore), "sideinputfactory"); + storeFactories.put(String.format("stores.%s.factory", regularStore), "regularstorefactory"); + storeFactories.put(String.format("stores.%s.factory", inMemoryStore), + "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory"); + Map configMap = new HashMap<>(storeFactories); + Config config = new MapConfig(configMap); + Map> storageEngineFactories = new HashMap<>(); + storageEngineFactories.put(sideInputStore, (StorageEngineFactory) mock(StorageEngineFactory.class)); + storageEngineFactories.put(inMemoryStore, (StorageEngineFactory) mock(StorageEngineFactory.class)); + storageEngineFactories.put(regularStore, (StorageEngineFactory) mock(StorageEngineFactory.class)); + + Map activeTaskChangelogSystemStreams = new HashMap<>(); + activeTaskChangelogSystemStreams.put(regularStore, new SystemStream("kafka", "changelog")); + Set sideInputStoreNames = new HashSet<>(); + sideInputStoreNames.add(sideInputStore); + ContainerModel containerModel = mock(ContainerModel.class); + when(containerModel.getTasks()) + .thenReturn(ImmutableMap.of(new TaskName("task"), + new TaskModel(new TaskName("task"), Collections.emptySet(), new Partition(1)))); + + Set storeDirPaths = ContainerStorageManagerUtil.getStoreDirPaths(config, storageEngineFactories, + activeTaskChangelogSystemStreams, sideInputStoreNames, containerModel, new StorageManagerUtil(), + new File("/tmp"), new File("/tmp2")); + + assertEquals(3, storeDirPaths.size()); + } + @Test public void getActiveTaskChangelogSystemStreams() { Map storeToChangelogSystemStreams =