Skip to content

Commit

Permalink
Revert "Create store directory paths in CSM constructor for disk spac…
Browse files Browse the repository at this point in the history
…e monitor (apache#1697) (apache#43)" (apache#44)

This reverts commit 01534ff.

The above commit introduces a type change that is causing all builds to
fail on samza-li. e.g. see samza-li e8b00bcba88cf9d235f8432309ef60c6ae1c7d74

Revert that commit in the short-term until we can address the type
mismatch.
  • Loading branch information
nickgarvey authored Apr 8, 2024
1 parent 01534ff commit 5aaa0fa
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,19 +197,15 @@ public ContainerStorageManager(
this.context = new ContextImpl(jobContext, containerContext, Optional.empty(), Optional.empty(), Optional.empty(),
externalContextOptional);

this.storeDirectoryPaths = new HashSet<>();

// Setting the init thread pool size equal to the number of taskInstances
this.parallelInitThreadPoolSize = containerModel.getTasks().size();

// Note: The store directory paths are used by SamzaContainer to add a metric to watch the disk space usage
// of the store directories. The stores itself does not need to be created but the store directory paths need to be
// set to be able to monitor them, once they're created and in use.
this.storeDirectoryPaths = ContainerStorageManagerUtil.getStoreDirPaths(config, storageEngineFactories,
activeTaskChangelogSystemStreams, sideInputStoreNames, containerModel, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);

this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores(
activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames, containerModel, jobContext,
containerContext, taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil,
activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames,
storeDirectoryPaths, containerModel, jobContext, containerContext,
samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);

this.daVinciStores = ContainerStorageManagerUtil.createDaVinciStores(
Expand Down Expand Up @@ -263,7 +259,8 @@ public Map<TaskName, Checkpoint> start() throws SamzaException, InterruptedExcep
this.sideInputsManager = new SideInputsManager(
sideInputSystemStreams, systemFactories,
changelogSystemStreams, activeTaskChangelogSystemStreams,
storageEngineFactories, containerModel, jobContext, containerContext,
storageEngineFactories, storeDirectoryPaths,
containerModel, jobContext, containerContext,
samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors,
streamMetadataCache, systemAdmins, serdeManager, serdes, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
Expand Down Expand Up @@ -375,7 +372,8 @@ private Map<TaskName, Checkpoint> restoreStores() throws InterruptedException {
.filter(s -> !inMemoryStoreNames.contains(s)).collect(Collectors.toSet());
this.taskStores = ContainerStorageManagerUtil.createTaskStores(
storesToCreate, this.storageEngineFactories, this.sideInputStoreNames,
this.activeTaskChangelogSystemStreams, this.containerModel, this.jobContext, this.containerContext,
this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths,
this.containerModel, this.jobContext, this.containerContext,
this.serdes, this.samzaContainerMetrics, this.taskInstanceMetrics, this.taskInstanceCollectors, this.storageManagerUtil,
this.loggedStoreBaseDirectory, this.nonLoggedStoreBaseDirectory, this.config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public static Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<Str
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Set<String> sideInputStoreNames,
Map<String, SystemStream> activeTaskChangelogSystemStreams,
Set<Path> storeDirectoryPaths,
ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
Map<String, Serde<Object>> serdes,
SamzaContainerMetrics samzaContainerMetrics,
Expand All @@ -81,6 +82,7 @@ public static Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<Str
File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory,
Config config) {
Map<TaskName, Map<String, StorageEngine>> taskStores = new HashMap<>();
StorageConfig storageConfig = new StorageConfig(config);

// iterate over each task and each storeName
for (Map.Entry<TaskName, TaskModel> task : containerModel.getTasks().entrySet()) {
Expand All @@ -91,8 +93,16 @@ public static Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<Str
}

for (String storeName : storesToCreate) {
File storeDirectory = getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams,
sideInputStoreNames, taskName, taskModel, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);
List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
// A store is considered durable if it is backed by a changelog or another backupManager factory
boolean isDurable = activeTaskChangelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty();
boolean isSideInput = sideInputStoreNames.contains(storeName);
// Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir
// for non logged stores
File storeBaseDir = isDurable || isSideInput ? loggedStoreBaseDirectory : nonLoggedStoreBaseDirectory;
File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName,
taskModel.getTaskMode());
storeDirectoryPaths.add(storeDirectory.toPath());

// if taskInstanceMetrics are specified use those for store metrics,
// otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap
Expand Down Expand Up @@ -160,6 +170,7 @@ public static Map<TaskName, Map<String, StorageEngine>> createInMemoryStores(
Map<String, SystemStream> activeTaskChangelogSystemStreams,
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Set<String> sideInputStoreNames,
Set<Path> storeDirectoryPaths,
ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
SamzaContainerMetrics samzaContainerMetrics,
Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
Expand All @@ -171,7 +182,8 @@ public static Map<TaskName, Map<String, StorageEngine>> createInMemoryStores(
Set<String> inMemoryStoreNames = getInMemoryStoreNames(storageEngineFactories, config);
return ContainerStorageManagerUtil.createTaskStores(
inMemoryStoreNames, storageEngineFactories, sideInputStoreNames,
activeTaskChangelogSystemStreams, containerModel, jobContext, containerContext, serdes,
activeTaskChangelogSystemStreams, storeDirectoryPaths,
containerModel, jobContext, containerContext, serdes,
samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
}
Expand Down Expand Up @@ -436,42 +448,4 @@ public static Set<String> getSideInputStoreNames(
}
return sideInputStores;
}

public static Set<Path> getStoreDirPaths(Config config, Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Map<String, SystemStream> activeTaskChangelogSystemStreams, Set<String> sideInputStoreNames,
ContainerModel containerModel, StorageManagerUtil storageManagerUtil, File loggedStoreBaseDirectory,
File nonLoggedStoreBaseDirectory) {
Set<Path> storeDirectoryPaths = new HashSet<>();
StorageConfig storageConfig = new StorageConfig(config);
Set<String> storeNames = new HashSet<>();
// Add all side input and regular stores
storeNames.addAll(storageConfig.getStoreNames());
// Add all in-memory store names
storeNames.addAll(getInMemoryStoreNames(storageEngineFactories, config));

for (String storeName : storeNames) {
for (Map.Entry<TaskName, TaskModel> task : containerModel.getTasks().entrySet()) {
File storeDirPath =
getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams, sideInputStoreNames, task.getKey(),
task.getValue(), storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);
storeDirectoryPaths.add(storeDirPath.toPath());
}
}
return storeDirectoryPaths;
}
public static File getStoreDirPath(String storeName, Config config, Map<String, SystemStream> activeTaskChangelogSystemStreams,
Set<String> sideInputStoreNames, TaskName taskName, TaskModel taskModel, StorageManagerUtil storageManagerUtil,
File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory) {
StorageConfig storageConfig = new StorageConfig(config);
List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
// A store is considered durable if it is backed by a changelog or another backupManager factory
boolean isDurable = activeTaskChangelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty();
boolean isSideInput = sideInputStoreNames.contains(storeName);
// Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir
// for non logged stores
File storeBaseDir = isDurable || isSideInput ? loggedStoreBaseDirectory : nonLoggedStoreBaseDirectory;
File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName,
taskModel.getTaskMode());
return storeDirectory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import scala.collection.JavaConversions;

import java.io.File;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -118,6 +119,7 @@ public SideInputsManager(Map<String, Set<SystemStream>> sideInputSystemStreams,
Map<String, SystemStream> changelogSystemStreams,
Map<String, SystemStream> activeTaskChangelogSystemStreams,
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Set<Path> storeDirectoryPaths,
ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
SamzaContainerMetrics samzaContainerMetrics,
Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
Expand Down Expand Up @@ -145,7 +147,8 @@ public SideInputsManager(Map<String, Set<SystemStream>> sideInputSystemStreams,
// create side input taskStores for all tasks in the containerModel and each store in storageEngineFactories
this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(
sideInputStoreNames, storageEngineFactories, sideInputStoreNames,
activeTaskChangelogSystemStreams, containerModel, jobContext, containerContext, serdes,
activeTaskChangelogSystemStreams, storeDirectoryPaths,
containerModel, jobContext, containerContext, serdes,
samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.collect.ImmutableSet;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -1082,39 +1081,6 @@ public void testRestoreRecoversFromDeletedException() throws Exception {
inOrder.verify(blobStoreManager).close(); // close called on blobStoreManager passed to taskRestoreManager
}

@Test
public void testStoreDirectoriesInitialized() {
String sideInputStore = "sideInputStore";
String inMemoryStore = "inMemoryStore";
String regularStore = "regularStore";
Map<String, String> storeFactories = new HashMap<>();
storeFactories.put(String.format("stores.%s.side.inputs.processor.factory", sideInputStore), "sideinputfactory");
storeFactories.put(String.format("stores.%s.factory", regularStore), "regularstorefactory");
storeFactories.put(String.format("stores.%s.factory", inMemoryStore),
"org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory");
Map<String, String> configMap = new HashMap<>(storeFactories);
Config config = new MapConfig(configMap);
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories = new HashMap<>();
storageEngineFactories.put(sideInputStore, (StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class));
storageEngineFactories.put(inMemoryStore, (StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class));
storageEngineFactories.put(regularStore, (StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class));

Map<String, SystemStream> activeTaskChangelogSystemStreams = new HashMap<>();
activeTaskChangelogSystemStreams.put(regularStore, new SystemStream("kafka", "changelog"));
Set<String> sideInputStoreNames = new HashSet<>();
sideInputStoreNames.add(sideInputStore);
ContainerModel containerModel = mock(ContainerModel.class);
when(containerModel.getTasks())
.thenReturn(ImmutableMap.of(new TaskName("task"),
new TaskModel(new TaskName("task"), Collections.emptySet(), new Partition(1))));

Set<Path> storeDirPaths = ContainerStorageManagerUtil.getStoreDirPaths(config, storageEngineFactories,
activeTaskChangelogSystemStreams, sideInputStoreNames, containerModel, new StorageManagerUtil(),
new File("/tmp"), new File("/tmp2"));

assertEquals(3, storeDirPaths.size());
}

@Test
public void getActiveTaskChangelogSystemStreams() {
Map<String, SystemStream> storeToChangelogSystemStreams =
Expand Down

0 comments on commit 5aaa0fa

Please sign in to comment.