Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server][common] Allow store config repo to fetch configs only when needed #1204

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,7 @@ private void initServerStoreAndSchemaRepository() {
// Load existing store config and setup watches
storeRepo.refresh();

storeConfigRepo = new HelixReadOnlyStoreConfigRepository(
zkClient,
adapter,
clusterConfig.getRefreshAttemptsForZkReconnect(),
clusterConfig.getRefreshIntervalForZkReconnectInMs());
storeConfigRepo = new HelixReadOnlyStoreConfigRepository(zkClient, adapter);
storeConfigRepo.refresh();
adamxchen marked this conversation as resolved.
Show resolved Hide resolved

readOnlyZKSharedSchemaRepository = new HelixReadOnlyZKSharedSchemaRepository(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.venice.helix;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.venice.VeniceResource;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.exceptions.VeniceException;
Expand Down Expand Up @@ -33,73 +34,58 @@
public class HelixReadOnlyStoreConfigRepository implements ReadOnlyStoreConfigRepository, VeniceResource {
private static final Logger LOGGER = LogManager.getLogger(HelixReadOnlyStoreConfigRepository.class);

private final AtomicReference<Map<String, StoreConfig>> storeConfigMap;
private final AtomicReference<Map<String, StoreConfig>> loadedStoreConfigMap;
private final AtomicReference<Set<String>> availableStoreSet;
private final ZkStoreConfigAccessor accessor;
private final StoreConfigChangedListener storeConfigChangedListener;
private final StoreConfigAddedOrDeletedChangedListener storeConfigAddedOrDeletedListener;
private final ZkClient zkClient;
private final CachedResourceZkStateListener zkStateListener;
private final int refreshAttemptsForZkReconnect;
private final long refreshIntervalForZkReconnectInMs;

public HelixReadOnlyStoreConfigRepository(
ZkClient zkClient,
HelixAdapterSerializer adapterSerializer,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs) {
this(
zkClient,
new ZkStoreConfigAccessor(zkClient, adapterSerializer, Optional.empty()),
refreshAttemptsForZkReconnect,
refreshIntervalForZkReconnectInMs);

public HelixReadOnlyStoreConfigRepository(ZkClient zkClient, HelixAdapterSerializer adapterSerializer) {
this(zkClient, new ZkStoreConfigAccessor(zkClient, adapterSerializer, Optional.empty()));
}

public HelixReadOnlyStoreConfigRepository(
ZkClient zkClient,
ZkStoreConfigAccessor accessor,
int refreshAttemptsForZkReconnect,
long refreshIntervalForZkReconnectInMs) {
public HelixReadOnlyStoreConfigRepository(ZkClient zkClient, ZkStoreConfigAccessor accessor) {
this.zkClient = zkClient;
this.accessor = accessor;
this.storeConfigMap = new AtomicReference<>(new HashMap<>());
this.loadedStoreConfigMap = new AtomicReference<>(new HashMap<>());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use VeniceConcurrentHashMap here so that we don't need to recreate a map whenever there is a chance in StoreConfigChangedListener?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use ^ as getStoreConfig can update loadedStoreConfigMap concurrently when store configs are missing from the map.

this.availableStoreSet = new AtomicReference<>(new HashSet<>());
storeConfigChangedListener = new StoreConfigChangedListener();
storeConfigAddedOrDeletedListener = new StoreConfigAddedOrDeletedChangedListener();
this.refreshAttemptsForZkReconnect = refreshAttemptsForZkReconnect;
this.refreshIntervalForZkReconnectInMs = refreshIntervalForZkReconnectInMs;
// This repository already retry on getChildren, so do not need extra retry in listener.
zkStateListener = new CachedResourceZkStateListener(this);
}

/**
* Obtain all available stores and load them into cache, but it doesn't fetch the store configs and attach ZK watch yet
*/
@Override
public void refresh() {
LOGGER.info("Loading all store configs from zk.");
LOGGER.info("Loading all store names from zk.");
accessor.subscribeStoreConfigAddedOrDeletedListener(storeConfigAddedOrDeletedListener);
List<StoreConfig> configList =
accessor.getAllStoreConfigs(refreshAttemptsForZkReconnect, refreshIntervalForZkReconnectInMs);
LOGGER.info("Found {} store configs.", configList.size());
Map<String, StoreConfig> configMap = new HashMap<>();
for (StoreConfig config: configList) {
configMap.put(config.getStoreName(), config);
accessor.subscribeStoreConfigDataChangedListener(config.getStoreName(), storeConfigChangedListener);
}
storeConfigMap.set(configMap);
availableStoreSet.set(new HashSet<>(accessor.getAllStores()));
LOGGER.info("Found {} stores.", availableStoreSet.get().size());
zkClient.subscribeStateChanges(zkStateListener);
LOGGER.info("All store configs are loaded.");
LOGGER.info("All store names are loaded.");
}

@Override
public void clear() {
LOGGER.info("Clearing all store configs in local");
accessor.unsubscribeStoreConfigAddedOrDeletedListener(storeConfigAddedOrDeletedListener);
for (String storeName: storeConfigMap.get().keySet()) {
for (String storeName: loadedStoreConfigMap.get().keySet()) {
accessor.unsubscribeStoreConfigDataChangedListener(storeName, storeConfigChangedListener);
}
this.storeConfigMap.set(Collections.emptyMap());
this.loadedStoreConfigMap.set(Collections.emptyMap());
this.availableStoreSet.set(Collections.emptySet());
zkClient.unsubscribeStateChanges(zkStateListener);
LOGGER.info("Cleared all store configs in local");
}

/**
* Get the store config by store name. It would fetch the store config from ZK if it's not in cache yet and attach ZK
* watch.
* The corresponding Venice store config is returned for metadata system store's store config. This is the most
* natural way to handle cluster discovery for metadata system stores and store migration.
*/
Expand All @@ -114,12 +100,20 @@ public Optional<StoreConfig> getStoreConfig(String storeName) {
if (systemStoreType != null && systemStoreType.equals(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE)) {
veniceStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.extractRegularStoreName(storeName);
}
StoreConfig config = storeConfigMap.get().get(veniceStoreName);
if (config != null) {
return Optional.of(config.cloneStoreConfig());
} else {
return Optional.empty();

if (availableStoreSet.get().contains(storeName)) {
StoreConfig config = loadedStoreConfigMap.get().get(storeName);
if (config == null) {
// lazy fetch from ZK and attach watch
config = accessor.getStoreConfig(veniceStoreName);
if (config != null) {
loadedStoreConfigMap.get().put(config.getStoreName(), config);
accessor.subscribeStoreConfigDataChangedListener(veniceStoreName, storeConfigChangedListener);
}
adamxchen marked this conversation as resolved.
Show resolved Hide resolved
}
return Optional.ofNullable(config != null ? config.cloneStoreConfig() : null);
}
return Optional.empty();
}

@Override
Expand All @@ -133,60 +127,72 @@ public StoreConfig getStoreConfigOrThrow(String storeName) {

@Override
public List<StoreConfig> getAllStoreConfigs() {
return new ArrayList<>(storeConfigMap.get().values());
return new ArrayList<>(loadedStoreConfigMap.get().values());
adamxchen marked this conversation as resolved.
Show resolved Hide resolved
}

protected StoreConfigAddedOrDeletedChangedListener getStoreConfigAddedOrDeletedListener() {
@VisibleForTesting
StoreConfigAddedOrDeletedChangedListener getStoreConfigAddedOrDeletedListener() {
return storeConfigAddedOrDeletedListener;
}

protected StoreConfigChangedListener getStoreConfigChangedListener() {
@VisibleForTesting
StoreConfigChangedListener getStoreConfigChangedListener() {
return storeConfigChangedListener;
}

@VisibleForTesting
Set<String> getAvailableStoreSet() {
return availableStoreSet.get();
}

@VisibleForTesting
Map<String, StoreConfig> getLoadedStoreConfigMap() {
return loadedStoreConfigMap.get();
}

protected class StoreConfigAddedOrDeletedChangedListener implements IZkChildListener {
@Override
public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
synchronized (storeConfigMap) {
Map<String, StoreConfig> map = new HashMap<>(storeConfigMap.get());
public void handleChildChange(String parentPath, List<String> currentChildren) {
synchronized (availableStoreSet) {
Set<String> set = new HashSet<>(availableStoreSet.get());
adamxchen marked this conversation as resolved.
Show resolved Hide resolved
List<String> newStores =
currentChildren.stream().filter(newStore -> !map.containsKey(newStore)).collect(Collectors.toList());
currentChildren.stream().filter(newStore -> !set.contains(newStore)).collect(Collectors.toList());

Set<String> deletedStores = new HashSet<>(map.keySet());
currentChildren.forEach(deletedStores::remove);
// obtain the stores that are removed
currentChildren.forEach(set::remove);
LOGGER.info(
"Store configs list is changed. {} new configs. And will delete {} configs.",
newStores.size(),
deletedStores.size());
// New added store configs
List<StoreConfig> newConfigs = accessor.getStoreConfigs(newStores);
for (StoreConfig config: newConfigs) {
map.put(config.getStoreName(), config);
accessor.subscribeStoreConfigDataChangedListener(config.getStoreName(), storeConfigChangedListener);
}

// Deleted store configs
for (String deletedStore: deletedStores) {
map.remove(deletedStore);
accessor.unsubscribeStoreConfigDataChangedListener(deletedStore, storeConfigChangedListener);
set.size());

// update the available store set
availableStoreSet.set(new HashSet<>(currentChildren));

synchronized (loadedStoreConfigMap) {
Map<String, StoreConfig> map = new HashMap<>(loadedStoreConfigMap.get());
// Deleted store configs
for (String deletedStore: set) {
map.remove(deletedStore);
accessor.unsubscribeStoreConfigDataChangedListener(deletedStore, storeConfigChangedListener);
}
loadedStoreConfigMap.set(map);
}
storeConfigMap.set(map);
}
}
}

protected class StoreConfigChangedListener implements IZkDataListener {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
public void handleDataChange(String dataPath, Object data) {
if (!(data instanceof StoreConfig)) {
throw new VeniceException(
"Invalid data from zk notification. Required: StoreConfig, but get: " + data.getClass().getName());
}
StoreConfig config = (StoreConfig) data;
synchronized (storeConfigMap) {
Map<String, StoreConfig> map = new HashMap<>(storeConfigMap.get());
synchronized (loadedStoreConfigMap) {
Map<String, StoreConfig> map = new HashMap<>(loadedStoreConfigMap.get());
map.put(config.getStoreName(), config);
storeConfigMap.set(map);
loadedStoreConfigMap.set(map);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public ZkStoreConfigAccessor(
}

public List<String> getAllStores() {
return dataAccessor.getChildNames(ROOT_PATH, AccessOption.PERSISTENT);
return HelixUtils.listPathContents(dataAccessor, ROOT_PATH);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It provides a bit better error handling, though essentially they are same.

  public static <T> List<String> listPathContents(ZkBaseDataAccessor<T> dataAccessor, String path) {
    try {
      List<String> paths = dataAccessor.getChildNames(path, AccessOption.PERSISTENT);
      return paths == null ? Collections.emptyList() : paths;
    } catch (Exception e) {
      LOGGER.error("Error when listing contents in path: {}", path, e);
      throw e;
    }
  }

}

public List<StoreConfig> getAllStoreConfigs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.systemstore.schemas.StoreClusterConfig;
import com.linkedin.venice.utils.AvroRecordUtils;
import java.util.Objects;


/**
Expand Down Expand Up @@ -88,4 +89,32 @@ public StoreConfig cloneStoreConfig() {
clonedStoreConfig.setMigrationDestCluster(getMigrationDestCluster());
return clonedStoreConfig;
}

@Override
public int hashCode() {
return Objects.hash(
storeClusterConfig.storeName,
storeClusterConfig.cluster,
storeClusterConfig.deleting,
storeClusterConfig.migrationSrcCluster,
storeClusterConfig.migrationDestCluster);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

StoreConfig that = (StoreConfig) o;

return storeClusterConfig.deleting == that.storeClusterConfig.deleting
&& storeClusterConfig.storeName.equals(that.storeClusterConfig.storeName)
&& storeClusterConfig.cluster.equals(that.storeClusterConfig.cluster)
&& (Objects.equals(storeClusterConfig.migrationSrcCluster, that.storeClusterConfig.migrationSrcCluster))
&& (Objects.equals(storeClusterConfig.migrationDestCluster, that.storeClusterConfig.migrationDestCluster));
}
}
Loading
Loading