From c8ea647958f08b0691838078ae618e5029db9148 Mon Sep 17 00:00:00 2001 From: "Xin(Adam) Chen" Date: Thu, 26 Sep 2024 19:18:43 -0700 Subject: [PATCH 1/4] [common] Allow store config repo to fetch configs only when needed --- .../VeniceMetadataRepositoryBuilder.java | 1 - .../HelixReadOnlyStoreConfigRepository.java | 16 ++++++++ ...estHelixReadOnlyStoreConfigRepository.java | 38 +++++++++++++++---- 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java index 49a27875d8..96ad76d961 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java @@ -147,7 +147,6 @@ private void initServerStoreAndSchemaRepository() { adapter, clusterConfig.getRefreshAttemptsForZkReconnect(), clusterConfig.getRefreshIntervalForZkReconnectInMs()); - storeConfigRepo.refresh(); readOnlyZKSharedSchemaRepository = new HelixReadOnlyZKSharedSchemaRepository( readOnlyZKSharedSystemStoreRepository, diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlyStoreConfigRepository.java b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlyStoreConfigRepository.java index 9092fccce3..11a6bc93a0 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlyStoreConfigRepository.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlyStoreConfigRepository.java @@ -1,5 +1,6 @@ package com.linkedin.venice.helix; +import com.google.common.annotations.VisibleForTesting; import com.linkedin.venice.VeniceResource; import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.exceptions.VeniceException; @@ -70,6 +71,11 @@ public HelixReadOnlyStoreConfigRepository( zkStateListener = new CachedResourceZkStateListener(this); } + /** + * Obtain all store configs and attach ZK watches on every store config received. + * Note that calling this method would load all stores under the store config path from ZK and increases the ZK watch + * count significantly. + */ @Override public void refresh() { LOGGER.info("Loading all store configs from zk."); @@ -115,6 +121,15 @@ public Optional getStoreConfig(String storeName) { veniceStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.extractRegularStoreName(storeName); } StoreConfig config = storeConfigMap.get().get(veniceStoreName); + // Lazy fetch. It should happen when the entire store config map is not loaded, and only do ad-hoc fetch + if (config == null) { + config = accessor.getStoreConfig(veniceStoreName); + if (config != null) { + storeConfigMap.get().put(config.getStoreName(), config); + accessor.subscribeStoreConfigDataChangedListener(veniceStoreName, storeConfigChangedListener); + } + } + if (config != null) { return Optional.of(config.cloneStoreConfig()); } else { @@ -136,6 +151,7 @@ public List getAllStoreConfigs() { return new ArrayList<>(storeConfigMap.get().values()); } + @VisibleForTesting protected StoreConfigAddedOrDeletedChangedListener getStoreConfigAddedOrDeletedListener() { return storeConfigAddedOrDeletedListener; } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/helix/TestHelixReadOnlyStoreConfigRepository.java b/internal/venice-common/src/test/java/com/linkedin/venice/helix/TestHelixReadOnlyStoreConfigRepository.java index fbb60217cb..7d90d6389f 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/helix/TestHelixReadOnlyStoreConfigRepository.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/helix/TestHelixReadOnlyStoreConfigRepository.java @@ -1,11 +1,17 @@ package com.linkedin.venice.helix; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + import com.linkedin.venice.meta.StoreConfig; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import org.apache.helix.zookeeper.impl.client.ZkClient; -import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -17,8 +23,8 @@ public class TestHelixReadOnlyStoreConfigRepository { @BeforeMethod public void setUp() { - mockAccessor = Mockito.mock(ZkStoreConfigAccessor.class); - storeConfigRepository = new HelixReadOnlyStoreConfigRepository(Mockito.mock(ZkClient.class), mockAccessor, 1, 1000); + mockAccessor = mock(ZkStoreConfigAccessor.class); + storeConfigRepository = new HelixReadOnlyStoreConfigRepository(mock(ZkClient.class), mockAccessor, 1, 1000); } @Test @@ -29,7 +35,7 @@ public void testGetStoreConfig() { config.setCluster(clusterName); List list = new ArrayList<>(); list.add(config); - Mockito.doReturn(list).when(mockAccessor).getAllStoreConfigs(1, 1000); + doReturn(list).when(mockAccessor).getAllStoreConfigs(1, 1000); storeConfigRepository.refresh(); Assert.assertEquals( storeConfigRepository.getStoreConfig(storeName).get().getCluster(), @@ -49,7 +55,7 @@ public void testRefreshAndClear() { config.setCluster("testRefreshAndClearCluster" + i); list.add(config); } - Mockito.doReturn(list).when(mockAccessor).getAllStoreConfigs(1, 1000); + doReturn(list).when(mockAccessor).getAllStoreConfigs(1, 1000); storeConfigRepository.refresh(); for (int i = 0; i < storeCount; i++) { @@ -77,7 +83,7 @@ public void testGetStoreConfigChildrenChangedNotification() throws Exception { config.setCluster("testRefreshAndClearCluster" + i); list.add(config); } - Mockito.doReturn(list).when(mockAccessor).getAllStoreConfigs(1, 1000); + doReturn(list).when(mockAccessor).getAllStoreConfigs(1, 1000); storeConfigRepository.refresh(); List storeNames = list.stream().map(config -> config.getStoreName()).collect(Collectors.toList()); @@ -91,7 +97,7 @@ public void testGetStoreConfigChildrenChangedNotification() throws Exception { StoreConfig newStoreConfig = new StoreConfig(newStoreName); newStoreConfig.setCluster("testRefreshAndClearClusterNew"); newStoreConfigList.add(newStoreConfig); - Mockito.doReturn(newStoreConfigList).when(mockAccessor).getStoreConfigs(Mockito.eq(newStoreNames)); + doReturn(newStoreConfigList).when(mockAccessor).getStoreConfigs(eq(newStoreNames)); listener.handleChildChange("", storeNames); @@ -108,7 +114,7 @@ public void testGetUpdateStoreConfigNotification() throws Exception { StoreConfig config = new StoreConfig(storeNAme); config.setCluster("testCluster"); list.add(config); - Mockito.doReturn(list).when(mockAccessor).getAllStoreConfigs(1, 1000); + doReturn(list).when(mockAccessor).getAllStoreConfigs(1, 1000); storeConfigRepository.refresh(); HelixReadOnlyStoreConfigRepository.StoreConfigChangedListener listener = @@ -119,4 +125,20 @@ public void testGetUpdateStoreConfigNotification() throws Exception { Assert.assertEquals(storeConfigRepository.getStoreConfig(storeNAme).get().getCluster(), newConfig.getCluster()); } + + @Test + public void testStoreConfigLazyFetch() { + String storeName = "testLazyFetchStore"; + Optional storeConfigOptional = storeConfigRepository.getStoreConfig(storeName); + // config is empty + Assert.assertFalse(storeConfigOptional.isPresent()); + + // config is fetched + StoreConfig config = mock(StoreConfig.class); + doReturn(config).when(config).cloneStoreConfig(); + doReturn(config).when(mockAccessor).getStoreConfig(storeName); + storeConfigOptional = storeConfigRepository.getStoreConfig(storeName); + Assert.assertEquals(storeConfigOptional.get(), config); + verify(mockAccessor).subscribeStoreConfigDataChangedListener(eq(storeName), any()); + } } From f9655a1dba98c1d802dd9cfb9ef48a7b4858aa39 Mon Sep 17 00:00:00 2001 From: "Xin(Adam) Chen" Date: Fri, 27 Sep 2024 16:05:53 -0700 Subject: [PATCH 2/4] address comment --- .../VeniceMetadataRepositoryBuilder.java | 7 +- .../HelixReadOnlyStoreConfigRepository.java | 144 ++++++++---------- .../com/linkedin/venice/meta/StoreConfig.java | 19 +++ ...estHelixReadOnlyStoreConfigRepository.java | 124 ++++++++------- .../venice/controller/VeniceHelixAdmin.java | 6 +- .../linkedin/venice/router/RouterServer.java | 6 +- 6 files changed, 151 insertions(+), 155 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java index 96ad76d961..c5b5064b61 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/repository/VeniceMetadataRepositoryBuilder.java @@ -142,11 +142,8 @@ private void initServerStoreAndSchemaRepository() { // Load existing store config and setup watches storeRepo.refresh(); - storeConfigRepo = new HelixReadOnlyStoreConfigRepository( - zkClient, - adapter, - clusterConfig.getRefreshAttemptsForZkReconnect(), - clusterConfig.getRefreshIntervalForZkReconnectInMs()); + storeConfigRepo = new HelixReadOnlyStoreConfigRepository(zkClient, adapter); + storeConfigRepo.refresh(); readOnlyZKSharedSchemaRepository = new HelixReadOnlyZKSharedSchemaRepository( readOnlyZKSharedSystemStoreRepository, diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlyStoreConfigRepository.java b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlyStoreConfigRepository.java index 11a6bc93a0..29dc035344 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlyStoreConfigRepository.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadOnlyStoreConfigRepository.java @@ -34,78 +34,58 @@ public class HelixReadOnlyStoreConfigRepository implements ReadOnlyStoreConfigRepository, VeniceResource { private static final Logger LOGGER = LogManager.getLogger(HelixReadOnlyStoreConfigRepository.class); - private final AtomicReference> storeConfigMap; + private final AtomicReference> loadedStoreConfigMap; + private final AtomicReference> availableStoreSet; private final ZkStoreConfigAccessor accessor; private final StoreConfigChangedListener storeConfigChangedListener; private final StoreConfigAddedOrDeletedChangedListener storeConfigAddedOrDeletedListener; private final ZkClient zkClient; private final CachedResourceZkStateListener zkStateListener; - private final int refreshAttemptsForZkReconnect; - private final long refreshIntervalForZkReconnectInMs; - - public HelixReadOnlyStoreConfigRepository( - ZkClient zkClient, - HelixAdapterSerializer adapterSerializer, - int refreshAttemptsForZkReconnect, - long refreshIntervalForZkReconnectInMs) { - this( - zkClient, - new ZkStoreConfigAccessor(zkClient, adapterSerializer, Optional.empty()), - refreshAttemptsForZkReconnect, - refreshIntervalForZkReconnectInMs); + + public HelixReadOnlyStoreConfigRepository(ZkClient zkClient, HelixAdapterSerializer adapterSerializer) { + this(zkClient, new ZkStoreConfigAccessor(zkClient, adapterSerializer, Optional.empty())); } - public HelixReadOnlyStoreConfigRepository( - ZkClient zkClient, - ZkStoreConfigAccessor accessor, - int refreshAttemptsForZkReconnect, - long refreshIntervalForZkReconnectInMs) { + public HelixReadOnlyStoreConfigRepository(ZkClient zkClient, ZkStoreConfigAccessor accessor) { this.zkClient = zkClient; this.accessor = accessor; - this.storeConfigMap = new AtomicReference<>(new HashMap<>()); + this.loadedStoreConfigMap = new AtomicReference<>(new HashMap<>()); + this.availableStoreSet = new AtomicReference<>(new HashSet<>()); storeConfigChangedListener = new StoreConfigChangedListener(); storeConfigAddedOrDeletedListener = new StoreConfigAddedOrDeletedChangedListener(); - this.refreshAttemptsForZkReconnect = refreshAttemptsForZkReconnect; - this.refreshIntervalForZkReconnectInMs = refreshIntervalForZkReconnectInMs; // This repository already retry on getChildren, so do not need extra retry in listener. zkStateListener = new CachedResourceZkStateListener(this); } /** - * Obtain all store configs and attach ZK watches on every store config received. - * Note that calling this method would load all stores under the store config path from ZK and increases the ZK watch - * count significantly. + * Obtain all available stores and load them into cache, but it doesn't fetch the store configs and attach ZK watch yet */ @Override public void refresh() { - LOGGER.info("Loading all store configs from zk."); + LOGGER.info("Loading all store names from zk."); accessor.subscribeStoreConfigAddedOrDeletedListener(storeConfigAddedOrDeletedListener); - List configList = - accessor.getAllStoreConfigs(refreshAttemptsForZkReconnect, refreshIntervalForZkReconnectInMs); - LOGGER.info("Found {} store configs.", configList.size()); - Map configMap = new HashMap<>(); - for (StoreConfig config: configList) { - configMap.put(config.getStoreName(), config); - accessor.subscribeStoreConfigDataChangedListener(config.getStoreName(), storeConfigChangedListener); - } - storeConfigMap.set(configMap); + availableStoreSet.set(new HashSet<>(accessor.getAllStores())); + LOGGER.info("Found {} stores.", availableStoreSet.get().size()); zkClient.subscribeStateChanges(zkStateListener); - LOGGER.info("All store configs are loaded."); + LOGGER.info("All store names are loaded."); } @Override public void clear() { LOGGER.info("Clearing all store configs in local"); accessor.unsubscribeStoreConfigAddedOrDeletedListener(storeConfigAddedOrDeletedListener); - for (String storeName: storeConfigMap.get().keySet()) { + for (String storeName: loadedStoreConfigMap.get().keySet()) { accessor.unsubscribeStoreConfigDataChangedListener(storeName, storeConfigChangedListener); } - this.storeConfigMap.set(Collections.emptyMap()); + this.loadedStoreConfigMap.set(Collections.emptyMap()); + this.availableStoreSet.set(Collections.emptySet()); zkClient.unsubscribeStateChanges(zkStateListener); LOGGER.info("Cleared all store configs in local"); } /** + * Get the store config by store name. It would fetch the store config from ZK if it's not in cache yet and attach ZK + * watch. * The corresponding Venice store config is returned for metadata system store's store config. This is the most * natural way to handle cluster discovery for metadata system stores and store migration. */ @@ -120,21 +100,20 @@ public Optional getStoreConfig(String storeName) { if (systemStoreType != null && systemStoreType.equals(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE)) { veniceStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.extractRegularStoreName(storeName); } - StoreConfig config = storeConfigMap.get().get(veniceStoreName); - // Lazy fetch. It should happen when the entire store config map is not loaded, and only do ad-hoc fetch - if (config == null) { - config = accessor.getStoreConfig(veniceStoreName); - if (config != null) { - storeConfigMap.get().put(config.getStoreName(), config); - accessor.subscribeStoreConfigDataChangedListener(veniceStoreName, storeConfigChangedListener); - } - } - if (config != null) { - return Optional.of(config.cloneStoreConfig()); - } else { - return Optional.empty(); + if (availableStoreSet.get().contains(storeName)) { + StoreConfig config = loadedStoreConfigMap.get().get(storeName); + if (config == null) { + // lazy fetch from ZK and attach watch + config = accessor.getStoreConfig(veniceStoreName); + if (config != null) { + loadedStoreConfigMap.get().put(config.getStoreName(), config); + accessor.subscribeStoreConfigDataChangedListener(veniceStoreName, storeConfigChangedListener); + } + } + return Optional.ofNullable(config != null ? config.cloneStoreConfig() : null); } + return Optional.empty(); } @Override @@ -148,61 +127,72 @@ public StoreConfig getStoreConfigOrThrow(String storeName) { @Override public List getAllStoreConfigs() { - return new ArrayList<>(storeConfigMap.get().values()); + return new ArrayList<>(loadedStoreConfigMap.get().values()); } @VisibleForTesting - protected StoreConfigAddedOrDeletedChangedListener getStoreConfigAddedOrDeletedListener() { + StoreConfigAddedOrDeletedChangedListener getStoreConfigAddedOrDeletedListener() { return storeConfigAddedOrDeletedListener; } - protected StoreConfigChangedListener getStoreConfigChangedListener() { + @VisibleForTesting + StoreConfigChangedListener getStoreConfigChangedListener() { return storeConfigChangedListener; } + @VisibleForTesting + Set getAvailableStoreSet() { + return availableStoreSet.get(); + } + + @VisibleForTesting + Map getLoadedStoreConfigMap() { + return loadedStoreConfigMap.get(); + } + protected class StoreConfigAddedOrDeletedChangedListener implements IZkChildListener { @Override - public void handleChildChange(String parentPath, List currentChildren) throws Exception { - synchronized (storeConfigMap) { - Map map = new HashMap<>(storeConfigMap.get()); + public void handleChildChange(String parentPath, List currentChildren) { + synchronized (availableStoreSet) { + Set set = new HashSet<>(availableStoreSet.get()); List newStores = - currentChildren.stream().filter(newStore -> !map.containsKey(newStore)).collect(Collectors.toList()); + currentChildren.stream().filter(newStore -> !set.contains(newStore)).collect(Collectors.toList()); - Set deletedStores = new HashSet<>(map.keySet()); - currentChildren.forEach(deletedStores::remove); + // obtain the stores that are removed + currentChildren.forEach(set::remove); LOGGER.info( "Store configs list is changed. {} new configs. And will delete {} configs.", newStores.size(), - deletedStores.size()); - // New added store configs - List newConfigs = accessor.getStoreConfigs(newStores); - for (StoreConfig config: newConfigs) { - map.put(config.getStoreName(), config); - accessor.subscribeStoreConfigDataChangedListener(config.getStoreName(), storeConfigChangedListener); - } - - // Deleted store configs - for (String deletedStore: deletedStores) { - map.remove(deletedStore); - accessor.unsubscribeStoreConfigDataChangedListener(deletedStore, storeConfigChangedListener); + set.size()); + + // update the available store set + availableStoreSet.set(new HashSet<>(currentChildren)); + + synchronized (loadedStoreConfigMap) { + Map map = new HashMap<>(loadedStoreConfigMap.get()); + // Deleted store configs + for (String deletedStore: set) { + map.remove(deletedStore); + accessor.unsubscribeStoreConfigDataChangedListener(deletedStore, storeConfigChangedListener); + } + loadedStoreConfigMap.set(map); } - storeConfigMap.set(map); } } } protected class StoreConfigChangedListener implements IZkDataListener { @Override - public void handleDataChange(String dataPath, Object data) throws Exception { + public void handleDataChange(String dataPath, Object data) { if (!(data instanceof StoreConfig)) { throw new VeniceException( "Invalid data from zk notification. Required: StoreConfig, but get: " + data.getClass().getName()); } StoreConfig config = (StoreConfig) data; - synchronized (storeConfigMap) { - Map map = new HashMap<>(storeConfigMap.get()); + synchronized (loadedStoreConfigMap) { + Map map = new HashMap<>(loadedStoreConfigMap.get()); map.put(config.getStoreName(), config); - storeConfigMap.set(map); + loadedStoreConfigMap.set(map); } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreConfig.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreConfig.java index 25cb570baa..e53cc408aa 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreConfig.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreConfig.java @@ -3,6 +3,7 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.systemstore.schemas.StoreClusterConfig; import com.linkedin.venice.utils.AvroRecordUtils; +import java.util.Objects; /** @@ -88,4 +89,22 @@ public StoreConfig cloneStoreConfig() { clonedStoreConfig.setMigrationDestCluster(getMigrationDestCluster()); return clonedStoreConfig; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + StoreConfig that = (StoreConfig) o; + + return storeClusterConfig.deleting == that.storeClusterConfig.deleting + && storeClusterConfig.storeName.equals(that.storeClusterConfig.storeName) + && storeClusterConfig.cluster.equals(that.storeClusterConfig.cluster) + && (Objects.equals(storeClusterConfig.migrationSrcCluster, that.storeClusterConfig.migrationSrcCluster)) + && (Objects.equals(storeClusterConfig.migrationDestCluster, that.storeClusterConfig.migrationDestCluster)); + } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/helix/TestHelixReadOnlyStoreConfigRepository.java b/internal/venice-common/src/test/java/com/linkedin/venice/helix/TestHelixReadOnlyStoreConfigRepository.java index 7d90d6389f..71cc301a01 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/helix/TestHelixReadOnlyStoreConfigRepository.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/helix/TestHelixReadOnlyStoreConfigRepository.java @@ -4,12 +4,12 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import com.linkedin.venice.meta.StoreConfig; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.stream.Collectors; import org.apache.helix.zookeeper.impl.client.ZkClient; import org.testng.Assert; @@ -21,124 +21,122 @@ public class TestHelixReadOnlyStoreConfigRepository { private ZkStoreConfigAccessor mockAccessor; private HelixReadOnlyStoreConfigRepository storeConfigRepository; + private static final String DEFAULT_STORE_NAME = "testGetStoreConfigStore"; + private static final String DEFAULT_CLUSTER_NAME = "testGetStoreConfigCluster"; + private static final StoreConfig DEFAULT_STORE_CONFIG = new StoreConfig(DEFAULT_STORE_NAME); + static { + DEFAULT_STORE_CONFIG.setCluster(DEFAULT_CLUSTER_NAME); + } + @BeforeMethod public void setUp() { mockAccessor = mock(ZkStoreConfigAccessor.class); - storeConfigRepository = new HelixReadOnlyStoreConfigRepository(mock(ZkClient.class), mockAccessor, 1, 1000); + storeConfigRepository = new HelixReadOnlyStoreConfigRepository(mock(ZkClient.class), mockAccessor); } @Test public void testGetStoreConfig() { - String clusterName = "testGetStoreConfigCluster"; - String storeName = "testGetStoreConfigStore"; - StoreConfig config = new StoreConfig(storeName); - config.setCluster(clusterName); - List list = new ArrayList<>(); - list.add(config); - doReturn(list).when(mockAccessor).getAllStoreConfigs(1, 1000); + List list = new ArrayList<>(); + list.add(DEFAULT_STORE_NAME); + doReturn(list).when(mockAccessor).getAllStores(); + // 1.) obtain a config doesn't exist in available store set storeConfigRepository.refresh(); - Assert.assertEquals( - storeConfigRepository.getStoreConfig(storeName).get().getCluster(), - clusterName, - "Should get the cluster from config correctly."); - Assert.assertFalse( - storeConfigRepository.getStoreConfig("non-existing-store").isPresent(), - "Store config should not exist."); + Assert.assertFalse(storeConfigRepository.getStoreConfig(DEFAULT_STORE_NAME + "test").isPresent()); + + // 2.) obtain a config exists in available store set but not in cache + + // a.) ZK has no store config + doReturn(null).when(mockAccessor).getStoreConfig(DEFAULT_STORE_NAME); + Assert.assertFalse(storeConfigRepository.getStoreConfig(DEFAULT_STORE_NAME).isPresent()); + + // b. ZK has the store config + doReturn(DEFAULT_STORE_CONFIG).when(mockAccessor).getStoreConfig(DEFAULT_STORE_NAME); + Assert.assertEquals(storeConfigRepository.getStoreConfig(DEFAULT_STORE_NAME).get(), DEFAULT_STORE_CONFIG); + verify(mockAccessor, times(1)).subscribeStoreConfigDataChangedListener(eq(DEFAULT_STORE_NAME), any()); + + // 3.) Obtain a config exists in available store set and in cache + Assert.assertEquals(storeConfigRepository.getStoreConfig(DEFAULT_STORE_NAME).get(), DEFAULT_STORE_CONFIG); + // the invocation count should not increase and remain at 1 + verify(mockAccessor, times(1)).subscribeStoreConfigDataChangedListener(eq(DEFAULT_STORE_NAME), any()); } @Test public void testRefreshAndClear() { int storeCount = 10; - List list = new ArrayList<>(); + List storeNames = new ArrayList<>(); for (int i = 0; i < storeCount; i++) { - StoreConfig config = new StoreConfig("testRefreshAndClearStore" + i); - config.setCluster("testRefreshAndClearCluster" + i); - list.add(config); + String name = "testRefreshAndClearStore" + i; + storeNames.add(name); } - doReturn(list).when(mockAccessor).getAllStoreConfigs(1, 1000); + doReturn(storeNames).when(mockAccessor).getAllStores(); storeConfigRepository.refresh(); for (int i = 0; i < storeCount; i++) { - Assert.assertEquals( - storeConfigRepository.getStoreConfig("testRefreshAndClearStore" + i).get().getCluster(), - "testRefreshAndClearCluster" + i, - "Should already load all configs correctly."); + Assert.assertTrue(storeConfigRepository.getAvailableStoreSet().contains("testRefreshAndClearStore" + i)); } + Assert.assertEquals(storeConfigRepository.getLoadedStoreConfigMap().size(), 0); + storeConfigRepository.clear(); for (int i = 0; i < storeCount; i++) { - Assert.assertFalse( - storeConfigRepository.getStoreConfig("testRefreshAndClearStore" + i).isPresent(), - "Should already clear all configs correctly."); + Assert.assertFalse(storeConfigRepository.getAvailableStoreSet().contains("testRefreshAndClearStore" + i)); } + Assert.assertEquals(storeConfigRepository.getLoadedStoreConfigMap().size(), 0); } @Test public void testGetStoreConfigChildrenChangedNotification() throws Exception { + String STORE_PREFIX = "testRefreshAndClearStore"; + String CLUSTER_PREFIX = "testRefreshAndClearCluster"; HelixReadOnlyStoreConfigRepository.StoreConfigAddedOrDeletedChangedListener listener = storeConfigRepository.getStoreConfigAddedOrDeletedListener(); int storeCount = 10; List list = new ArrayList<>(); + List storeList = new ArrayList<>(); for (int i = 0; i < storeCount; i++) { - StoreConfig config = new StoreConfig("testRefreshAndClearStore" + i); - config.setCluster("testRefreshAndClearCluster" + i); + StoreConfig config = new StoreConfig(STORE_PREFIX + i); + storeList.add(STORE_PREFIX + i); + config.setCluster(CLUSTER_PREFIX + i); list.add(config); } - doReturn(list).when(mockAccessor).getAllStoreConfigs(1, 1000); + doReturn(storeList).when(mockAccessor).getAllStores(); storeConfigRepository.refresh(); + for (int i = 0; i < storeCount; i++) { + doReturn(list.get(i)).when(mockAccessor).getStoreConfig(STORE_PREFIX + i); + storeConfigRepository.getStoreConfig(STORE_PREFIX + i); + } List storeNames = list.stream().map(config -> config.getStoreName()).collect(Collectors.toList()); storeNames.remove(0); String newStoreName = "testRefreshAndClearStoreNew"; storeNames.add(newStoreName); - List newStoreNames = new ArrayList<>(); - newStoreNames.add(newStoreName); - List newStoreConfigList = new ArrayList<>(); StoreConfig newStoreConfig = new StoreConfig(newStoreName); newStoreConfig.setCluster("testRefreshAndClearClusterNew"); - newStoreConfigList.add(newStoreConfig); - doReturn(newStoreConfigList).when(mockAccessor).getStoreConfigs(eq(newStoreNames)); listener.handleChildChange("", storeNames); - Assert.assertFalse(storeConfigRepository.getStoreConfig("testRefreshAndClearStore" + 0).isPresent()); - Assert.assertEquals( - storeConfigRepository.getStoreConfig(newStoreName).get().getCluster(), - newStoreConfig.getCluster()); + Assert.assertFalse(storeConfigRepository.getStoreConfig(STORE_PREFIX + 0).isPresent()); + Assert.assertTrue(storeConfigRepository.getAvailableStoreSet().contains(newStoreName)); } @Test public void testGetUpdateStoreConfigNotification() throws Exception { - String storeNAme = "testGetUpdateStoreConfigNotification"; - List list = new ArrayList<>(); - StoreConfig config = new StoreConfig(storeNAme); + String storeName = "testGetUpdateStoreConfigNotification"; + StoreConfig config = new StoreConfig(storeName); + List list = new ArrayList<>(); + list.add(storeName); + doReturn(list).when(mockAccessor).getAllStores(); config.setCluster("testCluster"); - list.add(config); - doReturn(list).when(mockAccessor).getAllStoreConfigs(1, 1000); + doReturn(config).when(mockAccessor).getStoreConfig(storeName); storeConfigRepository.refresh(); + storeConfigRepository.getStoreConfigOrThrow(storeName); HelixReadOnlyStoreConfigRepository.StoreConfigChangedListener listener = storeConfigRepository.getStoreConfigChangedListener(); - StoreConfig newConfig = new StoreConfig(storeNAme); + StoreConfig newConfig = new StoreConfig(storeName); newConfig.setCluster("newCluster"); listener.handleDataChange("", newConfig); - Assert.assertEquals(storeConfigRepository.getStoreConfig(storeNAme).get().getCluster(), newConfig.getCluster()); - } - - @Test - public void testStoreConfigLazyFetch() { - String storeName = "testLazyFetchStore"; - Optional storeConfigOptional = storeConfigRepository.getStoreConfig(storeName); - // config is empty - Assert.assertFalse(storeConfigOptional.isPresent()); - - // config is fetched - StoreConfig config = mock(StoreConfig.class); - doReturn(config).when(config).cloneStoreConfig(); - doReturn(config).when(mockAccessor).getStoreConfig(storeName); - storeConfigOptional = storeConfigRepository.getStoreConfig(storeName); - Assert.assertEquals(storeConfigOptional.get(), config); - verify(mockAccessor).subscribeStoreConfigDataChangedListener(eq(storeName), any()); + Assert.assertEquals(storeConfigRepository.getStoreConfig(storeName).get().getCluster(), newConfig.getCluster()); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index fe8170c501..ec72a06ab0 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -538,11 +538,7 @@ public VeniceHelixAdmin( this.allowlistAccessor = new ZkAllowlistAccessor(zkClient, adapterSerializer); this.executionIdAccessor = new ZkExecutionIdAccessor(zkClient, adapterSerializer); - this.storeConfigRepo = new HelixReadOnlyStoreConfigRepository( - zkClient, - adapterSerializer, - commonConfig.getRefreshAttemptsForZkReconnect(), - commonConfig.getRefreshIntervalForZkReconnectInMs()); + this.storeConfigRepo = new HelixReadOnlyStoreConfigRepository(zkClient, adapterSerializer); storeConfigRepo.refresh(); this.storeGraveyard = new HelixStoreGraveyard(zkClient, adapterSerializer, multiClusterConfigs.getClusters()); veniceWriterFactory = new VeniceWriterFactory( diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java b/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java index 12a7cdfc3e..c70f2bedd7 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java @@ -349,11 +349,7 @@ public RouterServer( this.hybridStoreQuotaRepository = config.isHelixHybridStoreQuotaEnabled() ? Optional.of(new HelixHybridStoreQuotaRepository(manager)) : Optional.empty(); - this.storeConfigRepository = new HelixReadOnlyStoreConfigRepository( - zkClient, - adapter, - config.getRefreshAttemptsForZkReconnect(), - config.getRefreshIntervalForZkReconnectInMs()); + this.storeConfigRepository = new HelixReadOnlyStoreConfigRepository(zkClient, adapter); this.liveInstanceMonitor = new HelixLiveInstanceMonitor(this.zkClient, config.getClusterName()); this.pushStatusStoreReader = new PushStatusStoreReader( From ad69be101183c7c0c125348e54ef9b27452c826e Mon Sep 17 00:00:00 2001 From: "Xin(Adam) Chen" Date: Fri, 27 Sep 2024 20:20:07 -0700 Subject: [PATCH 3/4] update --- .../java/com/linkedin/venice/helix/ZkStoreConfigAccessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/helix/ZkStoreConfigAccessor.java b/internal/venice-common/src/main/java/com/linkedin/venice/helix/ZkStoreConfigAccessor.java index 6ad036f662..bfeb488b70 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/helix/ZkStoreConfigAccessor.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/helix/ZkStoreConfigAccessor.java @@ -46,7 +46,7 @@ public ZkStoreConfigAccessor( } public List getAllStores() { - return dataAccessor.getChildNames(ROOT_PATH, AccessOption.PERSISTENT); + return HelixUtils.listPathContents(dataAccessor, ROOT_PATH); } public List getAllStoreConfigs( From 0b0ad8ac191977605349de24989ab3c44317fc73 Mon Sep 17 00:00:00 2001 From: "Xin(Adam) Chen" Date: Fri, 27 Sep 2024 20:33:37 -0700 Subject: [PATCH 4/4] static --- .../java/com/linkedin/venice/meta/StoreConfig.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreConfig.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreConfig.java index e53cc408aa..93b6893b9e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreConfig.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreConfig.java @@ -90,6 +90,16 @@ public StoreConfig cloneStoreConfig() { return clonedStoreConfig; } + @Override + public int hashCode() { + return Objects.hash( + storeClusterConfig.storeName, + storeClusterConfig.cluster, + storeClusterConfig.deleting, + storeClusterConfig.migrationSrcCluster, + storeClusterConfig.migrationDestCluster); + } + @Override public boolean equals(Object o) { if (this == o) {