fix: Flaky connection close issue (#2044)
* Added EoS mark in populate buffer. Changed log level to FINE. Minor refactor

* Updated count assertion @ testConnectionClose

* Updated condition to trigger `connection.close` at testConnectionClose

* Added and wired flagEndOfStream. Refactored and improved Thread interrupt logic

* Add testConnectionClose for checking connection close while using Read API

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
prash-mi and gcf-owl-bot[bot] committed May 18, 2022
1 parent 251d468 commit 9993717
Showing 3 changed files with 142 additions and 84 deletions.
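
The commit description above centers on one pattern: when `connection.close()` is called while background workers are still producing rows, an explicit End-of-Stream marker is placed in the shared buffer so that the consumer stops advancing instead of blocking on an empty queue. Below is a minimal, self-contained sketch of that pattern. Every name in it (`Row`, `SketchConnection`, `startProducing`) is an illustrative stand-in rather than a type from this library, and the shutdown ordering is simplified compared to the real `ConnectionImpl.close()`.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class EndOfStreamSketch {

  /** Sentinel-aware row wrapper; isLast == true marks end of stream. */
  static final class Row {
    final String value; // payload, null for the sentinel
    final boolean isLast;

    Row(String value, boolean isLast) {
      this.value = value;
      this.isLast = isLast;
    }
  }

  static final class SketchConnection implements AutoCloseable {
    private final BlockingQueue<Row> buffer = new LinkedBlockingDeque<>(100);
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void startProducing(int totalRows) {
      executor.execute(
          () -> {
            try {
              for (int i = 0; i < totalRows && !Thread.currentThread().isInterrupted(); i++) {
                buffer.put(new Row("row-" + i, false)); // blocks while the buffer is full
              }
              buffer.put(new Row(null, true)); // normal completion: plant the sentinel
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt(); // close() interrupted us; stop producing
            }
          });
    }

    /** Consumer side: returns the next payload, or empty once the sentinel is seen. */
    Optional<String> next() throws InterruptedException {
      Row row = buffer.take();
      return row.isLast ? Optional.empty() : Optional.of(row.value);
    }

    /** Interrupt the producer, then plant the end-of-stream flag so next() stops cleanly. */
    @Override
    public void close() throws InterruptedException {
      executor.shutdownNow(); // interrupt the producer (it may be blocked in put())
      executor.awaitTermination(10, TimeUnit.SECONDS);
      buffer.clear(); // drop whatever the producer left behind
      buffer.put(new Row(null, true)); // end-of-stream flag: the cursor stops advancing
    }
  }

  public static void main(String[] args) throws Exception {
    SketchConnection connection = new SketchConnection();
    connection.startProducing(300_000);
    int count = 0;
    while (connection.next().isPresent()) {
      if (++count == 57_000) {
        connection.close(); // the very next call to next() observes the sentinel
      }
    }
    System.out.println("Read " + count + " rows before end of stream");
  }
}
```

In this sketch the producer is interrupted before the sentinel is planted, so the marker `put()` can never block on a full buffer; the actual `ConnectionImpl` flags the end of stream first (`flagEndOfStream()`) and relies on `populateBufferAsync` clearing the buffer once it observes the interrupt.
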
@@ -87,7 +87,11 @@ class ConnectionImpl implements Connection {
Executors.newFixedThreadPool(MAX_PROCESS_QUERY_THREADS_CNT);
private final Logger logger = Logger.getLogger(this.getClass().getName());
private BigQueryReadClient bqReadClient;
private static final long EXECUTOR_TIMEOUT_SEC = 5;
private static final long EXECUTOR_TIMEOUT_SEC = 10;
private BlockingQueue<AbstractList<FieldValue>>
bufferFvl; // initialized lazily iff we end up using the tabledata.list end point
private BlockingQueue<BigQueryResultImpl.Row>
bufferRow; // initialized lazily iff we end up using Read API

ConnectionImpl(
ConnectionSettings connectionSettings,
@@ -107,6 +111,19 @@ class ConnectionImpl implements Connection {
: Math.min(connectionSettings.getNumBufferedRows() * 2, 100000));
}

/**
* This method returns the number of records to be stored in the buffer and ensures that it falls
* within a reasonable range
*
* @return The max number of records to be stored in the buffer
*/
private int getBufferSize() {
return (connectionSettings == null
|| connectionSettings.getNumBufferedRows() == null
|| connectionSettings.getNumBufferedRows() < 10000
? 20000
: Math.min(connectionSettings.getNumBufferedRows() * 2, 100000));
}
/**
* Cancel method shuts down the pageFetcher and producerWorker threads gracefully using interrupt.
* The pageFetcher thread will not request for any subsequent threads after interrupting and
@@ -119,12 +136,14 @@ class ConnectionImpl implements Connection {
@BetaApi
@Override
public synchronized boolean close() throws BigQuerySQLException {
flagEndOfStream(); // puts an End of Stream flag in the buffer so that `ResultSet.next()` stops
// advancing the cursor
queryTaskExecutor.shutdownNow();
try {
queryTaskExecutor.awaitTermination(
EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS); // wait for the executor shutdown
if (queryTaskExecutor.awaitTermination(EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS)) {
return true;
} // else queryTaskExecutor.isShutdown() will be returned outside this try block
} catch (InterruptedException e) {
e.printStackTrace();
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Exception while awaitTermination",
@@ -330,7 +349,7 @@ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) {
BigQueryResultStats bigQueryResultStats = getBigQueryResultSetStats(jobId);

// Keeps the deserialized records at the row level, which is consumed by BigQueryResult
BlockingQueue<AbstractList<FieldValue>> buffer = new LinkedBlockingDeque<>(bufferSize);
bufferFvl = new LinkedBlockingDeque<>(getBufferSize());

// Keeps the parsed FieldValueLists
BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
@@ -352,11 +371,11 @@ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) {
// throughput

populateBufferAsync(
rpcResponseQueue, pageCache, buffer); // spawns a thread to populate the buffer
rpcResponseQueue, pageCache, bufferFvl); // spawns a thread to populate the buffer

// This will work for pagination as well, as buffer is getting updated asynchronously
return new BigQueryResultImpl<AbstractList<FieldValue>>(
schema, numRows, buffer, bigQueryResultStats);
schema, numRows, bufferFvl, bigQueryResultStats);
}

@VisibleForTesting
@@ -384,7 +403,7 @@ BigQueryResult processQueryResponseResults(
BigQueryResultStats bigQueryResultStats =
new BigQueryResultStatsImpl(queryStatistics, sessionInfo);

BlockingQueue<AbstractList<FieldValue>> buffer = new LinkedBlockingDeque<>(bufferSize);
bufferFvl = new LinkedBlockingDeque<>(getBufferSize());
BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
new LinkedBlockingDeque<>(
getPageCacheSize(connectionSettings.getNumBufferedRows(), schema));
@@ -401,10 +420,10 @@ BigQueryResult processQueryResponseResults(
parseRpcDataAsync(results.getRows(), schema, pageCache, rpcResponseQueue);

// Thread to populate the buffer (a blocking queue) shared with the consumer
populateBufferAsync(rpcResponseQueue, pageCache, buffer);
populateBufferAsync(rpcResponseQueue, pageCache, bufferFvl);

return new BigQueryResultImpl<AbstractList<FieldValue>>(
schema, numRows, buffer, bigQueryResultStats);
schema, numRows, bufferFvl, bigQueryResultStats);
}

@VisibleForTesting
@@ -420,6 +439,11 @@ void runNextPageTaskAsync(
while (pageToken != null) { // paginate for non null token
if (Thread.currentThread().isInterrupted()
|| queryTaskExecutor.isShutdown()) { // do not process further pages and shutdown
logger.log(
Level.WARNING,
"\n"
+ Thread.currentThread().getName()
+ " Interrupted @ runNextPageTaskAsync");
break;
}
TableDataList tabledataList = tableDataListRpc(destinationTable, pageToken);
@@ -432,12 +456,12 @@ void runNextPageTaskAsync(
}
rpcResponseQueue.put(
Tuple.of(
null,
false)); // this will stop the parseDataTask as well in case of interrupt or
// when the pagination completes
null, false)); // this will stop the parseDataTask as well when the pagination
// completes
} catch (Exception e) {
throw new BigQueryException(0, e.getMessage(), e);
}
} // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
// have finished processing the records and even that will be interrupted
};
queryTaskExecutor.execute(nextPageTask);
}
@@ -460,7 +484,9 @@ void parseRpcDataAsync(
pageCache.put(
Tuple.of(firstFieldValueLists, true)); // this is the first page which we have received.
} catch (InterruptedException e) {
throw new BigQueryException(0, e.getMessage(), e);
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync");
}

// rpcResponseQueue will get null tuple if Cancel method is called, so no need to explicitly use
@@ -470,6 +496,14 @@ void parseRpcDataAsync(
try {
boolean hasMorePages = true;
while (hasMorePages) {
if (Thread.currentThread().isInterrupted()
|| queryTaskExecutor.isShutdown()) { // do not process further data and shutdown
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync");
break;
}
// no interrupt received till this point, continue processing
Tuple<TableDataList, Boolean> rpcResponse = rpcResponseQueue.take();
TableDataList tabledataList = rpcResponse.x();
hasMorePages = rpcResponse.y();
@@ -482,55 +516,24 @@ void parseRpcDataAsync(
} catch (InterruptedException e) {
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Interrupted",
"\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync",
e); // Thread might get interrupted while calling the Cancel method, which is
// expected, so logging this instead of throwing the exception back
}
try {
pageCache.put(Tuple.of(null, false)); // no further pages
pageCache.put(Tuple.of(null, false)); // no further pages, graceful exit scenario
} catch (InterruptedException e) {
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Interrupted",
"\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync",
e); // Thread might get interrupted while calling the Cancel method, which is
// expected, so logging this instead of throwing the exception back
}
} // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
// have finished processing the records and even that will be interrupted
};
queryTaskExecutor.execute(parseDataTask);
}

/**
* This method is called when the current thread is interrupted, this communicates to ResultSet by
* adding a EoS
*
* @param buffer
*/
@InternalApi
void markEoS(BlockingQueue<AbstractList<FieldValue>> buffer) { // package-private
try {
buffer.put(new EndOfFieldValueList()); // All the pages has been processed, put this marker
} catch (InterruptedException e) {
logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
}
}

/**
* This method is called when the current thread is interrupted, this communicates to ResultSet by
* adding a isLast Row
*
* @param buffer
*/
@InternalApi
void markLast(BlockingQueue<BigQueryResultImpl.Row> buffer) { // package-private
try {
buffer.put(
new BigQueryResultImpl.Row(
null, true)); // All the pages has been processed, put this marker
} catch (InterruptedException e) {
logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
}
}

@VisibleForTesting
void populateBufferAsync(
BlockingQueue<Tuple<TableDataList, Boolean>> rpcResponseQueue,
@@ -551,25 +554,21 @@ void populateBufferAsync(
"\n" + Thread.currentThread().getName() + " Interrupted",
e); // Thread might get interrupted while calling the Cancel method, which is
// expected, so logging this instead of throwing the exception back
markEoS(
buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS
break;
}

if (Thread.currentThread().isInterrupted()
|| queryTaskExecutor.isShutdown()
|| fieldValueLists
== null) { // do not process further pages and shutdown (outerloop)
markEoS(
buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS
break;
}

for (FieldValueList fieldValueList : fieldValueLists) {
try {
if (Thread.currentThread()
.isInterrupted()) { // do not process further pages and shutdown (inner loop)
markEoS(
buffer); // Thread has been interrupted, communicate to ResultSet by adding
// EoS
if (Thread.currentThread().isInterrupted()
|| queryTaskExecutor
.isShutdown()) { // do not process further pages and shutdown (inner loop)
break;
}
buffer.put(fieldValueList);
@@ -578,24 +577,55 @@ void populateBufferAsync(
}
}
}

try {
if (Thread.currentThread()
.isInterrupted()) { // clear the buffer for any outstanding records
rpcResponseQueue
.clear(); // IMP - so that if it's full then it unblocks and the interrupt logic
// could trigger
buffer.clear();
}
markEoS(buffer); // All the pages has been processed, put this marker
buffer.put(
new EndOfFieldValueList()); // All the pages has been processed, put this marker
} catch (InterruptedException e) {
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync",
e);
} finally {
queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
queryTaskExecutor
.shutdownNow(); // Shutdown the thread pool. All the records are now processed
}
};

queryTaskExecutor.execute(populateBufferRunnable);
}

/**
* In an interrupt scenario, like when the background threads are still working and the user calls
* `connection.close()`, then we need to add an End of Stream flag in the buffer so that the
* `ResultSet.next()` stops advancing the cursor. We cannot rely on the `populateBufferAsync`
* method to do this as the `BlockingQueue.put()` call will error out after the interrupt is
* triggered
*/
@InternalApi
void flagEndOfStream() { // package-private
try {
if (bufferFvl != null) { // that is tabledata.list endpoint is used
bufferFvl.put(
new EndOfFieldValueList()); // All the pages has been processed, put this marker
} else if (bufferRow != null) {
bufferRow.put(
new BigQueryResultImpl.Row(
null, true)); // All the pages has been processed, put this marker
} else {
logger.log(
Level.WARNING,
"\n"
+ Thread.currentThread().getName()
+ " Could not flag End of Stream, both the buffer types are null. This might happen when the connection is close without executing a query");
}
} catch (InterruptedException e) {
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Interrupted @ flagEndOfStream",
e);
}
}

/* Helper method that parse and populate a page with TableRows */
private static Iterable<FieldValueList> getIterableFieldValueList(
Iterable<TableRow> tableDataPb, final Schema schema) {
@@ -783,17 +813,17 @@ BigQueryResult highThroughPutRead(
;

ReadSession readSession = bqReadClient.createReadSession(builder.build());
BlockingQueue<BigQueryResultImpl.Row> buffer = new LinkedBlockingDeque<>(bufferSize);
bufferRow = new LinkedBlockingDeque<>(getBufferSize());
Map<String, Integer> arrowNameToIndex = new HashMap<>();
// deserialize and populate the buffer async, so that the client isn't blocked
processArrowStreamAsync(
readSession,
buffer,
bufferRow,
new ArrowRowReader(readSession.getArrowSchema(), arrowNameToIndex),
schema);

logger.log(Level.INFO, "\n Using BigQuery Read API");
return new BigQueryResultImpl<BigQueryResultImpl.Row>(schema, totalRows, buffer, stats);
return new BigQueryResultImpl<BigQueryResultImpl.Row>(schema, totalRows, bufferRow, stats);

} catch (IOException e) {
throw BigQueryException.translateAndThrow(e);
@@ -827,8 +857,18 @@ private void processArrowStreamAsync(

} catch (Exception e) {
throw BigQueryException.translateAndThrow(e);
} finally {
markLast(buffer); // marking end of stream
} finally { // logic needed for graceful shutdown
// marking end of stream
try {
buffer.put(
new BigQueryResultImpl.Row(
null, true)); // All the pages has been processed, put this marker
} catch (InterruptedException e) {
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Interrupted @ markLast",
e);
}
queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
}
};
@@ -890,7 +930,6 @@ private void processRows(

if (Thread.currentThread().isInterrupted()
|| queryTaskExecutor.isShutdown()) { // do not process and shutdown
markLast(buffer); // puts an isLast Row in the buffer for ResultSet to process
break; // exit the loop, root will be cleared in the finally block
}

@@ -981,9 +1020,6 @@ boolean isFastQuerySupported() {

@VisibleForTesting
boolean useReadAPI(Long totalRows, Long pageRows, Schema schema, Boolean hasQueryParameters) {

// TODO(prasmish) get this logic review - totalRows and pageRows are returned null when the job
// is not complete
if ((totalRows == null || pageRows == null)
&& Boolean.TRUE.equals(
connectionSettings
Expand All @@ -992,7 +1028,6 @@ boolean useReadAPI(Long totalRows, Long pageRows, Schema schema, Boolean hasQuer
return true;
}

// Schema schema = Schema.fromPb(tableSchema);
// Read API does not yet support Interval Type or QueryParameters
if (containsIntervalType(schema) || hasQueryParameters) {
logger.log(Level.INFO, "\n Schema has IntervalType, or QueryParameters. Disabling ReadAPI");
@@ -2652,12 +2652,11 @@ public void testConnectionClose() throws SQLException {
int cnt = 0;
while (rs.next()) {
++cnt;
if (cnt > 57000) { // breaking at 57K, query reads 300K
if (cnt == 57000) { // breaking at 57000th record, query reads 300K
assertTrue(connection.close()); // we should be able to cancel the connection
}
}
assertTrue(
cnt < 60000); // Few extra records are still read (generally ~10) even after canceling, as
assertTrue(cnt < 100000); // Extra records are still read even after canceling, as
// the background threads are still active while the interrupt occurs and the
// buffer and pageCache are cleared
}
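
The test relies on the cursor honouring the end-of-stream flag: a handful of already-buffered rows may still be returned after `close()`, hence the loose `cnt < 100000` bound, but `ResultSet.next()` must eventually stop on the marker rather than block. The consumer side is not part of this diff, so the following is only a hypothetical sketch of how such a cursor could detect the marker (the `isEndOfStream` check is a placeholder; the real implementation uses dedicated marker types such as `EndOfFieldValueList` and `BigQueryResultImpl.Row(null, true)`):

```java
import java.util.AbstractList;
import java.util.concurrent.BlockingQueue;

// Hypothetical consumer-side sketch: keep advancing until the end-of-stream marker,
// planted by flagEndOfStream() or by the producer on normal completion, is dequeued.
final class CursorSketch<T extends AbstractList<?>> {
  private final BlockingQueue<T> buffer;
  private T current;

  CursorSketch(BlockingQueue<T> buffer) {
    this.buffer = buffer;
  }

  /** Mirrors ResultSet.next(): returns false once the sentinel row is seen. */
  boolean next() throws InterruptedException {
    T row = buffer.take(); // blocks until the producer delivers a row or the sentinel
    if (isEndOfStream(row)) {
      current = null;
      return false; // stop advancing the cursor
    }
    current = row;
    return true;
  }

  T getCurrent() {
    return current;
  }

  // Placeholder check; the real code compares against a dedicated marker type instead.
  private boolean isEndOfStream(T row) {
    return row.isEmpty();
  }
}
```
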
