diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java index 221b05f587..344ed06c3c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java @@ -48,7 +48,8 @@ private static CloseStream create( !changeStreamContinuationTokens.isEmpty(), "A non-OK CloseStream should have continuation token(s)."); Preconditions.checkState( - changeStreamContinuationTokens.size() == newPartitions.size(), + newPartitions.size() == 0 + || changeStreamContinuationTokens.size() == newPartitions.size(), "Number of continuation tokens does not match number of new partitions."); } return new AutoValue_CloseStream( diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java index c00221be3d..3f09d9b443 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java @@ -236,9 +236,45 @@ public void closeStreamTokenAndNewPartitionCountMismatchedTest() { StreamContinuationToken.newBuilder() .setPartition(StreamPartition.newBuilder().setRowRange(rowRange)) .setToken(token)) + .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange)) + .addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange)) .setStatus(status) .build(); Assert.assertThrows( IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto)); } + + // Tests that number of continuation tokens and new partitions don't need to match if new + // partitions is empty. + @Test + public void closeStreamTokenAndZeroNewPartitionMismatchNoExceptionTest() + throws IOException, ClassNotFoundException { + Status status = Status.newBuilder().setCode(11).build(); + RowRange rowRange = + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("")) + .setEndKeyOpen(ByteString.copyFromUtf8("apple")) + .build(); + String token = "close-stream-token-1"; + ReadChangeStreamResponse.CloseStream closeStreamProto = + ReadChangeStreamResponse.CloseStream.newBuilder() + .addContinuationTokens( + StreamContinuationToken.newBuilder() + .setPartition(StreamPartition.newBuilder().setRowRange(rowRange)) + .setToken(token)) + .setStatus(status) + .build(); + CloseStream closeStream = CloseStream.fromProto(closeStreamProto); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(closeStream); + oos.close(); + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())); + CloseStream actual = (CloseStream) ois.readObject(); + assertThat(actual.getChangeStreamContinuationTokens()) + .isEqualTo(closeStream.getChangeStreamContinuationTokens()); + assertThat(actual.getStatus()).isEqualTo(closeStream.getStatus()); + assertThat(actual.getNewPartitions()).isEqualTo(closeStream.getNewPartitions()); + } }