Skip to content

Commit

Permalink
Harden update-store workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
nisargthakkar committed Aug 1, 2024
1 parent cdbd176 commit 9edf55a
Show file tree
Hide file tree
Showing 57 changed files with 4,100 additions and 2,780 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,6 @@ private StoreInfo getStoreInfo(Consumer<StoreInfo> info, boolean applyFirst) {
storeInfo.setChunkingEnabled(false);
storeInfo.setCompressionStrategy(CompressionStrategy.NO_OP);
storeInfo.setWriteComputationEnabled(false);
storeInfo.setIncrementalPushEnabled(false);
storeInfo.setNativeReplicationSourceFabric("dc-0");
Map<String, Integer> coloMaps = new HashMap<String, Integer>() {
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,15 @@ private ConfigKeys() {
"controller.store.graveyard.cleanup.sleep.interval.between.list.fetch.minutes";

/**
 * Whether the superset schema generation in Primary Controller should be done via passed callback or not.
 */
public static final String CONTROLLER_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED =
"controller.external.superset.schema.generation.enabled";

/**
 * Whether the superset schema generation in Primary Controller should be done via passed callback or not.
 *
 * @deprecated Use {@link #CONTROLLER_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED} instead; this legacy
 *             "parent"-scoped key is kept only for backward compatibility with existing configs.
 */
@Deprecated
public static final String CONTROLLER_PARENT_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED =
"controller.parent.external.superset.schema.generation.enabled";

Expand Down Expand Up @@ -1139,6 +1146,7 @@ private ConfigKeys() {
*/
public static final String ENABLE_INCREMENTAL_PUSH_FOR_HYBRID_ACTIVE_ACTIVE_USER_STORES =
"enable.incremental.push.for.hybrid.active.active.user.stores";

/**
* We will use this config to determine whether we should enable partial update for hybrid active-active user stores.
* If this config is set to true, we will enable partial update for hybrid active-active user stores whose latest value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,15 @@ private void deleteStores(List<String> storeNames) {

/**
 * Looks up the persona that currently contains the given store.
 *
 * @param storeName name of the store to look up
 * @return the containing {@link StoragePersona}, or {@code null} if the store is not mapped to any persona
 */
public StoragePersona getPersonaContainingStore(String storeName) {
  String personaName = storeNamePersonaMap.get(storeName);
  // No mapping means the store belongs to no persona.
  if (personaName == null) {
    return null;
  }
  return getPersona(personaName);
}

private boolean isStoreSetValid(StoragePersona persona, Optional<Store> additionalStore) {
Set<String> setToValidate = new HashSet<>();
if (additionalStore.isPresent())
setToValidate.add(additionalStore.get().getName());
additionalStore.ifPresent(store -> setToValidate.add(store.getName()));
setToValidate.addAll(persona.getStoresToEnforce());
return setToValidate.stream()
.allMatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public enum BackupStrategy {
// KEEP_IN_KAFKA_ONLY,
/** Keep in user-specified store eg HDD, other DB */
// KEEP_IN_USER_STORE;
private int value;
private final int value;

BackupStrategy(int v) {
this.value = v;
Expand All @@ -35,6 +35,10 @@ public enum BackupStrategy {
Arrays.stream(values()).forEach(s -> idMapping.put(s.value, s));
}

/** Returns the stable integer id backing this strategy (the same id accepted by {@code fromInt}). */
public int getValue() {
return value;
}

public static BackupStrategy fromInt(int i) {
BackupStrategy strategy = idMapping.get(i);
if (strategy == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ public interface HybridStoreConfig extends DataModelBackedStructure<StoreHybridC
BufferReplayPolicy getBufferReplayPolicy();

HybridStoreConfig clone();

/**
 * A store config counts as hybrid when a non-negative rewind time is configured AND at least
 * one go-online lag threshold (offset lag or producer timestamp lag) is non-negative.
 */
default boolean isHybrid() {
  boolean rewindConfigured = this.getRewindTimeInSeconds() >= 0;
  boolean lagThresholdConfigured = this.getOffsetLagThresholdToGoOnline() >= 0
      || this.getProducerTimestampLagThresholdToGoOnlineInSeconds() >= 0;
  return rewindConfigured && lagThresholdConfigured;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public int hashCode() {

@JsonIgnore
public PartitionerConfig clone() {
  // Deep-copy the params map so mutations on the clone cannot leak back into this config
  // (a shallow copy would share the same underlying Map instance).
  return new PartitionerConfigImpl(
      getPartitionerClass(),
      new HashMap<>(getPartitionerParams()),
      getAmplificationFactor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public ZKStore(Store store) {
setSchemaAutoRegisterFromPushJobEnabled(store.isSchemaAutoRegisterFromPushJobEnabled());
setLatestSuperSetValueSchemaId(store.getLatestSuperSetValueSchemaId());
setHybridStoreDiskQuotaEnabled(store.isHybridStoreDiskQuotaEnabled());
setEtlStoreConfig(store.getEtlStoreConfig());
setEtlStoreConfig(store.getEtlStoreConfig().clone());
setStoreMetadataSystemStoreEnabled(store.isStoreMetadataSystemStoreEnabled());
setLatestVersionPromoteToCurrentTimestamp(store.getLatestVersionPromoteToCurrentTimestamp());
setBackupVersionRetentionMs(store.getBackupVersionRetentionMs());
Expand All @@ -219,7 +219,7 @@ public ZKStore(Store store) {
setStoreMetaSystemStoreEnabled(store.isStoreMetaSystemStoreEnabled());
setActiveActiveReplicationEnabled(store.isActiveActiveReplicationEnabled());
setRmdVersion(store.getRmdVersion());
setViewConfigs(store.getViewConfigs());
setViewConfigs(new HashMap<>(store.getViewConfigs()));
setStorageNodeReadQuotaEnabled(store.isStorageNodeReadQuotaEnabled());
setUnusedSchemaDeletionEnabled(store.isUnusedSchemaDeletionEnabled());
setMinCompactionLagSeconds(store.getMinCompactionLagSeconds());
Expand Down Expand Up @@ -361,11 +361,7 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
@SuppressWarnings("unused") // Used by Serializer/De-serializer for storing to Zoo Keeper
@Override
public long getStorageQuotaInByte() {
  // NOTE(review): the legacy safeguard that substituted DEFAULT_STORAGE_QUOTA for non-positive
  // values (other than UNLIMITED_STORAGE_QUOTA) has been removed; callers now receive the raw
  // stored value, including non-positive ones — confirm downstream consumers handle that.
  return this.storeProperties.storageQuotaInByte;
}

@SuppressWarnings("unused") // Used by Serializer/De-serializer for storing to Zoo Keeper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public static int calculatePartitionCount(
partitionCount,
storageQuota,
storeName);
return (int) partitionCount;

// At least 1 partition
return partitionCount <= 0 ? 1 : (int) partitionCount;
}

public static VenicePartitioner getVenicePartitioner(PartitionerConfig config) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.linkedin.venice.meta;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;

import com.linkedin.venice.exceptions.VeniceException;
import org.testng.annotations.Test;


public class BackupStrategyTest {
  /** {@code fromInt} must map each known id to its strategy and reject unknown ids. */
  @Test
  public void testFromInt() {
    BackupStrategy zero = BackupStrategy.fromInt(0);
    BackupStrategy one = BackupStrategy.fromInt(1);
    assertEquals(zero, BackupStrategy.KEEP_MIN_VERSIONS);
    assertEquals(one, BackupStrategy.DELETE_ON_NEW_PUSH_START);
    // An id outside the known range must fail loudly rather than return null.
    assertThrows(VeniceException.class, () -> BackupStrategy.fromInt(2));
  }

  /** {@code getValue} is the inverse of {@code fromInt} for the known strategies. */
  @Test
  public void testGetValue() {
    assertEquals(BackupStrategy.KEEP_MIN_VERSIONS.getValue(), 0);
    assertEquals(BackupStrategy.DELETE_ON_NEW_PUSH_START.getValue(), 1);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,26 @@ public void deserializes() throws IOException {
Assert.assertEquals(fasterXml.getRewindTimeInSeconds(), 123L);
Assert.assertEquals(fasterXml.getDataReplicationPolicy(), DataReplicationPolicy.NON_AGGREGATE);
}

@Test
public void testIsHybrid() {
  // Cases: expected hybrid-ness for (rewindSeconds, offsetLagThreshold, producerTimestampLagSeconds).
  assertHybridity(false, -1, -1, -1);
  assertHybridity(false, 100, -1, -1);
  assertHybridity(true, 100, 100, -1);
  assertHybridity(true, 100, 100, 100);
  assertHybridity(true, 100, -1, 100);
  assertHybridity(false, -1, -1, 100);
}

/** Builds a config with the given thresholds and asserts the result of {@code isHybrid()}. */
private void assertHybridity(boolean expected, long rewindSeconds, long offsetLag, long producerLagSeconds) {
  HybridStoreConfig hybridStoreConfig =
      new HybridStoreConfigImpl(rewindSeconds, offsetLag, producerLagSeconds, null, null);
  Assert.assertEquals(hybridStoreConfig.isHybrid(), expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,40 @@ public void testClusterLevelActiveActiveReplicationConfigForNewHybridStores() th
assertFalse(parentControllerClient.getStore(storeName).getStore().isActiveActiveReplicationEnabled());
});
}

/**
 * Verifies that enabling incremental push on a store implicitly enables Active/Active
 * replication, and that A/A stays enabled after incremental push is turned off again.
 */
@Test(timeOut = TEST_TIMEOUT)
public void testClusterLevelActiveActiveReplicationConfigForNewIncrementalPushStores() throws IOException {
  String storeName = Utils.getUniqueString("test-store-incremental");
  String pushJobId1 = "test-push-job-id-1";
  parentControllerClient.createNewStore(storeName, "test-owner", "\"string\"", "\"string\"");
  parentControllerClient.emptyPush(storeName, pushJobId1, 1);

  // Version 1 should exist.
  StoreInfo store = assertCommand(parentControllerClient.getStore(storeName)).getStore();
  assertEquals(store.getVersions().size(), 1);

  // Neither Active/Active replication nor incremental push is enabled for a fresh store.
  // (The original test asserted isActiveActiveReplicationEnabled twice; deduplicated here.)
  assertFalse(store.isActiveActiveReplicationEnabled());
  assertFalse(store.isIncrementalPushEnabled());

  // Convert to incremental push store; config propagation is async, so poll for it.
  assertCommand(
      parentControllerClient.updateStore(storeName, new UpdateStoreQueryParams().setIncrementalPushEnabled(true)));
  TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
    StoreInfo storeToTest = parentControllerClient.getStore(storeName).getStore();
    assertTrue(storeToTest.isIncrementalPushEnabled());
    assertTrue(storeToTest.isActiveActiveReplicationEnabled());
  });

  // After incremental push is disabled, the store's A/A config remains enabled even though
  // the cluster-level default A/A config for pure hybrid stores is false.
  assertCommand(
      parentControllerClient.updateStore(storeName, new UpdateStoreQueryParams().setIncrementalPushEnabled(false)));
  TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
    StoreInfo storeToTest = parentControllerClient.getStore(storeName).getStore();
    assertFalse(storeToTest.isIncrementalPushEnabled());
    assertTrue(storeToTest.isActiveActiveReplicationEnabled());
  });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Time;
Expand Down Expand Up @@ -89,6 +90,7 @@ public void testClusterLevelNativeReplicationConfigForNewStores() {
parentControllerClient.updateStore(
storeName,
new UpdateStoreQueryParams().setIncrementalPushEnabled(true)
.setHybridDataReplicationPolicy(DataReplicationPolicy.NONE)
.setHybridRewindSeconds(1L)
.setHybridOffsetLagThreshold(10)));
TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,27 +479,6 @@ public void testUpdateStoreMetadata() throws Exception {
PartitionerConfig partitionerConfig = new PartitionerConfigImpl();
veniceAdmin.setStorePartitionerConfig(clusterName, storeName, partitionerConfig);

veniceAdmin.setIncrementalPushEnabled(clusterName, storeName, true);
Assert.assertTrue(veniceAdmin.getStore(clusterName, storeName).isIncrementalPushEnabled());

veniceAdmin.setBootstrapToOnlineTimeoutInHours(clusterName, storeName, 48);
Assert.assertEquals(veniceAdmin.getStore(clusterName, storeName).getBootstrapToOnlineTimeoutInHours(), 48);

veniceAdmin.setHybridStoreDiskQuotaEnabled(clusterName, storeName, true);
Assert.assertTrue(veniceAdmin.getStore(clusterName, storeName).isHybridStoreDiskQuotaEnabled());

// test setting per-store RMD (replication metadata) version ID
int rmdVersion = veniceAdmin.getStore(clusterName, storeName).getRmdVersion();
Assert.assertEquals(rmdVersion, -1);

veniceAdmin.setReplicationMetadataVersionID(clusterName, storeName, 2);
rmdVersion = veniceAdmin.getStore(clusterName, storeName).getRmdVersion();
Assert.assertNotEquals(rmdVersion, -1);
Assert.assertEquals(rmdVersion, 2);

// test hybrid config
// set incrementalPushEnabled to be false as hybrid and incremental are mutex
veniceAdmin.setIncrementalPushEnabled(clusterName, storeName, false);
Assert.assertFalse(veniceAdmin.getStore(clusterName, storeName).isHybrid());
veniceAdmin.updateStore(
clusterName,
Expand Down Expand Up @@ -644,26 +623,18 @@ public void testGetRealTimeTopic() {
Assert.assertThrows(VeniceNoStoreException.class, () -> veniceAdmin.getRealTimeTopic(clusterName, storeName));

veniceAdmin.createStore(clusterName, storeName, "owner", KEY_SCHEMA, VALUE_SCHEMA);

// Must not be able to get a real time topic if the store is not hybrid
Assert.assertThrows(VeniceException.class, () -> veniceAdmin.getRealTimeTopic(clusterName, storeName));

veniceAdmin.updateStore(
clusterName,
storeName,
new UpdateStoreQueryParams().setHybridRewindSeconds(25L).setHybridOffsetLagThreshold(100L)); // make store
// hybrid

try {
veniceAdmin.getRealTimeTopic(clusterName, storeName);
Assert.fail("Must not be able to get a real time topic until the store is initialized with a version");
} catch (VeniceException e) {
Assert.assertTrue(
e.getMessage().contains("is not initialized with a version"),
"Got unexpected error message: " + e.getMessage());
}

int partitions = 2; // TODO verify partition count for RT topic.
veniceAdmin.incrementVersionIdempotent(clusterName, storeName, Version.guidBasedDummyPushId(), partitions, 1);

String rtTopic = veniceAdmin.getRealTimeTopic(clusterName, storeName);
Assert.assertEquals(rtTopic, Version.composeRealTimeTopic(storeName));
String expectedRtTopic = Version.composeRealTimeTopic(storeName);
Assert.assertEquals(veniceAdmin.getRealTimeTopic(clusterName, storeName), expectedRtTopic);
}

@Test(timeOut = TOTAL_TIMEOUT_FOR_LONG_TEST_MS)
Expand Down Expand Up @@ -1467,33 +1438,27 @@ public void leakyTopicTruncation() {
}
}

/**
 * Verifies largest-used-version tracking: it starts at 0 for a fresh store, advances to the
 * new version's number when a version is created, and can be explicitly reset via
 * setStoreLargestUsedVersion.
 */
@Test(timeOut = TOTAL_TIMEOUT_FOR_LONG_TEST_MS)
public void testSetLargestUsedVersion() {
String storeName = "testSetLargestUsedVersion";
veniceAdmin.createStore(clusterName, storeName, storeOwner, KEY_SCHEMA, VALUE_SCHEMA);
Store store = veniceAdmin.getStore(clusterName, storeName);
// A freshly created store has never had a version.
Assert.assertEquals(store.getLargestUsedVersionNumber(), 0);

Version version =
veniceAdmin.incrementVersionIdempotent(clusterName, storeName, Version.guidBasedDummyPushId(), 1, 1);
store = veniceAdmin.getStore(clusterName, storeName);
Assert.assertTrue(version.getNumber() > 0);
// Creating a version bumps largest-used-version to that version's number.
Assert.assertEquals(store.getLargestUsedVersionNumber(), version.getNumber());

// The counter can be force-reset back to 0 via the admin API.
veniceAdmin.setStoreLargestUsedVersion(clusterName, storeName, 0);
store = veniceAdmin.getStore(clusterName, storeName);
Assert.assertEquals(store.getLargestUsedVersionNumber(), 0);
}

@Test(timeOut = TOTAL_TIMEOUT_FOR_LONG_TEST_MS)
public void testWriteComputationEnabled() {
String storeName = Utils.getUniqueString("test_store");
veniceAdmin.createStore(clusterName, storeName, storeOwner, "\"string\"", "\"string\"");
String VALUE_FIELD_NAME = "int_field";
String SECOND_VALUE_FIELD_NAME = "opt_int_field";
String VALUE_SCHEMA_V2_STR = "{\n" + "\"type\": \"record\",\n" + "\"name\": \"TestValueSchema\",\n"
+ "\"namespace\": \"com.linkedin.venice.fastclient.schema\",\n" + "\"fields\": [\n" + " {\"name\": \""
+ VALUE_FIELD_NAME + "\", \"type\": \"int\", \"default\": 10},\n" + "{\"name\": \"" + SECOND_VALUE_FIELD_NAME
+ "\", \"type\": [\"null\", \"int\"], \"default\": null}]\n" + "}";

veniceAdmin.createStore(clusterName, storeName, storeOwner, "\"string\"", VALUE_SCHEMA_V2_STR);

Store store = veniceAdmin.getStore(clusterName, storeName);
Assert.assertFalse(store.isWriteComputationEnabled());

veniceAdmin.updateStore(clusterName, storeName, new UpdateStoreQueryParams().setWriteComputationEnabled(true));
veniceAdmin.updateStore(
clusterName,
storeName,
new UpdateStoreQueryParams().setHybridRewindSeconds(1000)
.setHybridOffsetLagThreshold(1000)
.setWriteComputationEnabled(true));
store = veniceAdmin.getStore(clusterName, storeName);
Assert.assertTrue(store.isWriteComputationEnabled());
}
Expand Down Expand Up @@ -1744,10 +1709,12 @@ public void testVersionLevelActiveActiveReplicationConfig() {
String pushJobId1 = "test-push-job-id-1";
veniceAdmin.createStore(clusterName, storeName, "test-owner", KEY_SCHEMA, VALUE_SCHEMA);
/**
* Enable L/F and Active/Active replication
* Enable L/F, NR and Active/Active replication
*/
veniceAdmin
.updateStore(clusterName, storeName, new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(true));
veniceAdmin.updateStore(
clusterName,
storeName,
new UpdateStoreQueryParams().setNativeReplicationEnabled(true).setActiveActiveReplicationEnabled(true));

/**
* Add version 1
Expand Down Expand Up @@ -1887,7 +1854,10 @@ public void testUpdateStoreWithVersionInheritedConfigs() {
veniceAdmin.updateStore(
clusterName,
storeName,
new UpdateStoreQueryParams().setHybridOffsetLagThreshold(1)
new UpdateStoreQueryParams().setNativeReplicationEnabled(true)
.setActiveActiveReplicationEnabled(true)
.setChunkingEnabled(true)
.setHybridOffsetLagThreshold(1)
.setHybridRewindSeconds(1)
.setStoreViews(viewConfig));
veniceAdmin.incrementVersionIdempotent(clusterName, storeName, Version.guidBasedDummyPushId(), 1, 1);
Expand Down
Loading

0 comments on commit 9edf55a

Please sign in to comment.