Skip to content

Commit

Permalink
feat: add getInflightWaitSeconds implementation (#1835)
Browse files Browse the repository at this point in the history
* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destinationt

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests
also fixed a tiny bug inside fake bigquery write impl for getting thre
response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some todos, and reject the mixed behavior of passed in client or not

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* feat: add getInflightWaitSeconds implementation

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
GaoleMeng and gcf-owl-bot[bot] committed Oct 18, 2022
1 parent 7e8d900 commit b569116
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 20 deletions.
Expand Up @@ -392,6 +392,21 @@ public void close(StreamWriter streamWriter) {
}
}

/** Fetch the wait seconds from corresponding worker. */
public long getInflightWaitSeconds(StreamWriter streamWriter) {
lock.lock();
try {
ConnectionWorker connectionWorker = streamWriterToConnection.get(streamWriter);
if (connectionWorker == null) {
return 0;
} else {
return connectionWorker.getInflightWaitSeconds();
}
} finally {
lock.unlock();
}
}

/** Enable Test related logic. */
public static void enableTestingLogic() {
enableTesting = true;
Expand Down
Expand Up @@ -141,10 +141,9 @@ public void close(StreamWriter streamWriter) {
}
}

long getInflightWaitSeconds() {
long getInflightWaitSeconds(StreamWriter streamWriter) {
if (getKind() == Kind.CONNECTION_WORKER_POOL) {
throw new IllegalStateException(
"getInflightWaitSeconds is not supported in multiplexing mode.");
return connectionWorkerPool().getInflightWaitSeconds(streamWriter);
}
return connectionWorker().getInflightWaitSeconds();
}
Expand Down Expand Up @@ -363,7 +362,7 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
* stream case.
*/
public long getInflightWaitSeconds() {
return singleConnectionOrConnectionPool.getInflightWaitSeconds();
return singleConnectionOrConnectionPool.getInflightWaitSeconds(this);
}

/** @return a unique Id for the writer. */
Expand Down
Expand Up @@ -29,6 +29,7 @@
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.UnknownException;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -60,7 +61,8 @@
@RunWith(JUnit4.class)
public class StreamWriterTest {
private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName());
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
private static final String TEST_STREAM_1 = "projects/p/datasets/d/tables/t/streams/s";
private static final String TEST_STREAM_2 = "projects/p/datasets/d/tables/t/streams/s";
private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
private FakeScheduledExecutorService fakeExecutor;
private FakeBigQueryWrite testBigQueryWrite;
Expand Down Expand Up @@ -94,7 +96,7 @@ public void tearDown() throws Exception {
}

private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
return StreamWriter.newBuilder(TEST_STREAM, client)
return StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.setLocation("US")
Expand All @@ -103,7 +105,7 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
}

private StreamWriter getTestStreamWriter() throws IOException {
return StreamWriter.newBuilder(TEST_STREAM, client)
return StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.build();
Expand Down Expand Up @@ -197,7 +199,7 @@ private void verifyAppendRequests(long appendCount) {
if (i == 0) {
// First request received by server should have schema and stream name.
assertTrue(serverRequest.getProtoRows().hasWriterSchema());
assertEquals(serverRequest.getWriteStream(), TEST_STREAM);
assertEquals(serverRequest.getWriteStream(), TEST_STREAM_1);
assertEquals(serverRequest.getTraceId(), TEST_TRACE_ID);
} else {
// Following request should not have schema and stream name.
Expand All @@ -210,7 +212,7 @@ private void verifyAppendRequests(long appendCount) {

public void testBuildBigQueryWriteClientInWriter() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM)
StreamWriter.newBuilder(TEST_STREAM_1)
.setCredentialsProvider(NoCredentialsProvider.create())
.setChannelProvider(serviceHelper.createChannelProvider())
.setWriterSchema(createProtoSchema())
Expand Down Expand Up @@ -253,7 +255,7 @@ public void testNoSchema() throws Exception {
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriter.newBuilder(TEST_STREAM, client).build();
StreamWriter.newBuilder(TEST_STREAM_1, client).build();
}
});
assertEquals(ex.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode());
Expand All @@ -267,23 +269,23 @@ public void testInvalidTraceId() throws Exception {
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriter.newBuilder(TEST_STREAM).setTraceId("abc");
StreamWriter.newBuilder(TEST_STREAM_1).setTraceId("abc");
}
});
assertThrows(
IllegalArgumentException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriter.newBuilder(TEST_STREAM).setTraceId("abc:");
StreamWriter.newBuilder(TEST_STREAM_1).setTraceId("abc:");
}
});
assertThrows(
IllegalArgumentException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriter.newBuilder(TEST_STREAM).setTraceId(":abc");
StreamWriter.newBuilder(TEST_STREAM_1).setTraceId(":abc");
}
});
}
Expand Down Expand Up @@ -487,7 +489,7 @@ public void serverCloseWhileRequestsInflight() throws Exception {
@Test
public void testZeroMaxInflightRequests() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM, client)
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightRequests(0)
.build();
Expand All @@ -499,7 +501,7 @@ public void testZeroMaxInflightRequests() throws Exception {
@Test
public void testZeroMaxInflightBytes() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM, client)
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightBytes(0)
.build();
Expand All @@ -511,7 +513,7 @@ public void testZeroMaxInflightBytes() throws Exception {
@Test
public void testOneMaxInflightRequests() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM, client)
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightRequests(1)
.build();
Expand All @@ -525,10 +527,45 @@ public void testOneMaxInflightRequests() throws Exception {
writer.close();
}

@Test
public void testOneMaxInflightRequests_MultiplexingCase() throws Exception {
ConnectionWorkerPool.setOptions(Settings.builder().setMaxConnectionsPerRegion(2).build());
StreamWriter writer1 =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setLocation("US")
.setEnableConnectionPool(true)
.setMaxInflightRequests(1)
.build();
StreamWriter writer2 =
StreamWriter.newBuilder(TEST_STREAM_2, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightRequests(1)
.setEnableConnectionPool(true)
.setMaxInflightRequests(1)
.setLocation("US")
.build();

// Server will sleep 1 second before every response.
testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1));
testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addResponse(createAppendResponse(1));

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer1, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer2, new String[] {"A"});

assertTrue(writer1.getInflightWaitSeconds() >= 1);
assertTrue(writer2.getInflightWaitSeconds() >= 1);
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue());
writer1.close();
writer2.close();
}

@Test
public void testAppendsWithTinyMaxInflightBytes() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM, client)
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightBytes(1)
.build();
Expand Down Expand Up @@ -560,7 +597,7 @@ public void testAppendsWithTinyMaxInflightBytes() throws Exception {
@Test
public void testAppendsWithTinyMaxInflightBytesThrow() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM, client)
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightBytes(1)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
Expand Down Expand Up @@ -595,7 +632,7 @@ public void testLimitBehaviorIgnoreNotAccepted() throws Exception {
@Override
public void run() throws Throwable {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM, client)
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightBytes(1)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore)
Expand Down Expand Up @@ -745,7 +782,7 @@ public void testExtractDatasetName() throws Exception {
@Test(timeout = 10000)
public void testCloseDisconnectedStream() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM)
StreamWriter.newBuilder(TEST_STREAM_1)
.setCredentialsProvider(NoCredentialsProvider.create())
.setChannelProvider(serviceHelper.createChannelProvider())
.setWriterSchema(createProtoSchema())
Expand Down

0 comments on commit b569116

Please sign in to comment.