Skip to content

Commit

Permalink
feat: Add getNewPartitions method to CloseStream for Bigtable ChangeS…
Browse files Browse the repository at this point in the history
…tream
  • Loading branch information
Teng Zhong committed Feb 28, 2023
1 parent 0e283bf commit 9b23a29
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 6 deletions.
6 changes: 6 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@
<method>*getStatus*</method>
<to>com.google.cloud.bigtable.common.Status</to>
</difference>
<!-- add new method is ok because CloseStream is InternalApi -->
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/bigtable/data/v2/models/CloseStream</className>
<method>*getNewPartitions*</method>
</difference>
<!-- change method return type is ok because ChangeStreamMutation is InternalApi -->
<difference>
<differenceType>7006</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.cloud.bigtable.common.Status;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.List;
Expand All @@ -35,8 +36,10 @@ public abstract class CloseStream implements ChangeStreamRecord, Serializable {

private static CloseStream create(
com.google.rpc.Status status,
List<ChangeStreamContinuationToken> changeStreamContinuationTokens) {
return new AutoValue_CloseStream(Status.fromProto(status), changeStreamContinuationTokens);
List<ChangeStreamContinuationToken> changeStreamContinuationTokens,
List<ByteStringRange> newPartitions) {
return new AutoValue_CloseStream(
Status.fromProto(status), changeStreamContinuationTokens, newPartitions);
}

/** Wraps the protobuf {@link ReadChangeStreamResponse.CloseStream}. */
Expand All @@ -46,6 +49,13 @@ public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStrea
closeStream.getStatus(),
closeStream.getContinuationTokensList().stream()
.map(ChangeStreamContinuationToken::fromProto)
.collect(ImmutableList.toImmutableList()),
closeStream.getNewPartitionsList().stream()
.map(
newPartition ->
ByteStringRange.create(
newPartition.getRowRange().getStartKeyClosed(),
newPartition.getRowRange().getEndKeyOpen()))
.collect(ImmutableList.toImmutableList()));
}

Expand All @@ -56,4 +66,8 @@ public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStrea
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
@Nonnull
public abstract List<ChangeStreamContinuationToken> getChangeStreamContinuationTokens();

@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
@Nonnull
public abstract List<ByteStringRange> getNewPartitions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public void closeStreamSerializationTest() throws IOException, ClassNotFoundExce
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange2).build())
.setToken(token2)
.build())
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange1))
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange2))
.setStatus(status)
.build();
CloseStream closeStream = CloseStream.fromProto(closeStreamProto);
Expand All @@ -98,6 +100,7 @@ public void closeStreamSerializationTest() throws IOException, ClassNotFoundExce
assertThat(actual.getChangeStreamContinuationTokens())
.isEqualTo(closeStream.getChangeStreamContinuationTokens());
assertThat(actual.getStatus()).isEqualTo(closeStream.getStatus());
assertThat(actual.getNewPartitions()).isEqualTo(closeStream.getNewPartitions());
}

@Test
Expand Down Expand Up @@ -154,6 +157,8 @@ public void closeStreamTest() {
.setPartition(StreamPartition.newBuilder().setRowRange(rowRange2).build())
.setToken(token2)
.build())
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange1))
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange2))
.setStatus(status)
.build();
CloseStream actualCloseStream = CloseStream.fromProto(closeStreamProto);
Expand All @@ -169,5 +174,11 @@ public void closeStreamTest() {
ByteStringRange.create(rowRange2.getStartKeyClosed(), rowRange2.getEndKeyOpen()));
assertThat(token2)
.isEqualTo(actualCloseStream.getChangeStreamContinuationTokens().get(1).getToken());
assertThat(actualCloseStream.getNewPartitions().get(0))
.isEqualTo(
ByteStringRange.create(rowRange1.getStartKeyClosed(), rowRange1.getEndKeyOpen()));
assertThat(actualCloseStream.getNewPartitions().get(1))
.isEqualTo(
ByteStringRange.create(rowRange2.getStartKeyClosed(), rowRange2.getEndKeyOpen()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void closeStreamTest() {
ReadChangeStreamResponse.CloseStream closeStreamProto =
ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(streamContinuationToken)
.addNewPartitions(StreamPartition.newBuilder().setRowRange(rowRange))
.setStatus(Status.newBuilder().setCode(0).build())
.build();
ReadChangeStreamResponse response =
Expand All @@ -127,5 +128,8 @@ public void closeStreamTest() {
.isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen()));
assertThat(changeStreamContinuationToken.getToken())
.isEqualTo(streamContinuationToken.getToken());
assertThat(closeStream.getNewPartitions().size()).isEqualTo(1);
assertThat(closeStream.getNewPartitions().get(0))
.isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.cloud.bigtable.data.v2.models.DeleteFamily;
import com.google.cloud.bigtable.data.v2.models.Entry;
import com.google.cloud.bigtable.data.v2.models.Heartbeat;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.SetCell;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi;
import com.google.cloud.conformance.bigtable.v2.ChangeStreamTestDefinition.ChangeStreamTestFile;
Expand Down Expand Up @@ -173,6 +174,14 @@ public void test() throws Exception {
.setToken(token.getToken())
.build());
}
for (ByteStringRange newPartition : closeStream.getNewPartitions()) {
builder.addNewPartitions(
StreamPartition.newBuilder()
.setRowRange(
RowRange.newBuilder()
.setStartKeyClosed(newPartition.getStart())
.setEndKeyOpen(newPartition.getEnd())));
}
ReadChangeStreamResponse.CloseStream closeStreamProto = builder.build();
actualResults.add(
ReadChangeStreamTest.Result.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ private StreamContinuationToken createStreamContinuationToken(@Nonnull String to
.build();
}

private StreamPartition createNewPartitionForCloseStream() {
return StreamPartition.newBuilder()
.setRowRange(
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8(START_KEY_CLOSED))
.setEndKeyOpen(ByteString.copyFromUtf8(END_KEY_OPEN)))
.build();
}

private ReadChangeStreamResponse.Heartbeat createHeartbeat(
StreamContinuationToken streamContinuationToken) {
return ReadChangeStreamResponse.Heartbeat.newBuilder()
Expand All @@ -133,6 +142,7 @@ private ReadChangeStreamResponse.Heartbeat createHeartbeat(
private ReadChangeStreamResponse.CloseStream createCloseStream() {
return ReadChangeStreamResponse.CloseStream.newBuilder()
.addContinuationTokens(createStreamContinuationToken(CLOSE_STREAM_TOKEN))
.addNewPartitions(createNewPartitionForCloseStream())
.setStatus(com.google.rpc.Status.newBuilder().setCode(0).build())
.build();
}
Expand Down
80 changes: 76 additions & 4 deletions google-cloud-bigtable/src/test/resources/changestream.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,25 @@
"partition": {
"row_range": {
"start_key_closed": "0000000000000001",
"end_key_open": "0000000000000002"
"end_key_open": "0000000000000003"
}
},
"token": "close-stream-token-2"
}
],
"new_partitions": [
{
"row_range": {
"start_key_closed": "",
"end_key_open": "0000000000000002"
}
},
{
"row_range": {
"start_key_closed": "0000000000000002",
"end_key_open": "0000000000000003"
}
}
]
}
}
Expand All @@ -92,11 +106,25 @@
"partition": {
"row_range": {
"start_key_closed": "0000000000000001",
"end_key_open": "0000000000000002"
"end_key_open": "0000000000000003"
}
},
"token": "close-stream-token-2"
}
],
"new_partitions": [
{
"row_range": {
"start_key_closed": "",
"end_key_open": "0000000000000002"
}
},
{
"row_range": {
"start_key_closed": "0000000000000002",
"end_key_open": "0000000000000003"
}
}
]
}
},
Expand Down Expand Up @@ -137,6 +165,14 @@
},
"token": "close-stream-token-1"
}
],
"new_partitions": [
{
"row_range": {
"start_key_closed": "",
"end_key_open": "0000000000000002"
}
}
]
}
}
Expand Down Expand Up @@ -176,6 +212,14 @@
},
"token": "close-stream-token-1"
}
],
"new_partitions": [
{
"row_range": {
"start_key_closed": "",
"end_key_open": "0000000000000002"
}
}
]
}
},
Expand Down Expand Up @@ -1280,11 +1324,25 @@
"partition": {
"row_range": {
"start_key_closed": "0000000000000001",
"end_key_open": "0000000000000002"
"end_key_open": "0000000000000003"
}
},
"token": "close-stream-token-2"
}
],
"new_partitions": [
{
"row_range": {
"start_key_closed": "",
"end_key_open": "0000000000000002"
}
},
{
"row_range": {
"start_key_closed": "0000000000000002",
"end_key_open": "0000000000000003"
}
}
]
}
}
Expand Down Expand Up @@ -1363,11 +1421,25 @@
"partition": {
"row_range": {
"start_key_closed": "0000000000000001",
"end_key_open": "0000000000000002"
"end_key_open": "0000000000000003"
}
},
"token": "close-stream-token-2"
}
],
"new_partitions": [
{
"row_range": {
"start_key_closed": "",
"end_key_open": "0000000000000002"
}
},
{
"row_range": {
"start_key_closed": "0000000000000002",
"end_key_open": "0000000000000003"
}
}
]
}
},
Expand Down

0 comments on commit 9b23a29

Please sign in to comment.