
feat: Add schema comparison to the main request loop for multiplexing to correctly update schema #1865

Merged
merged 40 commits, Nov 4, 2022
Commits (40)
5a63d95
feat: Split writer into connection worker and wrapper, this is a
GaoleMeng Sep 9, 2022
5a13302
feat: add connection worker pool skeleton, used for multiplexing client
GaoleMeng Sep 13, 2022
0297204
Merge branch 'main' into main
GaoleMeng Sep 14, 2022
8a81ad3
feat: add Load api for connection worker for multiplexing client
GaoleMeng Sep 14, 2022
68fd040
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 14, 2022
3106dae
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 15, 2022
5bf04e5
Merge branch 'googleapis:main' into main
GaoleMeng Sep 15, 2022
2fc7551
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 15, 2022
7a6d919
feat: add multiplexing support to connection worker. We will treat every
GaoleMeng Sep 15, 2022
3ba7659
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
f379a78
Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
9307776
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 16, 2022
de73013
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
19005a1
feat: port the multiplexing client core algorithm and basic tests
GaoleMeng Sep 19, 2022
c5d14ba
Merge branch 'googleapis:main' into main
GaoleMeng Sep 19, 2022
644360a
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 20, 2022
3099d82
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
e707dd6
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
9e7a8fa
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 20, 2022
31f1755
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
44c36fc
feat: wire multiplexing connection pool to stream writer
GaoleMeng Sep 20, 2022
87a4036
feat: some fixes for multiplexing client
GaoleMeng Sep 23, 2022
c92ea1b
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 23, 2022
019520c
Merge branch 'googleapis:main' into main
GaoleMeng Sep 26, 2022
47893df
feat: fix some todos, and reject the mixed behavior of passed in clie…
GaoleMeng Sep 27, 2022
8bd4e6a
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 27, 2022
83409b0
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 27, 2022
f7dd72d
Merge branch 'googleapis:main' into main
GaoleMeng Sep 27, 2022
a48399f
Merge branch 'googleapis:main' into main
GaoleMeng Sep 29, 2022
6789bc9
feat: fix the bug that we may peek into the write_stream field but it's
GaoleMeng Sep 29, 2022
46b4e6c
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 29, 2022
dfd4dd9
Merge branch 'googleapis:main' into main
GaoleMeng Sep 29, 2022
d68ae70
feat: fix the bug that we may peek into the write_stream field but it's
GaoleMeng Sep 29, 2022
2983fe9
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 29, 2022
d406256
Merge branch 'googleapis:main' into main
GaoleMeng Oct 13, 2022
22e9e07
feat: add getInflightWaitSeconds implementation
GaoleMeng Oct 13, 2022
fdb4e1c
Merge branch 'googleapis:main' into main
GaoleMeng Oct 21, 2022
0469474
Merge branch 'googleapis:main' into main
GaoleMeng Nov 2, 2022
d1b7740
feat: Add schema comparison in connection loop to ensure schema upda…
GaoleMeng Nov 3, 2022
e4cd529
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Nov 4, 2022
2 changes: 1 addition & 1 deletion README.md
@@ -49,7 +49,7 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.1.3')
implementation platform('com.google.cloud:libraries-bom:26.1.4')

implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
@@ -457,7 +457,6 @@ private void appendLoop() {
&& !streamName.isEmpty()
&& !originalRequest.getWriteStream().equals(streamName)) {
streamName = originalRequest.getWriteStream();
writerSchema = originalRequest.getProtoRows().getWriterSchema();
isMultiplexing = true;
firstRequestForDestinationSwitch = true;
}
@@ -470,17 +469,22 @@ private void appendLoop() {
if (this.traceId != null) {
originalRequestBuilder.setTraceId(this.traceId);
}
firstRequestForDestinationSwitch = false;
} else if (isMultiplexing) {
// If we are not at the first request after table switch, but we are in multiplexing
// mode, we only need the stream name but not the schema in the request.
originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema();
} else {
// If we are not at the first request or in multiplexing, create request with no schema
// and no stream name.
} else if (!isMultiplexing) {
// If we are not in multiplexing and not in the first request, clear the stream name.
originalRequestBuilder.clearWriteStream();
}

// We don't use a message differencer here, to keep the comparison fast.
// `equals(...)` can produce false positives, e.g. two repeated fields with the same
// content may not be considered equal; that only triggers a harmless re-send of the
// schema. As long as the comparison never produces a false negative, the correct
// writer schema always reaches the backend.
if (firstRequestForDestinationSwitch
|| !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema)) {
writerSchema = originalRequest.getProtoRows().getWriterSchema();
} else {
originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema();
}
firstRequestForDestinationSwitch = false;

// Send should only throw an exception if there is a problem with the request. The catch
// block will handle this case, and return the exception with the result.
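For readers skimming the hunk above, a minimal self-contained sketch of the rule it implements: cache the last writer schema piped over the connection, resend it on a destination switch or whenever `equals()` reports a change, and clear the field otherwise. The class and member names here are ours, not the library's:

```java
import com.google.protobuf.DescriptorProtos.DescriptorProto;

/** Hypothetical sketch of appendLoop()'s "send the writer schema only on change" rule. */
final class SchemaPipingSketch {
  private DescriptorProto lastSentSchema; // schema most recently piped to the backend
  private boolean firstRequestForDestinationSwitch = true;

  /** Returns whether the outgoing request should carry the writer schema. */
  boolean shouldSendSchema(DescriptorProto requestSchema) {
    boolean send =
        firstRequestForDestinationSwitch || !requestSchema.equals(lastSentSchema);
    if (send) {
      // An equals() false positive only costs a harmless re-send of the schema.
      lastSentSchema = requestSchema;
    }
    firstRequestForDestinationSwitch = false;
    return send;
  }
}
```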
@@ -169,6 +169,118 @@ public void testMultiplexedAppendSuccess() throws Exception {
}
}

@Test
public void testAppendInSameStream_switchSchema() throws Exception {
try (ConnectionWorker connectionWorker = createConnectionWorker()) {
long appendCount = 20;
for (long i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();

// Schema1 and schema2 have the same content but are different instances.
ProtoSchema schema1 = createProtoSchema("foo");
ProtoSchema schema2 = createProtoSchema("foo");
// Schema3 is a different schema
ProtoSchema schema3 = createProtoSchema("bar");

// We append in the repeating pattern:
// send to stream1, schema1
// send to stream1, schema2
// send to stream1, schema3
// send to stream1, schema3
// send to stream1, schema1
// ...
for (long i = 0; i < appendCount; i++) {
switch ((int) i % 4) {
case 0:
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema1,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
break;
case 1:
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema2,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
break;
case 2:
case 3:
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema3,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
break;
default: // fall out
break;
}
}
// In the real world the response won't contain an offset for the default stream; we use
// offsets here just to verify the responses.
for (int i = 0; i < appendCount; i++) {
Int64Value offset = futures.get(i).get().getAppendResult().getOffset();
assertThat(offset).isEqualTo(Int64Value.of(i));
}
assertThat(testBigQueryWrite.getAppendRequests().size()).isEqualTo(appendCount);
for (int i = 0; i < appendCount; i++) {
AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i);
assertThat(serverRequest.getProtoRows().getRows().getSerializedRowsCount())
.isGreaterThan(0);
assertThat(serverRequest.getOffset().getValue()).isEqualTo(i);

// We expect the server to receive requests in the pattern:
// (write_stream: t1, schema: schema1)
// (write_stream: _, schema: _)
// (write_stream: _, schema: schema3)
// (write_stream: _, schema: _)
// (write_stream: _, schema: schema1)
// (write_stream: _, schema: _)
switch (i % 4) {
case 0:
if (i == 0) {
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
}
assertThat(
serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
.isEqualTo("foo");
break;
case 1:
assertThat(serverRequest.getWriteStream()).isEmpty();
// Schema is cleared: schema2 has the same content as the schema already sent.
assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
break;
case 2:
assertThat(serverRequest.getWriteStream()).isEmpty();
// Schema is populated because it differs from the previously sent schema.
assertThat(
serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
.isEqualTo("bar");
break;
case 3:
assertThat(serverRequest.getWriteStream()).isEmpty();
// Schema is cleared: schema3 was already sent on the previous request.
assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
break;
default: // fall out
break;
}
}

assertThat(connectionWorker.getLoad().destinationCount()).isEqualTo(1);
assertThat(connectionWorker.getLoad().inFlightRequestsBytes()).isEqualTo(0);
}
}

private AppendRowsResponse createAppendResponse(long offset) {
return AppendRowsResponse.newBuilder()
.setAppendResult(
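Why do schema1 and schema2 in testAppendInSameStream_switchSchema above collapse into a single schema send? Because protobuf Java messages compare by value, not by reference. A standalone sketch mirroring the test's createProtoSchema helper (the class and method names are ours):

```java
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.protobuf.DescriptorProtos;

public class ProtoSchemaEqualsDemo {
  // Builds a one-field schema, same shape as the test helper createProtoSchema(fieldName).
  static ProtoSchema schemaWithField(String fieldName) {
    return ProtoSchema.newBuilder()
        .setProtoDescriptor(
            DescriptorProtos.DescriptorProto.newBuilder()
                .setName("Message")
                .addField(
                    DescriptorProtos.FieldDescriptorProto.newBuilder()
                        .setName(fieldName)
                        .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
                        .setNumber(1)
                        .build())
                .build())
        .build();
  }

  public static void main(String[] args) {
    // Same content, different instances: equal by value, so the worker clears the
    // schema field on the second request instead of re-sending it.
    System.out.println(schemaWithField("foo").equals(schemaWithField("foo"))); // true
    // A different field name is a real schema change and must be re-sent.
    System.out.println(schemaWithField("foo").equals(schemaWithField("bar"))); // false
  }
}
```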
@@ -61,8 +61,8 @@
@RunWith(JUnit4.class)
public class StreamWriterTest {
private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName());
private static final String TEST_STREAM_1 = "projects/p/datasets/d/tables/t/streams/s";
private static final String TEST_STREAM_2 = "projects/p/datasets/d/tables/t/streams/s";
private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/s1";
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/s2";
private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
private FakeScheduledExecutorService fakeExecutor;
private FakeBigQueryWrite testBigQueryWrite;
@@ -112,13 +112,17 @@ private StreamWriter getTestStreamWriter() throws IOException {
}

private ProtoSchema createProtoSchema() {
return createProtoSchema("foo");
}

private ProtoSchema createProtoSchema(String fieldName) {
return ProtoSchema.newBuilder()
.setProtoDescriptor(
DescriptorProtos.DescriptorProto.newBuilder()
.setName("Message")
.addField(
DescriptorProtos.FieldDescriptorProto.newBuilder()
.setName("foo")
.setName(fieldName)
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
.setNumber(1)
.build())
@@ -562,6 +566,107 @@ public void testOneMaxInflightRequests_MultiplexingCase() throws Exception {
writer2.close();
}

@Test
public void testProtoSchemaPiping_nonMultiplexingCase() throws Exception {
ProtoSchema protoSchema = createProtoSchema();
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(protoSchema)
.setMaxInflightBytes(1)
.build();
long appendCount = 5;
for (int i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}

List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
for (int i = 0; i < appendCount; i++) {
futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i));
}

for (int i = 0; i < appendCount; i++) {
assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
}
assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size());
for (int i = 0; i < appendCount; i++) {
AppendRowsRequest appendRowsRequest = testBigQueryWrite.getAppendRequests().get(i);
assertEquals(i, appendRowsRequest.getOffset().getValue());
if (i == 0) {
assertEquals(appendRowsRequest.getProtoRows().getWriterSchema(), protoSchema);
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_1);
} else {
// Subsequent requests carry neither the schema (it is unchanged) nor the stream
// name (non-multiplexing mode clears it).
assertEquals(
appendRowsRequest.getProtoRows().getWriterSchema(), ProtoSchema.getDefaultInstance());
assertEquals(appendRowsRequest.getWriteStream(), "");
}
}
writer.close();
}

@Test
public void testProtoSchemaPiping_multiplexingCase() throws Exception {
// Use the shared connection mode.
ConnectionWorkerPool.setOptions(
Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build());
ProtoSchema schema1 = createProtoSchema("Schema1");
ProtoSchema schema2 = createProtoSchema("Schema2");
StreamWriter writer1 =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(schema1)
.setLocation("US")
.setEnableConnectionPool(true)
.setMaxInflightRequests(1)
.build();
StreamWriter writer2 =
StreamWriter.newBuilder(TEST_STREAM_2, client)
.setWriterSchema(schema2)
.setMaxInflightRequests(1)
.setEnableConnectionPool(true)
.setLocation("US")
.build();

long appendCountPerStream = 5;
for (int i = 0; i < appendCountPerStream * 4; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}

List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
// In total we append appendCountPerStream * 4 requests,
// in the repeating pattern: writer1, writer1, writer2, writer2.
for (int i = 0; i < appendCountPerStream; i++) {
futures.add(writer1.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4));
futures.add(writer1.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 1));
futures.add(writer2.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 2));
futures.add(writer2.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 3));
}

for (int i = 0; i < appendCountPerStream * 4; i++) {
AppendRowsRequest appendRowsRequest = testBigQueryWrite.getAppendRequests().get(i);
assertEquals(i, appendRowsRequest.getOffset().getValue());
if (i % 4 == 0) {
assertEquals(appendRowsRequest.getProtoRows().getWriterSchema(), schema1);
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_1);
} else if (i % 4 == 1) {
assertEquals(
appendRowsRequest.getProtoRows().getWriterSchema(), ProtoSchema.getDefaultInstance());
// The i == 1 request is sent before the connection enters multiplexing mode, so its
// write stream is not populated; once multiplexing starts, every request carries it.
if (i == 1) {
assertEquals(appendRowsRequest.getWriteStream(), "");
} else {
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_1);
}
} else if (i % 4 == 2) {
assertEquals(appendRowsRequest.getProtoRows().getWriterSchema(), schema2);
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_2);
} else {
assertEquals(
appendRowsRequest.getProtoRows().getWriterSchema(), ProtoSchema.getDefaultInstance());
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_2);
}
}

writer1.close();
writer2.close();
}
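For context, the user-facing setup these multiplexing tests exercise, as a sketch that reuses this file's client and createProtoSchema helper (the stream name is a placeholder): enabling the connection pool lets writers for different tables share a connection, and the worker re-sends the writer schema only when it changes.

```java
// Sketch only: `client` and createProtoSchema(...) come from the surrounding test class.
StreamWriter writer =
    StreamWriter.newBuilder("projects/p/datasets/d/tables/t/streams/_default", client)
        .setWriterSchema(createProtoSchema("foo"))
        .setEnableConnectionPool(true) // opt in to multiplexing
        .setLocation("US") // connections are pooled per region
        .setMaxInflightRequests(100)
        .build();
```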

@Test
public void testAppendsWithTinyMaxInflightBytes() throws Exception {
StreamWriter writer =