Skip to content

Commit

Permalink
feat: Add getNewPartitions method to CloseStream for Bigtable ChangeS…
Browse files Browse the repository at this point in the history
…tream (#1655)

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](
https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
tengzhonger committed Mar 1, 2023
1 parent 0e283bf commit 8847fed
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 19 deletions.
6 changes: 6 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@
<method>*getStatus*</method>
<to>com.google.cloud.bigtable.common.Status</to>
</difference>
<!-- add new method is ok because CloseStream is InternalApi -->
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/CloseStream</className>
<method>*getNewPartitions*</method>
</difference>
<!-- change method return type is ok because ChangeStreamMutation is InternalApi -->
<difference>
<differenceType>7006</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.cloud.bigtable.common.Status;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.List;
Expand All @@ -35,8 +37,22 @@ public abstract class CloseStream implements ChangeStreamRecord, Serializable {

private static CloseStream create(
com.google.rpc.Status status,
List<ChangeStreamContinuationToken> changeStreamContinuationTokens) {
return new AutoValue_CloseStream(Status.fromProto(status), changeStreamContinuationTokens);
List<ChangeStreamContinuationToken> changeStreamContinuationTokens,
List<ByteStringRange> newPartitions) {
if (status.getCode() == 0) {
Preconditions.checkState(
changeStreamContinuationTokens.isEmpty(),
"An OK CloseStream should not have continuation tokens.");
} else {
Preconditions.checkState(
!changeStreamContinuationTokens.isEmpty(),
"A non-OK CloseStream should have continuation token(s).");
Preconditions.checkState(
changeStreamContinuationTokens.size() == newPartitions.size(),
"Number of continuation tokens does not match number of new partitions.");
}
return new AutoValue_CloseStream(
Status.fromProto(status), changeStreamContinuationTokens, newPartitions);
}

/** Wraps the protobuf {@link ReadChangeStreamResponse.CloseStream}. */
Expand All @@ -46,6 +62,13 @@ public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStrea
closeStream.getStatus(),
closeStream.getContinuationTokensList().stream()
.map(ChangeStreamContinuationToken::fromProto)
.collect(ImmutableList.toImmutableList()),
closeStream.getNewPartitionsList().stream()
.map(
newPartition ->
ByteStringRange.create(
newPartition.getRowRange().getStartKeyClosed(),
newPartition.getRowRange().getEndKeyOpen()))
.collect(ImmutableList.toImmutableList()));
}

Expand All @@ -56,4 +79,8 @@ public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStrea
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
@Nonnull
public abstract List<ChangeStreamContinuationToken> getChangeStreamContinuationTokens();

@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
@Nonnull
public abstract List<ByteStringRange> getNewPartitions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ public StreamResumptionStrategy<ReadChangeStreamRequest, ChangeStreamRecordT> cr
public ChangeStreamRecordT processResponse(ChangeStreamRecordT response) {
// Update the token from a Heartbeat or a ChangeStreamMutation.
// We don't worry about resumption after CloseStream, since the server
// will return an OK status right after sending a CloseStream.
// will close the stream with an OK status right after sending a CloseStream,
// no matter what status the CloseStream.Status is:
// 1) ... => CloseStream.Ok => final OK. This means the read finishes successfully.
// 2) ... => CloseStream.Error => final OK. This means the client should start
// a new ReadChangeStream call with the continuation tokens specified in
// CloseStream.
// Either case, we don't need to retry after receiving a CloseStream.
if (changeStreamRecordAdapter.isHeartbeat(response)) {
this.token = changeStreamRecordAdapter.getTokenFromHeartbeat(response);
} else if (changeStreamRecordAdapter.isChangeStreamMutation(response)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,20 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Instant;

@RunWith(JUnit4.class)
public class ChangeStreamRecordTest {

@Rule public ExpectedException expect = ExpectedException.none();

@Test
public void heartbeatSerializationTest() throws IOException, ClassNotFoundException {
ReadChangeStreamResponse.Heartbeat heartbeatProto =
Expand All @@ -60,7 +66,7 @@ public void heartbeatSerializationTest() throws IOException, ClassNotFoundExcept

@Test
public void closeStreamSerializationTest() throws IOException, ClassNotFoundException {
Status status = Status.newBuilder().setCode(0).build();
Status status = Status.newBuilder().setCode(11).build();
RowRange rowRange1 =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
Expand All @@ -85,6 +91,8 @@ public void closeStreamSerializationTest() throws IOException, ClassNotFoundExce
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange2).build())
.setToken(token2)
.build())
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange1))
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange2))
.setStatus(status)
.build();
CloseStream closeStream = CloseStream.fromProto(closeStreamProto);
Expand All @@ -98,6 +106,7 @@ public void closeStreamSerializationTest() throws IOException, ClassNotFoundExce
assertThat(actual.getChangeStreamContinuationTokens())
.isEqualTo(closeStream.getChangeStreamContinuationTokens());
assertThat(actual.getStatus()).isEqualTo(closeStream.getStatus());
assertThat(actual.getNewPartitions()).isEqualTo(closeStream.getNewPartitions());
}

@Test
Expand Down Expand Up @@ -129,7 +138,7 @@ public void heartbeatTest() {

@Test
public void closeStreamTest() {
Status status = Status.newBuilder().setCode(0).build();
Status status = Status.newBuilder().setCode(11).build();
RowRange rowRange1 =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
Expand All @@ -154,6 +163,8 @@ public void closeStreamTest() {
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange2).build())
.setToken(token2)
.build())
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange1))
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange2))
.setStatus(status)
.build();
CloseStream actualCloseStream = CloseStream.fromProto(closeStreamProto);
Expand All @@ -169,5 +180,65 @@ public void closeStreamTest() {
ByteStringRange.create(rowRange2.getStartKeyClosed(), rowRange2.getEndKeyOpen()));
assertThat(token2)
.isEqualTo(actualCloseStream.getChangeStreamContinuationTokens().get(1).getToken());
assertThat(actualCloseStream.getNewPartitions().get(0))
.isEqualTo(
ByteStringRange.create(rowRange1.getStartKeyClosed(), rowRange1.getEndKeyOpen()));
assertThat(actualCloseStream.getNewPartitions().get(1))
.isEqualTo(
ByteStringRange.create(rowRange2.getStartKeyClosed(), rowRange2.getEndKeyOpen()));
}

// Tests that an OK CloseStream should not have continuation tokens.
@Test(expected = IllegalStateException.class)
public void closeStreamOkWithContinuationTokenShouldFail() {
Status status = Status.newBuilder().setCode(0).build();
RowRange rowRange =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
.setEndKeyOpen(ByteString.copyFromUtf8("apple"))
.build();
String token = "close-stream-token-1";
ReadChangeStreamResponse.CloseStream closeStreamProto =
ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(
StreamContinuationToken.newBuilder()
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange))
.setToken(token))
.setStatus(status)
.build();
Assert.assertThrows(
IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto));
}

// Tests that a non-OK CloseStream should have continuation tokens.
@Test(expected = IllegalStateException.class)
public void closeStreamErrorWithoutContinuationTokenShouldFail() {
Status status = Status.newBuilder().setCode(11).build();
ReadChangeStreamResponse.CloseStream closeStreamProto =
ReadChangeStreamResponse.CloseStream.newBuilder().setStatus(status).build();
Assert.assertThrows(
IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto));
}

// Tests that the number of continuation tokens should match the number of new partitions.
@Test(expected = IllegalStateException.class)
public void closeStreamTokenAndNewPartitionCountMismatchedTest() {
Status status = Status.newBuilder().setCode(11).build();
RowRange rowRange =
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(""))
.setEndKeyOpen(ByteString.copyFromUtf8("apple"))
.build();
String token = "close-stream-token-1";
ReadChangeStreamResponse.CloseStream closeStreamProto =
ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(
StreamContinuationToken.newBuilder()
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange))
.setToken(token))
.setStatus(status)
.build();
Assert.assertThrows(
IllegalStateException.class, (ThrowingRunnable) CloseStream.fromProto(closeStreamProto));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ public void heartbeatTest() {
public void closeStreamTest() {
ReadChangeStreamResponse.CloseStream expectedCloseStream =
ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(
StreamContinuationToken.newBuilder().setToken("random-token").build())
.setStatus(Status.newBuilder().setCode(0).build())
.build();
assertThat(changeStreamRecordBuilder.onCloseStream(expectedCloseStream))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public void closeStreamTest() {
ReadChangeStreamResponse.CloseStream closeStreamProto =
ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(streamContinuationToken)
.setStatus(Status.newBuilder().setCode(0).build())
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange))
.setStatus(Status.newBuilder().setCode(11))
.build();
ReadChangeStreamResponse response =
ReadChangeStreamResponse.newBuilder().setCloseStream(closeStreamProto).build();
Expand All @@ -127,5 +128,8 @@ public void closeStreamTest() {
.isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen()));
assertThat(changeStreamContinuationToken.getToken())
.isEqualTo(streamContinuationToken.getToken());
assertThat(closeStream.getNewPartitions().size()).isEqualTo(1);
assertThat(closeStream.getNewPartitions().get(0))
.isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.cloud.bigtable.data.v2.models.DeleteFamily;
import com.google.cloud.bigtable.data.v2.models.Entry;
import com.google.cloud.bigtable.data.v2.models.Heartbeat;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.SetCell;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi;
import com.google.cloud.conformance.bigtable.v2.ChangeStreamTestDefinition.ChangeStreamTestFile;
Expand Down Expand Up @@ -173,6 +174,14 @@ public void test() throws Exception {
.setToken(token.getToken())
.build());
}
for (ByteStringRange newPartition : closeStream.getNewPartitions()) {
builder.addNewPartitions(
StreamPartition.newBuilder()
.setRowRange(
RowRange.newBuilder()
.setStartKeyClosed(newPartition.getStart())
.setEndKeyOpen(newPartition.getEnd())));
}
ReadChangeStreamResponse.CloseStream closeStreamProto = builder.build();
actualResults.add(
ReadChangeStreamTest.Result.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ private StreamContinuationToken createStreamContinuationToken(@Nonnull String to
.build();
}

private StreamPartition createNewPartitionForCloseStream() {
return StreamPartition.newBuilder()
.setRowRange(
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(START_KEY_CLOSED))
.setEndKeyOpen(ByteString.copyFromUtf8(END_KEY_OPEN)))
.build();
}

private ReadChangeStreamResponse.Heartbeat createHeartbeat(
StreamContinuationToken streamContinuationToken) {
return ReadChangeStreamResponse.Heartbeat.newBuilder()
Expand All @@ -130,11 +139,18 @@ private ReadChangeStreamResponse.Heartbeat createHeartbeat(
.build();
}

private ReadChangeStreamResponse.CloseStream createCloseStream() {
return ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(createStreamContinuationToken(CLOSE_STREAM_TOKEN))
.setStatus(com.google.rpc.Status.newBuilder().setCode(0).build())
.build();
private ReadChangeStreamResponse.CloseStream createCloseStream(boolean isOk) {
ReadChangeStreamResponse.CloseStream.Builder builder =
ReadChangeStreamResponse.CloseStream.newBuilder();
if (isOk) {
builder.setStatus(com.google.rpc.Status.newBuilder().setCode(0));
} else {
builder
.setStatus(com.google.rpc.Status.newBuilder().setCode(11))
.addContinuationTokens(createStreamContinuationToken(CLOSE_STREAM_TOKEN))
.addNewPartitions(createNewPartitionForCloseStream());
}
return builder.build();
}

private ReadChangeStreamResponse.DataChange createDataChange(boolean done) {
Expand Down Expand Up @@ -178,7 +194,7 @@ public void happyPathHeartbeatTest() {
@Test
public void happyPathCloseStreamTest() {
ReadChangeStreamResponse closeStreamResponse =
ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream()).build();
ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream(true)).build();
service.expectations.add(
RpcExpectation.create().expectInitialRequest().respondWith(closeStreamResponse));
List<ChangeStreamRecord> actualResults = getResults();
Expand Down Expand Up @@ -221,7 +237,7 @@ public void singleHeartbeatImmediateRetryTest() {
public void singleCloseStreamImmediateRetryTest() {
// CloseStream.
ReadChangeStreamResponse closeStreamResponse =
ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream()).build();
ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream(false)).build();
service.expectations.add(
RpcExpectation.create().expectInitialRequest().respondWithStatus(Code.UNAVAILABLE));
// Resume with the exact same request.
Expand Down

0 comments on commit 8847fed

Please sign in to comment.