Skip to content

Commit

Permalink
feat: fix some todos and reject stream writer if it's created with mi…
Browse files Browse the repository at this point in the history
…xed behavior of passed in client or not (#1803)

* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destinationt

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests
also fixed a tiny bug inside fake bigquery write impl for getting thre
response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some todos, and reject the mixed behavior of passed in client or not

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
GaoleMeng and gcf-owl-bot[bot] committed Sep 27, 2022
1 parent 6380f71 commit 1a69192
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 46 deletions.
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -324,18 +325,19 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
// Though atomic integer is super lightweight, add extra if check in case adding future logic.
testValueCreateConnectionCount.getAndIncrement();
}
// TODO(gaole): figure out a better way to handle header / request body mismatch
// currently we use different header for the client in each connection worker to be different
// currently we use different header for the client in each connection worker to be different
// as the backend require the header to have the same write_stream field as request body.
BigQueryWriteClient clientAfterModification = client;
if (ownsBigQueryWriteClient) {
BigQueryWriteSettings settings = client.getSettings();

// Every header to write api is required to set write_stream in the header to help routing
// the request to correct region.
HashMap<String, String> newHeaders = new HashMap<>();
newHeaders.putAll(settings.toBuilder().getHeaderProvider().getHeaders());
newHeaders.put("x-goog-request-params", "write_stream=" + streamName);
BigQueryWriteSettings stubSettings =
settings
.toBuilder()
.setHeaderProvider(
FixedHeaderProvider.create("x-goog-request-params", "write_stream=" + streamName))
.build();
settings.toBuilder().setHeaderProvider(FixedHeaderProvider.create(newHeaders)).build();
clientAfterModification = BigQueryWriteClient.create(stubSettings);
}
ConnectionWorker connectionWorker =
Expand Down
Expand Up @@ -33,7 +33,6 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
Expand Down Expand Up @@ -68,6 +67,9 @@ public class StreamWriter implements AutoCloseable {
*/
private final SingleConnectionOrConnectionPool singleConnectionOrConnectionPool;

/** Test only param to tell how many times a client is created. */
private static int testOnlyClientCreatedTimes = 0;

/**
* Static map from {@link ConnectionPoolKey} to connection pool. Note this map is static to be
* shared by every stream writer in the same process.
Expand Down Expand Up @@ -169,25 +171,7 @@ private StreamWriter(Builder builder) throws IOException {
this.streamName = builder.streamName;
this.writerSchema = builder.writerSchema;
this.location = builder.location;
boolean ownsBigQueryWriteClient;
if (builder.client == null) {
BigQueryWriteSettings stubSettings =
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setTransportChannelProvider(builder.channelProvider)
.setBackgroundExecutorProvider(builder.executorProvider)
.setEndpoint(builder.endpoint)
// (b/185842996): Temporily fix this by explicitly providing the header.
.setHeaderProvider(
FixedHeaderProvider.create(
"x-goog-request-params", "write_stream=" + this.streamName))
.build();
client = BigQueryWriteClient.create(stubSettings);
ownsBigQueryWriteClient = true;
} else {
client = builder.client;
ownsBigQueryWriteClient = false;
}
boolean ownsBigQueryWriteClient = builder.client == null;
if (!builder.enableConnectionPool) {
this.singleConnectionOrConnectionPool =
SingleConnectionOrConnectionPool.ofSingleConnection(
Expand All @@ -198,7 +182,7 @@ private StreamWriter(Builder builder) throws IOException {
builder.maxInflightBytes,
builder.limitExceededBehavior,
builder.traceId,
client,
getBigQueryWriteClient(builder),
ownsBigQueryWriteClient));
} else {
if (builder.location == null || builder.location.isEmpty()) {
Expand All @@ -212,29 +196,39 @@ private StreamWriter(Builder builder) throws IOException {
SingleConnectionOrConnectionPool.ofConnectionPool(
connectionPoolMap.computeIfAbsent(
ConnectionPoolKey.create(builder.location),
(key) ->
new ConnectionWorkerPool(
(key) -> {
try {
return new ConnectionWorkerPool(
builder.maxInflightRequest,
builder.maxInflightBytes,
builder.limitExceededBehavior,
builder.traceId,
client,
ownsBigQueryWriteClient)));
getBigQueryWriteClient(builder),
ownsBigQueryWriteClient);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
validateFetchedConnectonPool(builder);
// Shut down the passed in client. Internally we will create another client inside connection
// pool for every new connection worker.
// TODO(gaole): instead of perform close outside of pool approach, change to always create
// new client in connection.
if (client != singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient()
&& ownsBigQueryWriteClient) {
client.shutdown();
try {
client.awaitTermination(150, TimeUnit.SECONDS);
} catch (InterruptedException unused) {
// Ignore interruption as this client is not used.
}
client.close();
}
}
}

private BigQueryWriteClient getBigQueryWriteClient(Builder builder) throws IOException {
if (builder.client == null) {
BigQueryWriteSettings stubSettings =
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setTransportChannelProvider(builder.channelProvider)
.setEndpoint(builder.endpoint)
// (b/185842996): Temporily fix this by explicitly providing the header.
.setHeaderProvider(
FixedHeaderProvider.create(
"x-goog-request-params", "write_stream=" + this.streamName))
.build();
testOnlyClientCreatedTimes++;
return BigQueryWriteClient.create(stubSettings);
} else {
return builder.client;
}
}

Expand All @@ -245,6 +239,10 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(),
builder.traceId)) {
paramsValidatedFailed = "Trace id";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().ownsBigQueryWriteClient(),
builder.client == null)) {
paramsValidatedFailed = "Whether using passed in clients";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(),
builder.limitExceededBehavior)) {
Expand Down Expand Up @@ -361,6 +359,17 @@ SingleConnectionOrConnectionPool.Kind getConnectionOperationType() {
return singleConnectionOrConnectionPool.getKind();
}

@VisibleForTesting
static int getTestOnlyClientCreatedTimes() {
return testOnlyClientCreatedTimes;
}

@VisibleForTesting
static void cleanUp() {
testOnlyClientCreatedTimes = 0;
connectionPoolMap.clear();
}

/** A builder of {@link StreamWriter}s. */
public static final class Builder {
private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;
Expand Down
Expand Up @@ -82,13 +82,15 @@ public void setUp() throws Exception {
.setCredentialsProvider(NoCredentialsProvider.create())
.setTransportChannelProvider(serviceHelper.createChannelProvider())
.build());
StreamWriter.cleanUp();
}

@After
public void tearDown() throws Exception {
log.info("tearDown called");
client.close();
serviceHelper.stop();
StreamWriter.cleanUp();
}

private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
Expand Down Expand Up @@ -722,6 +724,25 @@ public void testInitialization_operationKind() throws Exception {
}
}

@Test
public void createStreamWithDifferentWhetherOwnsClient() throws Exception {
StreamWriter streamWriter1 = getMultiplexingTestStreamWriter();

assertThrows(
IllegalArgumentException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriter.newBuilder(TEST_STREAM)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.setLocation("US")
.setEnableConnectionPool(true)
.build();
}
});
}

// Timeout to ensure close() doesn't wait for done callback timeout.
@Test(timeout = 10000)
public void testCloseDisconnectedStream() throws Exception {
Expand Down

0 comments on commit 1a69192

Please sign in to comment.