Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix for flaky connection close issue #2034

Merged
merged 2 commits into from May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -106,18 +106,27 @@ public ResultSet getResultSet() {
}

private class BigQueryResultSet extends AbstractJdbcResultSet {
private boolean hasReachedEnd =
false; // flag which will be set to true when we have encountered a EndOfStream or when
// curTup.isLast(). Ref: https://github.com/googleapis/java-bigquery/issues/2033

@Override
/*Advances the result set to the next row, returning false if no such row exists. Potentially blocking operation*/
public boolean next() throws SQLException {
if (hasReachedEnd) { // if end of stream is reached then we can simply return false
return false;
}
try {
cursor = buffer.take(); // advance the cursor,Potentially blocking operation
if (isEndOfStream(cursor)) { // check for end of stream
cursor = null;
hasReachedEnd = true;
return false;
} else if (cursor instanceof Row) {
Row curTup = (Row) cursor;
if (curTup.isLast()) { // last Tuple
cursor = null;
hasReachedEnd = true;
return false;
}
return true;
Expand Down
Expand Up @@ -20,6 +20,7 @@
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.QueryParameter;
Expand Down Expand Up @@ -496,6 +497,38 @@ void parseRpcDataAsync(
queryTaskExecutor.execute(parseDataTask);
}

/**
* This method is called when the current thread is interrupted, this communicates to ResultSet by
* adding a EoS
*
* @param buffer
*/
@InternalApi
void markEoS(BlockingQueue<AbstractList<FieldValue>> buffer) { // package-private
try {
buffer.put(new EndOfFieldValueList()); // All the pages has been processed, put this marker
} catch (InterruptedException e) {
logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
}
}

/**
* This method is called when the current thread is interrupted, this communicates to ResultSet by
* adding a isLast Row
*
* @param buffer
*/
@InternalApi
void markLast(BlockingQueue<BigQueryResultImpl.Row> buffer) { // package-private
try {
buffer.put(
new BigQueryResultImpl.Row(
null, true)); // All the pages has been processed, put this marker
} catch (InterruptedException e) {
logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
}
}

@VisibleForTesting
void populateBufferAsync(
BlockingQueue<Tuple<TableDataList, Boolean>> rpcResponseQueue,
Expand All @@ -516,18 +549,25 @@ void populateBufferAsync(
"\n" + Thread.currentThread().getName() + " Interrupted",
e); // Thread might get interrupted while calling the Cancel method, which is
// expected, so logging this instead of throwing the exception back
markEoS(
buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS
}

if (Thread.currentThread().isInterrupted()
|| fieldValueLists
== null) { // do not process further pages and shutdown (outerloop)
markEoS(
buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS
break;
}

for (FieldValueList fieldValueList : fieldValueLists) {
try {
if (Thread.currentThread()
.isInterrupted()) { // do not process further pages and shutdown (inner loop)
markEoS(
buffer); // Thread has been interrupted, communicate to ResultSet by adding
// EoS
break;
}
buffer.put(fieldValueList);
Expand All @@ -537,19 +577,15 @@ void populateBufferAsync(
}
}

if (Thread.currentThread()
.isInterrupted()) { // clear the buffer for any outstanding records
buffer.clear();
rpcResponseQueue
.clear(); // IMP - so that if it's full then it unblocks and the interrupt logic
// could trigger
}

try {
buffer.put(
new EndOfFieldValueList()); // All the pages has been processed, put this marker
} catch (InterruptedException e) {
throw new BigQueryException(0, e.getMessage(), e);
if (Thread.currentThread()
.isInterrupted()) { // clear the buffer for any outstanding records
rpcResponseQueue
.clear(); // IMP - so that if it's full then it unblocks and the interrupt logic
// could trigger
buffer.clear();
}
markEoS(buffer); // All the pages has been processed, put this marker
} finally {
queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
}
Expand Down Expand Up @@ -790,12 +826,8 @@ private void processArrowStreamAsync(
} catch (Exception e) {
throw BigQueryException.translateAndThrow(e);
} finally {
try {
buffer.put(new BigQueryResultImpl.Row(null, true)); // marking end of stream
queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
} catch (InterruptedException e) {
logger.log(Level.WARNING, "\n Error occurred ", e);
}
markLast(buffer); // marking end of stream
queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
}
};

Expand Down Expand Up @@ -856,6 +888,7 @@ private void processRows(

if (Thread.currentThread().isInterrupted()
|| queryTaskExecutor.isShutdown()) { // do not process and shutdown
markLast(buffer); // puts an isLast Row in the buffer for ResultSet to process
break; // exit the loop, root will be cleared in the finally block
}

Expand All @@ -869,9 +902,7 @@ private void processRows(
}
buffer.put(new BigQueryResultImpl.Row(curRow));
}
root.clear(); // TODO: make sure to clear the root while implementing the thread
// interruption logic (Connection.close method)

root.clear();
} catch (RuntimeException | InterruptedException e) {
throw BigQueryException.translateAndThrow(e);
} finally {
Expand Down