Skip to content

Commit

Permalink
fix: fix for flaky connection close issue (#2034)
Browse files Browse the repository at this point in the history
* Added hasReachedEnd flag in next() method to avoid blocking

* Added and wired markEoS and markLast methods. Minor refactor
  • Loading branch information
prash-mi committed May 10, 2022
1 parent aeca04d commit db3daac
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 21 deletions.
Expand Up @@ -106,18 +106,27 @@ public ResultSet getResultSet() {
}

private class BigQueryResultSet extends AbstractJdbcResultSet {
private boolean hasReachedEnd =
false; // flag which will be set to true when we have encountered a EndOfStream or when
// curTup.isLast(). Ref: https://github.com/googleapis/java-bigquery/issues/2033

@Override
/*Advances the result set to the next row, returning false if no such row exists. Potentially blocking operation*/
public boolean next() throws SQLException {
if (hasReachedEnd) { // if end of stream is reached then we can simply return false
return false;
}
try {
cursor = buffer.take(); // advance the cursor,Potentially blocking operation
if (isEndOfStream(cursor)) { // check for end of stream
cursor = null;
hasReachedEnd = true;
return false;
} else if (cursor instanceof Row) {
Row curTup = (Row) cursor;
if (curTup.isLast()) { // last Tuple
cursor = null;
hasReachedEnd = true;
return false;
}
return true;
Expand Down
Expand Up @@ -20,6 +20,7 @@
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.QueryParameter;
Expand Down Expand Up @@ -496,6 +497,38 @@ void parseRpcDataAsync(
queryTaskExecutor.execute(parseDataTask);
}

/**
* This method is called when the current thread is interrupted, this communicates to ResultSet by
* adding a EoS
*
* @param buffer
*/
@InternalApi
void markEoS(BlockingQueue<AbstractList<FieldValue>> buffer) { // package-private
try {
buffer.put(new EndOfFieldValueList()); // All the pages has been processed, put this marker
} catch (InterruptedException e) {
logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
}
}

/**
* This method is called when the current thread is interrupted, this communicates to ResultSet by
* adding a isLast Row
*
* @param buffer
*/
@InternalApi
void markLast(BlockingQueue<BigQueryResultImpl.Row> buffer) { // package-private
try {
buffer.put(
new BigQueryResultImpl.Row(
null, true)); // All the pages has been processed, put this marker
} catch (InterruptedException e) {
logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
}
}

@VisibleForTesting
void populateBufferAsync(
BlockingQueue<Tuple<TableDataList, Boolean>> rpcResponseQueue,
Expand All @@ -516,18 +549,25 @@ void populateBufferAsync(
"\n" + Thread.currentThread().getName() + " Interrupted",
e); // Thread might get interrupted while calling the Cancel method, which is
// expected, so logging this instead of throwing the exception back
markEoS(
buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS
}

if (Thread.currentThread().isInterrupted()
|| fieldValueLists
== null) { // do not process further pages and shutdown (outerloop)
markEoS(
buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS
break;
}

for (FieldValueList fieldValueList : fieldValueLists) {
try {
if (Thread.currentThread()
.isInterrupted()) { // do not process further pages and shutdown (inner loop)
markEoS(
buffer); // Thread has been interrupted, communicate to ResultSet by adding
// EoS
break;
}
buffer.put(fieldValueList);
Expand All @@ -537,19 +577,15 @@ void populateBufferAsync(
}
}

if (Thread.currentThread()
.isInterrupted()) { // clear the buffer for any outstanding records
buffer.clear();
rpcResponseQueue
.clear(); // IMP - so that if it's full then it unblocks and the interrupt logic
// could trigger
}

try {
buffer.put(
new EndOfFieldValueList()); // All the pages has been processed, put this marker
} catch (InterruptedException e) {
throw new BigQueryException(0, e.getMessage(), e);
if (Thread.currentThread()
.isInterrupted()) { // clear the buffer for any outstanding records
rpcResponseQueue
.clear(); // IMP - so that if it's full then it unblocks and the interrupt logic
// could trigger
buffer.clear();
}
markEoS(buffer); // All the pages has been processed, put this marker
} finally {
queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
}
Expand Down Expand Up @@ -790,12 +826,8 @@ private void processArrowStreamAsync(
} catch (Exception e) {
throw BigQueryException.translateAndThrow(e);
} finally {
try {
buffer.put(new BigQueryResultImpl.Row(null, true)); // marking end of stream
queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
} catch (InterruptedException e) {
logger.log(Level.WARNING, "\n Error occurred ", e);
}
markLast(buffer); // marking end of stream
queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
}
};

Expand Down Expand Up @@ -856,6 +888,7 @@ private void processRows(

if (Thread.currentThread().isInterrupted()
|| queryTaskExecutor.isShutdown()) { // do not process and shutdown
markLast(buffer); // puts an isLast Row in the buffer for ResultSet to process
break; // exit the loop, root will be cleared in the finally block
}

Expand All @@ -869,9 +902,7 @@ private void processRows(
}
buffer.put(new BigQueryResultImpl.Row(curRow));
}
root.clear(); // TODO: make sure to clear the root while implementing the thread
// interruption logic (Connection.close method)

root.clear();
} catch (RuntimeException | InterruptedException e) {
throw BigQueryException.translateAndThrow(e);
} finally {
Expand Down

0 comments on commit db3daac

Please sign in to comment.