Skip to content

Commit

Permalink
fix: an atempt to solve test failure in nightly build (#2330)
Browse files Browse the repository at this point in the history
  • Loading branch information
GaoleMeng committed Nov 28, 2023
1 parent ba134e4 commit f77465e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
Expand Up @@ -371,22 +371,24 @@ static BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IO

// Validate whether the fetched connection pool matched certain properties.
private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
String paramsValidatedFailed = "";
if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(),
builder.traceId)) {
paramsValidatedFailed = "Trace id";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(),
builder.limitExceededBehavior)) {
paramsValidatedFailed = "Limit Exceeds Behavior";
}

if (!paramsValidatedFailed.isEmpty()) {
String storedTraceId =
this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId();
if (!Objects.equals(storedTraceId, builder.traceId)) {
throw new IllegalArgumentException(
String.format(
"%s used for the same connection pool for the same location must be the same!",
paramsValidatedFailed));
"Trace id used for the same connection pool for the same location must be the same, "
+ "however stored trace id is %s, and expected trace id is %s.",
storedTraceId, builder.traceId));
}
FlowController.LimitExceededBehavior storedLimitExceededBehavior =
singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior();
if (!Objects.equals(storedLimitExceededBehavior, builder.limitExceededBehavior)) {
throw new IllegalArgumentException(
String.format(
"Limit exceeded behavior setting used for the same connection pool for the same "
+ "location must be the same, however stored value is %s, and expected "
+ "value is %s.",
storedLimitExceededBehavior, builder.limitExceededBehavior));
}
}

Expand Down
Expand Up @@ -73,6 +73,8 @@ public class ITBigQueryWriteManualClientTest {
private static final String TABLE = "testtable";
private static final String TABLE2 = "complicatedtable";

private static final String TEST_TRACE_ID = "DATAFLOW:job_id";

private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset";

private static BigQueryWriteClient client;
Expand Down Expand Up @@ -928,6 +930,7 @@ public void testStreamWriterWithDefaultValue() throws ExecutionException, Interr
ProtoSchemaConverter.convert(SimpleTypeForDefaultValue.getDescriptor()))
.setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
.setEnableConnectionPool(true)
.setTraceId(TEST_TRACE_ID)
.build()) {
// 1. row has both fields set.
SimpleTypeForDefaultValue simpleTypeForDefaultValue1 =
Expand Down Expand Up @@ -1534,16 +1537,19 @@ public void testMultiplexingMixedLocation()
StreamWriter.newBuilder(defaultStream1)
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
.setEnableConnectionPool(true)
.setTraceId(TEST_TRACE_ID)
.build();
StreamWriter streamWriter2 =
StreamWriter.newBuilder(defaultStream2)
.setWriterSchema(ProtoSchemaConverter.convert(ComplicateType.getDescriptor()))
.setEnableConnectionPool(true)
.setTraceId(TEST_TRACE_ID)
.build();
StreamWriter streamWriter3 =
StreamWriter.newBuilder(defaultStream3)
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
.setEnableConnectionPool(true)
.setTraceId(TEST_TRACE_ID)
.build();
ApiFuture<AppendRowsResponse> response1 =
streamWriter1.append(CreateProtoRows(new String[] {"aaa"}));
Expand All @@ -1557,6 +1563,9 @@ public void testMultiplexingMixedLocation()
assertEquals("us", streamWriter1.getLocation());
assertEquals("us", streamWriter2.getLocation());
assertEquals("eu", streamWriter3.getLocation());
streamWriter1.close();
streamWriter2.close();
streamWriter3.close();
}

@Test
Expand Down

0 comments on commit f77465e

Please sign in to comment.