From db3daacea8a91ab80b7e923f1480874b01cbad0c Mon Sep 17 00:00:00 2001 From: Prashant Mishra <11733935+prash-mi@users.noreply.github.com> Date: Tue, 10 May 2022 19:59:24 +0530 Subject: [PATCH] fix: fix for flaky connection close issue (#2034) * Added hasReachedEnd flag in next() method to avoid blocking * Added and wired markEoS and markLast methods. Minor refactor --- .../cloud/bigquery/BigQueryResultImpl.java | 9 +++ .../google/cloud/bigquery/ConnectionImpl.java | 73 +++++++++++++------ 2 files changed, 61 insertions(+), 21 deletions(-) diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryResultImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryResultImpl.java index 7c24ca0dd..2a2aba7cd 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryResultImpl.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryResultImpl.java @@ -106,18 +106,27 @@ public ResultSet getResultSet() { } private class BigQueryResultSet extends AbstractJdbcResultSet { + private boolean hasReachedEnd = + false; // flag which will be set to true when we have encountered a EndOfStream or when + // curTup.isLast(). Ref: https://github.com/googleapis/java-bigquery/issues/2033 + @Override /*Advances the result set to the next row, returning false if no such row exists. Potentially blocking operation*/ public boolean next() throws SQLException { + if (hasReachedEnd) { // if end of stream is reached then we can simply return false + return false; + } try { cursor = buffer.take(); // advance the cursor,Potentially blocking operation if (isEndOfStream(cursor)) { // check for end of stream cursor = null; + hasReachedEnd = true; return false; } else if (cursor instanceof Row) { Row curTup = (Row) cursor; if (curTup.isLast()) { // last Tuple cursor = null; + hasReachedEnd = true; return false; } return true; diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java index c24a00888..b43615141 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java @@ -20,6 +20,7 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import com.google.api.core.BetaApi; +import com.google.api.core.InternalApi; import com.google.api.services.bigquery.model.GetQueryResultsResponse; import com.google.api.services.bigquery.model.JobConfigurationQuery; import com.google.api.services.bigquery.model.QueryParameter; @@ -496,6 +497,38 @@ void parseRpcDataAsync( queryTaskExecutor.execute(parseDataTask); } + /** + * This method is called when the current thread is interrupted, this communicates to ResultSet by + * adding a EoS + * + * @param buffer + */ + @InternalApi + void markEoS(BlockingQueue> buffer) { // package-private + try { + buffer.put(new EndOfFieldValueList()); // All the pages has been processed, put this marker + } catch (InterruptedException e) { + logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e); + } + } + + /** + * This method is called when the current thread is interrupted, this communicates to ResultSet by + * adding a isLast Row + * + * @param buffer + */ + @InternalApi + void markLast(BlockingQueue buffer) { // package-private + try { + buffer.put( + new BigQueryResultImpl.Row( + null, true)); // All the pages has been processed, put this marker + } catch (InterruptedException e) { + logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e); + } + } + @VisibleForTesting void populateBufferAsync( BlockingQueue> rpcResponseQueue, @@ -516,11 +549,15 @@ void populateBufferAsync( "\n" + Thread.currentThread().getName() + " Interrupted", e); // Thread might get interrupted while calling the Cancel method, which is // expected, so logging this instead of throwing the exception back + markEoS( + buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS } if (Thread.currentThread().isInterrupted() || fieldValueLists == null) { // do not process further pages and shutdown (outerloop) + markEoS( + buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS break; } @@ -528,6 +565,9 @@ void populateBufferAsync( try { if (Thread.currentThread() .isInterrupted()) { // do not process further pages and shutdown (inner loop) + markEoS( + buffer); // Thread has been interrupted, communicate to ResultSet by adding + // EoS break; } buffer.put(fieldValueList); @@ -537,19 +577,15 @@ void populateBufferAsync( } } - if (Thread.currentThread() - .isInterrupted()) { // clear the buffer for any outstanding records - buffer.clear(); - rpcResponseQueue - .clear(); // IMP - so that if it's full then it unblocks and the interrupt logic - // could trigger - } - try { - buffer.put( - new EndOfFieldValueList()); // All the pages has been processed, put this marker - } catch (InterruptedException e) { - throw new BigQueryException(0, e.getMessage(), e); + if (Thread.currentThread() + .isInterrupted()) { // clear the buffer for any outstanding records + rpcResponseQueue + .clear(); // IMP - so that if it's full then it unblocks and the interrupt logic + // could trigger + buffer.clear(); + } + markEoS(buffer); // All the pages has been processed, put this marker } finally { queryTaskExecutor.shutdownNow(); // Shutdown the thread pool } @@ -790,12 +826,8 @@ private void processArrowStreamAsync( } catch (Exception e) { throw BigQueryException.translateAndThrow(e); } finally { - try { - buffer.put(new BigQueryResultImpl.Row(null, true)); // marking end of stream - queryTaskExecutor.shutdownNow(); // Shutdown the thread pool - } catch (InterruptedException e) { - logger.log(Level.WARNING, "\n Error occurred ", e); - } + markLast(buffer); // marking end of stream + queryTaskExecutor.shutdownNow(); // Shutdown the thread pool } }; @@ -856,6 +888,7 @@ private void processRows( if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { // do not process and shutdown + markLast(buffer); // puts an isLast Row in the buffer for ResultSet to process break; // exit the loop, root will be cleared in the finally block } @@ -869,9 +902,7 @@ private void processRows( } buffer.put(new BigQueryResultImpl.Row(curRow)); } - root.clear(); // TODO: make sure to clear the root while implementing the thread - // interruption logic (Connection.close method) - + root.clear(); } catch (RuntimeException | InterruptedException e) { throw BigQueryException.translateAndThrow(e); } finally {