From e927debe67e888a4747fbd769c46202f07571b88 Mon Sep 17 00:00:00 2001 From: tylerhu3 Date: Sun, 21 Jul 2024 22:23:47 -0700 Subject: [PATCH] Add check to ensure version is valid before ingestion --- .../controller/VeniceParentHelixAdmin.java | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 0bec8fbac6..69c78828a8 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -182,6 +182,7 @@ import com.linkedin.venice.meta.StoreDataAudit; import com.linkedin.venice.meta.StoreGraveyard; import com.linkedin.venice.meta.StoreInfo; +import com.linkedin.venice.meta.SystemStore; import com.linkedin.venice.meta.VeniceUserStoreType; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionStatus; @@ -985,7 +986,8 @@ public void deleteStore( } /** - * @see Admin#addVersionAndStartIngestion(String, String, String, int, int, Version.PushType, String, long, int, boolean, int) + * @see Admin#addVersionAndStartIngestion(String, String, String, int, int, + * Version.PushType, String, long, int, boolean, int) */ @Override public void addVersionAndStartIngestion( @@ -1000,8 +1002,25 @@ public void addVersionAndStartIngestion( int ignoredRmdVersionID, boolean versionSwapDeferred, int repushSourceVersion) { - // Parent controller will always pick the replicationMetadataVersionId from configs. + // Parent controller will always pick the replicationMetadataVersionId from + // configs. final int replicationMetadataVersionId = getRmdVersionID(storeName, clusterName); + + // Ensure valid version + if (getStore(clusterName, storeName).isDaVinciPushStatusStoreEnabled()) { + Set> versionSet = getCurrentVersionsForMultiColos( + clusterName, + SystemStore.SYSTEM_STORE_NAME_PREFIX + VeniceSystemStoreUtils.DAVINCI_PUSH_STATUS_STORE_STR + "_" + storeName) + .entrySet(); + for (Map.Entry versionColoTuple: versionSet) { + if (versionColoTuple.getValue() < 1) { + String dataCenterName = versionColoTuple.getKey(); + String errorSource = SystemStore.SYSTEM_STORE_NAME_PREFIX + + VeniceSystemStoreUtils.DAVINCI_PUSH_STATUS_STORE_STR + "_" + storeName; + throw new VeniceException("Invalid Version for " + dataCenterName + " " + errorSource); + } + } + } Version version = getVeniceHelixAdmin().addVersionOnly( clusterName, storeName,