From 07bcfd9a0967f781fb8e5e0d764654dbf7bcda91 Mon Sep 17 00:00:00 2001 From: Tony Tang Date: Tue, 21 Mar 2023 11:14:12 -0400 Subject: [PATCH] fix: if new_partitions is size 0, do not enforce size check (#1673) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Do not enforce new_partitions and change_stream_continuation_tokens to be the same size if new_partitions has size of 0 because Cloud Bigtable backend may not be updated to serve new_partitions field yet. `new_partitions` is a new field and the backend may not be serving this field. Change-Id: Id21c293b92c304f05b905ca8e8b3988b9241866e Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes # ☕️ If you write sample code, please follow the [samples format]( https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md). --- .../bigtable/data/v2/models/CloseStream.java | 3 +- .../v2/models/ChangeStreamRecordTest.java | 36 +++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java index 221b05f587..344ed06c3c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java @@ -48,7 +48,8 @@ private static CloseStream create( !changeStreamContinuationTokens.isEmpty(), "A non-OK CloseStream should have continuation token(s)."); Preconditions.checkState( - changeStreamContinuationTokens.size() == newPartitions.size(), + newPartitions.size() == 0 + || changeStreamContinuationTokens.size() == newPartitions.size(), "Number of continuation tokens does not match number of new partitions."); } return new AutoValue_CloseStream( diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java index c00221be3d..3f09d9b443 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java @@ -236,9 +236,45 @@ public void closeStreamTokenAndNewPartitionCountMismatchedTest() { StreamContinuationToken.newBuilder() .setPartition(StreamPartition.newBuilder().setRowRange(rowRange)) .setToken(token)) + .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange)) + .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange)) .setStatus(status) .build(); Assert.assertThrows( IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto)); } + + // Tests that number of continuation tokens and new partitions don't need to match if new + // partitions is empty. + @Test + public void closeStreamTokenAndZeroNewPartitionMismatchNoExceptionTest() + throws IOException, ClassNotFoundException { + Status status = Status.newBuilder().setCode(11).build(); + RowRange rowRange = + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("")) + .setEndKeyOpen(ByteString.copyFromUtf8("apple")) + .build(); + String token = "close-stream-token-1"; + ReadChangeStreamResponse.CloseStream closeStreamProto = + ReadChangeStreamResponse.CloseStream.newBuilder() + .addContinuationTokens( + StreamContinuationToken.newBuilder() + .setPartition(StreamPartition.newBuilder().setRowRange(rowRange)) + .setToken(token)) + .setStatus(status) + .build(); + CloseStream closeStream = CloseStream.fromProto(closeStreamProto); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(closeStream); + oos.close(); + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())); + CloseStream actual = (CloseStream) ois.readObject(); + assertThat(actual.getChangeStreamContinuationTokens()) + .isEqualTo(closeStream.getChangeStreamContinuationTokens()); + assertThat(actual.getStatus()).isEqualTo(closeStream.getStatus()); + assertThat(actual.getNewPartitions()).isEqualTo(closeStream.getNewPartitions()); + } }