Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add getInflightWaitSeconds implementation #1835

Merged
merged 36 commits into from Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
5a63d95
feat: Split writer into connection worker and wrapper, this is a
GaoleMeng Sep 9, 2022
5a13302
feat: add connection worker pool skeleton, used for multiplexing client
GaoleMeng Sep 13, 2022
0297204
Merge branch 'main' into main
GaoleMeng Sep 14, 2022
8a81ad3
feat: add Load api for connection worker for multiplexing client
GaoleMeng Sep 14, 2022
68fd040
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 14, 2022
3106dae
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 15, 2022
5bf04e5
Merge branch 'googleapis:main' into main
GaoleMeng Sep 15, 2022
2fc7551
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 15, 2022
7a6d919
feat: add multiplexing support to connection worker. We will treat every
GaoleMeng Sep 15, 2022
3ba7659
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
f379a78
Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
9307776
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 16, 2022
de73013
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
19005a1
feat: port the multiplexing client core algorithm and basic tests
GaoleMeng Sep 19, 2022
c5d14ba
Merge branch 'googleapis:main' into main
GaoleMeng Sep 19, 2022
644360a
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 20, 2022
3099d82
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
e707dd6
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
9e7a8fa
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 20, 2022
31f1755
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
44c36fc
feat: wire multiplexing connection pool to stream writer
GaoleMeng Sep 20, 2022
87a4036
feat: some fixes for multiplexing client
GaoleMeng Sep 23, 2022
c92ea1b
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 23, 2022
019520c
Merge branch 'googleapis:main' into main
GaoleMeng Sep 26, 2022
47893df
feat: fix some todos, and reject the mixed behavior of passed in clie…
GaoleMeng Sep 27, 2022
8bd4e6a
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 27, 2022
83409b0
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 27, 2022
f7dd72d
Merge branch 'googleapis:main' into main
GaoleMeng Sep 27, 2022
a48399f
Merge branch 'googleapis:main' into main
GaoleMeng Sep 29, 2022
6789bc9
feat: fix the bug that we may peek into the write_stream field but it's
GaoleMeng Sep 29, 2022
46b4e6c
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 29, 2022
dfd4dd9
Merge branch 'googleapis:main' into main
GaoleMeng Sep 29, 2022
d68ae70
feat: fix the bug that we may peek into the write_stream field but it's
GaoleMeng Sep 29, 2022
2983fe9
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 29, 2022
d406256
Merge branch 'googleapis:main' into main
GaoleMeng Oct 13, 2022
22e9e07
feat: add getInflightWaitSeconds implementation
GaoleMeng Oct 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -392,6 +392,21 @@ public void close(StreamWriter streamWriter) {
}
}

/** Fetch the wait seconds from corresponding worker. */
public long getInflightWaitSeconds(StreamWriter streamWriter) {
lock.lock();
try {
ConnectionWorker connectionWorker = streamWriterToConnection.get(streamWriter);
if (connectionWorker == null) {
return 0;
} else {
return connectionWorker.getInflightWaitSeconds();
}
} finally {
lock.unlock();
}
}

/** Enable Test related logic. */
public static void enableTestingLogic() {
enableTesting = true;
Expand Down
Expand Up @@ -141,10 +141,9 @@ public void close(StreamWriter streamWriter) {
}
}

long getInflightWaitSeconds() {
long getInflightWaitSeconds(StreamWriter streamWriter) {
yirutang marked this conversation as resolved.
Show resolved Hide resolved
if (getKind() == Kind.CONNECTION_WORKER_POOL) {
throw new IllegalStateException(
"getInflightWaitSeconds is not supported in multiplexing mode.");
return connectionWorkerPool().getInflightWaitSeconds(streamWriter);
}
return connectionWorker().getInflightWaitSeconds();
}
Expand Down Expand Up @@ -363,7 +362,7 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
* stream case.
*/
public long getInflightWaitSeconds() {
return singleConnectionOrConnectionPool.getInflightWaitSeconds();
return singleConnectionOrConnectionPool.getInflightWaitSeconds(this);
}

/** @return a unique Id for the writer. */
Expand Down
Expand Up @@ -29,6 +29,7 @@
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.UnknownException;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -60,7 +61,8 @@
@RunWith(JUnit4.class)
public class StreamWriterTest {
private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName());
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
private static final String TEST_STREAM_1 = "projects/p/datasets/d/tables/t/streams/s";
private static final String TEST_STREAM_2 = "projects/p/datasets/d/tables/t/streams/s";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be a different name?

private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
private FakeScheduledExecutorService fakeExecutor;
private FakeBigQueryWrite testBigQueryWrite;
Expand Down Expand Up @@ -94,7 +96,7 @@ public void tearDown() throws Exception {
}

private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
return StreamWriter.newBuilder(TEST_STREAM, client)
return StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.setLocation("US")
Expand All @@ -103,7 +105,7 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
}

private StreamWriter getTestStreamWriter() throws IOException {
return StreamWriter.newBuilder(TEST_STREAM, client)
return StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.build();
Expand Down Expand Up @@ -197,7 +199,7 @@ private void verifyAppendRequests(long appendCount) {
if (i == 0) {
// First request received by server should have schema and stream name.
assertTrue(serverRequest.getProtoRows().hasWriterSchema());
assertEquals(serverRequest.getWriteStream(), TEST_STREAM);
assertEquals(serverRequest.getWriteStream(), TEST_STREAM_1);
assertEquals(serverRequest.getTraceId(), TEST_TRACE_ID);
} else {
// Following request should not have schema and stream name.
Expand All @@ -210,7 +212,7 @@ private void verifyAppendRequests(long appendCount) {

public void testBuildBigQueryWriteClientInWriter() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM)
StreamWriter.newBuilder(TEST_STREAM_1)
.setCredentialsProvider(NoCredentialsProvider.create())
.setChannelProvider(serviceHelper.createChannelProvider())
.setWriterSchema(createProtoSchema())
Expand Down Expand Up @@ -253,7 +255,7 @@ public void testNoSchema() throws Exception {
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriter.newBuilder(TEST_STREAM, client).build();
StreamWriter.newBuilder(TEST_STREAM_1, client).build();
}
});
assertEquals(ex.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode());
Expand All @@ -267,23 +269,23 @@ public void testInvalidTraceId() throws Exception {
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriter.newBuilder(TEST_STREAM).setTraceId("abc");
StreamWriter.newBuilder(TEST_STREAM_1).setTraceId("abc");
}
});
assertThrows(
IllegalArgumentException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriter.newBuilder(TEST_STREAM).setTraceId("abc:");
StreamWriter.newBuilder(TEST_STREAM_1).setTraceId("abc:");
}
});
assertThrows(
IllegalArgumentException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriter.newBuilder(TEST_STREAM).setTraceId(":abc");
StreamWriter.newBuilder(TEST_STREAM_1).setTraceId(":abc");
}
});
}
Expand Down Expand Up @@ -487,7 +489,7 @@ public void serverCloseWhileRequestsInflight() throws Exception {
@Test
public void testZeroMaxInflightRequests() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM, client)
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightRequests(0)
.build();
Expand All @@ -499,7 +501,7 @@ public void testZeroMaxInflightRequests() throws Exception {
@Test
public void testZeroMaxInflightBytes() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM, client)
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightBytes(0)
.build();
Expand All @@ -511,7 +513,7 @@ public void testZeroMaxInflightBytes() throws Exception {
@Test
public void testOneMaxInflightRequests() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM, client)
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightRequests(1)
.build();
Expand All @@ -525,10 +527,45 @@ public void testOneMaxInflightRequests() throws Exception {
writer.close();
}

@Test
public void testOneMaxInflightRequests_MultiplexingCase() throws Exception {
ConnectionWorkerPool.setOptions(Settings.builder().setMaxConnectionsPerRegion(2).build());
StreamWriter writer1 =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setLocation("US")
.setEnableConnectionPool(true)
.setMaxInflightRequests(1)
.build();
StreamWriter writer2 =
StreamWriter.newBuilder(TEST_STREAM_2, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightRequests(1)
.setEnableConnectionPool(true)
.setMaxInflightRequests(1)
.setLocation("US")
.build();

// Server will sleep 1 second before every response.
testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1));
testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addResponse(createAppendResponse(1));

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer1, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer2, new String[] {"A"});

assertTrue(writer1.getInflightWaitSeconds() >= 1);
assertTrue(writer2.getInflightWaitSeconds() >= 1);
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue());
writer1.close();
writer2.close();
}

@Test
public void testAppendsWithTinyMaxInflightBytes() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM, client)
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightBytes(1)
.build();
Expand Down Expand Up @@ -560,7 +597,7 @@ public void testAppendsWithTinyMaxInflightBytes() throws Exception {
@Test
public void testAppendsWithTinyMaxInflightBytesThrow() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM, client)
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightBytes(1)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
Expand Down Expand Up @@ -595,7 +632,7 @@ public void testLimitBehaviorIgnoreNotAccepted() throws Exception {
@Override
public void run() throws Throwable {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM, client)
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightBytes(1)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore)
Expand Down Expand Up @@ -745,7 +782,7 @@ public void testExtractDatasetName() throws Exception {
@Test(timeout = 10000)
public void testCloseDisconnectedStream() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM)
StreamWriter.newBuilder(TEST_STREAM_1)
.setCredentialsProvider(NoCredentialsProvider.create())
.setChannelProvider(serviceHelper.createChannelProvider())
.setWriterSchema(createProtoSchema())
Expand Down