diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
index cc330a90a..be5174b26 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
@@ -87,7 +87,11 @@ class ConnectionImpl implements Connection {
       Executors.newFixedThreadPool(MAX_PROCESS_QUERY_THREADS_CNT);
   private final Logger logger = Logger.getLogger(this.getClass().getName());
   private BigQueryReadClient bqReadClient;
-  private static final long EXECUTOR_TIMEOUT_SEC = 5;
+  private static final long EXECUTOR_TIMEOUT_SEC = 10;
+  private BlockingQueue<AbstractList<FieldValue>>
+      bufferFvl; // initialized lazily iff we end up using the tabledata.list endpoint
+  private BlockingQueue<BigQueryResultImpl.Row>
+      bufferRow; // initialized lazily iff we end up using the Read API

   ConnectionImpl(
       ConnectionSettings connectionSettings,
@@ -107,6 +111,19 @@ class ConnectionImpl implements Connection {
             : Math.min(connectionSettings.getNumBufferedRows() * 2, 100000));
   }

+  /**
+   * Returns the number of records to be stored in the buffer, clamping the value to a reasonable
+   * range.
+   *
+   * @return the maximum number of records to be stored in the buffer
+   */
+  private int getBufferSize() {
+    return (connectionSettings == null
+            || connectionSettings.getNumBufferedRows() == null
+            || connectionSettings.getNumBufferedRows() < 10000
+        ? 20000
+        : Math.min(connectionSettings.getNumBufferedRows() * 2, 100000));
+  }
   /**
    * Cancel method shuts down the pageFetcher and producerWorker threads gracefully using interrupt.
    * The pageFetcher thread will not request any subsequent pages after the interrupt, and
@@ -119,12 +136,14 @@ class ConnectionImpl implements Connection {
   @BetaApi
   @Override
   public synchronized boolean close() throws BigQuerySQLException {
+    flagEndOfStream(); // puts an End of Stream flag in the buffer so that `ResultSet.next()` stops
+    // advancing the cursor
     queryTaskExecutor.shutdownNow();
     try {
-      queryTaskExecutor.awaitTermination(
-          EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS); // wait for the executor shutdown
+      if (queryTaskExecutor.awaitTermination(EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS)) {
+        return true;
+      } // else queryTaskExecutor.isShutdown() will be returned outside this try block
     } catch (InterruptedException e) {
-      e.printStackTrace();
       logger.log(
           Level.WARNING,
           "\n" + Thread.currentThread().getName() + " Exception while awaitTermination",
@@ -330,7 +349,7 @@ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) {
     BigQueryResultStats bigQueryResultStats = getBigQueryResultSetStats(jobId);

     // Keeps the deserialized records at the row level, which is consumed by BigQueryResult
-    BlockingQueue<AbstractList<FieldValue>> buffer = new LinkedBlockingDeque<>(bufferSize);
+    bufferFvl = new LinkedBlockingDeque<>(getBufferSize());

     // Keeps the parsed FieldValueLists
     BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
@@ -352,11 +371,11 @@ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) {
     // throughput
     populateBufferAsync(
-        rpcResponseQueue, pageCache, buffer); // spawns a thread to populate the buffer
+        rpcResponseQueue, pageCache, bufferFvl); // spawns a thread to populate the buffer

     // This will work for pagination as well, as buffer is getting updated asynchronously
     return new BigQueryResultImpl<AbstractList<FieldValue>>(
-        schema, numRows, buffer, bigQueryResultStats);
+        schema, numRows, bufferFvl, bigQueryResultStats);
   }
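Reviewer note: the new `close()` contract is easy to lose inside the hunk above: flag end-of-stream, interrupt the workers, wait a bounded time, and otherwise report whatever state the executor is in. A minimal standalone sketch of that shutdown pattern using only the JDK (the class name and its two-thread pool are illustrative, not from this PR):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class GracefulCloseSketch {
  private static final long EXECUTOR_TIMEOUT_SEC = 10; // matches the bumped constant above
  private final ExecutorService queryTaskExecutor = Executors.newFixedThreadPool(2);

  boolean close() {
    queryTaskExecutor.shutdownNow(); // interrupts the page-fetcher/producer workers
    try {
      if (queryTaskExecutor.awaitTermination(EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS)) {
        return true; // every task observed the interrupt and exited in time
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
    }
    return queryTaskExecutor.isShutdown(); // shutdown requested, termination not confirmed
  }
}
```

Doubling `EXECUTOR_TIMEOUT_SEC` from 5 to 10 seconds gives the worker threads more headroom to observe the interrupt before `close()` reports back.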

   @VisibleForTesting
@@ -384,7 +403,7 @@ BigQueryResult processQueryResponseResults(
     BigQueryResultStats bigQueryResultStats =
         new BigQueryResultStatsImpl(queryStatistics, sessionInfo);

-    BlockingQueue<AbstractList<FieldValue>> buffer = new LinkedBlockingDeque<>(bufferSize);
+    bufferFvl = new LinkedBlockingDeque<>(getBufferSize());
     BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
         new LinkedBlockingDeque<>(
             getPageCacheSize(connectionSettings.getNumBufferedRows(), schema));
@@ -401,10 +420,10 @@ BigQueryResult processQueryResponseResults(
     parseRpcDataAsync(results.getRows(), schema, pageCache, rpcResponseQueue);

     // Thread to populate the buffer (a blocking queue) shared with the consumer
-    populateBufferAsync(rpcResponseQueue, pageCache, buffer);
+    populateBufferAsync(rpcResponseQueue, pageCache, bufferFvl);

     return new BigQueryResultImpl<AbstractList<FieldValue>>(
-        schema, numRows, buffer, bigQueryResultStats);
+        schema, numRows, bufferFvl, bigQueryResultStats);
   }

   @VisibleForTesting
@@ -420,6 +439,11 @@ void runNextPageTaskAsync(
         while (pageToken != null) { // paginate for non-null token
           if (Thread.currentThread().isInterrupted()
               || queryTaskExecutor.isShutdown()) { // do not process further pages and shutdown
+            logger.log(
+                Level.WARNING,
+                "\n"
+                    + Thread.currentThread().getName()
+                    + " Interrupted @ runNextPageTaskAsync");
             break;
           }
           TableDataList tabledataList = tableDataListRpc(destinationTable, pageToken);
@@ -432,12 +456,12 @@ void runNextPageTaskAsync(
         }
         rpcResponseQueue.put(
             Tuple.of(
-                null,
-                false)); // this will stop the parseDataTask as well in case of interrupt or
-        // when the pagination completes
+                null, false)); // this will stop the parseDataTask as well when the pagination
+        // completes
       } catch (Exception e) {
         throw new BigQueryException(0, e.getMessage(), e);
-      }
+      } // We cannot call queryTaskExecutor.shutdownNow() here, as the populate-buffer method may
+      // not have finished processing its records and would be interrupted as well
     };
     queryTaskExecutor.execute(nextPageTask);
   }
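The producer loop in `runNextPageTaskAsync` above hinges on two things: checking for interruption/shutdown before each page request, and pushing a `(null, false)` tuple as a poison pill so the downstream parser also stops. A self-contained sketch of that pattern; the `Page` record and the in-memory page list are stand-ins for the PR's `Tuple<TableDataList, Boolean>` and the `tableDataListRpc` call:

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class PaginationSketch {
  // (payload, hasMorePages) stands in for Tuple<TableDataList, Boolean>
  record Page(String payload, boolean hasMorePages) {}

  static final BlockingQueue<Page> rpcResponseQueue = new LinkedBlockingDeque<>(2);

  public static void main(String[] args) throws InterruptedException {
    Iterator<String> pages = List.of("page-1", "page-2", "page-3").iterator();
    Thread producer =
        new Thread(
            () -> {
              try {
                while (pages.hasNext()) {
                  if (Thread.currentThread().isInterrupted()) {
                    break; // stop fetching further pages, as in runNextPageTaskAsync
                  }
                  rpcResponseQueue.put(new Page(pages.next(), true));
                }
                rpcResponseQueue.put(new Page(null, false)); // poison pill stops the consumer
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted mid-put during shutdown
              }
            });
    producer.start();

    Page page;
    while ((page = rpcResponseQueue.take()).hasMorePages()) { // parseDataTask analogue
      System.out.println("consumed " + page.payload());
    }
    producer.join();
  }
}
```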
@@ -460,7 +484,9 @@ void parseRpcDataAsync(
       pageCache.put(
           Tuple.of(firstFieldValueLists, true)); // this is the first page which we have received.
     } catch (InterruptedException e) {
-      throw new BigQueryException(0, e.getMessage(), e);
+      logger.log(
+          Level.WARNING,
+          "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync");
     }

     // rpcResponseQueue will get a null tuple if the Cancel method is called, so no need to explicitly use
@@ -470,6 +496,14 @@
         try {
           boolean hasMorePages = true;
           while (hasMorePages) {
+            if (Thread.currentThread().isInterrupted()
+                || queryTaskExecutor.isShutdown()) { // do not process further data and shutdown
+              logger.log(
+                  Level.WARNING,
+                  "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync");
+              break;
+            }
+            // no interrupt received until this point, continue processing
             Tuple<TableDataList, Boolean> rpcResponse = rpcResponseQueue.take();
             TableDataList tabledataList = rpcResponse.x();
             hasMorePages = rpcResponse.y();
@@ -482,55 +516,24 @@
         } catch (InterruptedException e) {
           logger.log(
               Level.WARNING,
-              "\n" + Thread.currentThread().getName() + " Interrupted",
+              "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync",
               e); // Thread might get interrupted while calling the Cancel method, which is
           // expected, so logging this instead of throwing the exception back
         }

         try {
-          pageCache.put(Tuple.of(null, false)); // no further pages
+          pageCache.put(Tuple.of(null, false)); // no further pages, graceful exit scenario
         } catch (InterruptedException e) {
           logger.log(
               Level.WARNING,
-              "\n" + Thread.currentThread().getName() + " Interrupted",
+              "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync",
               e); // Thread might get interrupted while calling the Cancel method, which is
           // expected, so logging this instead of throwing the exception back
-        }
+        } // We cannot call queryTaskExecutor.shutdownNow() here, as the populate-buffer method may
+        // not have finished processing its records and would be interrupted as well
       };
     queryTaskExecutor.execute(parseDataTask);
   }

-  /**
-   * This method is called when the current thread is interrupted, this communicates to ResultSet by
-   * adding a EoS
-   *
-   * @param buffer
-   */
-  @InternalApi
-  void markEoS(BlockingQueue<AbstractList<FieldValue>> buffer) { // package-private
-    try {
-      buffer.put(new EndOfFieldValueList()); // All the pages has been processed, put this marker
-    } catch (InterruptedException e) {
-      logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
-    }
-  }
-
-  /**
-   * This method is called when the current thread is interrupted, this communicates to ResultSet by
-   * adding a isLast Row
-   *
-   * @param buffer
-   */
-  @InternalApi
-  void markLast(BlockingQueue<BigQueryResultImpl.Row> buffer) { // package-private
-    try {
-      buffer.put(
-          new BigQueryResultImpl.Row(
-              null, true)); // All the pages has been processed, put this marker
-    } catch (InterruptedException e) {
-      logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
-    }
-  }
-
   @VisibleForTesting
   void populateBufferAsync(
       BlockingQueue<Tuple<TableDataList, Boolean>> rpcResponseQueue,
       BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache,
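Both the deleted `markEoS`/`markLast` helpers and the inline `put` calls that replace them implement the same idea: the producer enqueues a distinguished sentinel object, and the consumer stops advancing when it sees it, so a bounded `BlockingQueue` never needs to be "closed". A minimal sketch, with a plain `Object` standing in for `EndOfFieldValueList` / `Row(null, true)`:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class SentinelSketch {
  // Unique marker the consumer compares against by reference
  static final Object END_OF_STREAM = new Object();
  static final BlockingQueue<Object> buffer = new LinkedBlockingDeque<>(100);

  public static void main(String[] args) throws InterruptedException {
    new Thread(
            () -> {
              try {
                for (int i = 0; i < 3; i++) {
                  buffer.put("row-" + i);
                }
                buffer.put(END_OF_STREAM); // what markEoS did, now inlined or via flagEndOfStream
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // put() interrupted mid-shutdown
              }
            })
        .start();

    Object next;
    while ((next = buffer.take()) != END_OF_STREAM) { // the ResultSet.next() analogue
      System.out.println(next);
    }
  }
}
```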
@@ -551,25 +554,21 @@ void populateBufferAsync(
                 "\n" + Thread.currentThread().getName() + " Interrupted",
                 e); // Thread might get interrupted while calling the Cancel method, which is
             // expected, so logging this instead of throwing the exception back
-            markEoS(
-                buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS
+            break;
           }

           if (Thread.currentThread().isInterrupted()
+              || queryTaskExecutor.isShutdown()
               || fieldValueLists == null) { // do not process further pages and shutdown (outerloop)
-            markEoS(
-                buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS
             break;
           }

           for (FieldValueList fieldValueList : fieldValueLists) {
             try {
-              if (Thread.currentThread()
-                  .isInterrupted()) { // do not process further pages and shutdown (inner loop)
-                markEoS(
-                    buffer); // Thread has been interrupted, communicate to ResultSet by adding
-                // EoS
+              if (Thread.currentThread().isInterrupted()
+                  || queryTaskExecutor
+                      .isShutdown()) { // do not process further pages and shutdown (inner loop)
                 break;
               }
               buffer.put(fieldValueList);
@@ -578,24 +577,55 @@ void populateBufferAsync(
           }
         }

-        try {
-          if (Thread.currentThread()
-              .isInterrupted()) { // clear the buffer for any outstanding records
-            rpcResponseQueue
-                .clear(); // IMP - so that if it's full then it unblocks and the interrupt logic
-            // could trigger
-            buffer.clear();
-          }
-          markEoS(buffer); // All the pages has been processed, put this marker
+        buffer.put(
+            new EndOfFieldValueList()); // All the pages have been processed, put this marker
+      } catch (InterruptedException e) {
+        logger.log(
+            Level.WARNING,
+            "\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync",
+            e);
       } finally {
-        queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
+        queryTaskExecutor
+            .shutdownNow(); // Shutdown the thread pool. All the records are now processed
       }
     };

     queryTaskExecutor.execute(populateBufferRunnable);
   }

+  /**
+   * In an interrupt scenario, such as when the background threads are still working and the user
+   * calls `connection.close()`, we need to add an End of Stream flag to the buffer so that
+   * `ResultSet.next()` stops advancing the cursor. We cannot rely on the `populateBufferAsync`
+   * method to do this, as its `BlockingQueue.put()` call will error out after the interrupt is
+   * triggered
+   */
+  @InternalApi
+  void flagEndOfStream() { // package-private
+    try {
+      if (bufferFvl != null) { // i.e., the tabledata.list endpoint is used
+        bufferFvl.put(
+            new EndOfFieldValueList()); // All the pages have been processed, put this marker
+      } else if (bufferRow != null) {
+        bufferRow.put(
+            new BigQueryResultImpl.Row(
+                null, true)); // All the pages have been processed, put this marker
+      } else {
+        logger.log(
+            Level.WARNING,
+            "\n"
+                + Thread.currentThread().getName()
+                + " Could not flag End of Stream, both the buffer types are null. This might happen when the connection is closed without executing a query");
+      }
+    } catch (InterruptedException e) {
+      logger.log(
+          Level.WARNING,
+          "\n" + Thread.currentThread().getName() + " Interrupted @ flagEndOfStream",
+          e);
+    }
+  }
+
   /* Helper method that parses and populates a page with TableRows */
   private static Iterable<FieldValueList> getIterableFieldValueList(
       Iterable<TableRow> tableDataPb, final Schema schema) {
@@ -783,17 +813,17 @@ BigQueryResult highThroughPutRead(
               ;
       ReadSession readSession = bqReadClient.createReadSession(builder.build());
-      BlockingQueue<BigQueryResultImpl.Row> buffer = new LinkedBlockingDeque<>(bufferSize);
+      bufferRow = new LinkedBlockingDeque<>(getBufferSize());
       Map<String, Integer> arrowNameToIndex = new HashMap<>();
       // deserialize and populate the buffer async, so that the client isn't blocked
       processArrowStreamAsync(
           readSession,
-          buffer,
+          bufferRow,
           new ArrowRowReader(readSession.getArrowSchema(), arrowNameToIndex),
           schema);

       logger.log(Level.INFO, "\n Using BigQuery Read API");
-      return new BigQueryResultImpl(schema, totalRows, buffer, stats);
+      return new BigQueryResultImpl(schema, totalRows, bufferRow, stats);

     } catch (IOException e) {
       throw BigQueryException.translateAndThrow(e);
@@ -827,8 +857,18 @@ private void processArrowStreamAsync(

         } catch (Exception e) {
           throw BigQueryException.translateAndThrow(e);
-        } finally {
-          markLast(buffer); // marking end of stream
+        } finally { // logic needed for graceful shutdown
+          // marking end of stream
+          try {
+            buffer.put(
+                new BigQueryResultImpl.Row(
+                    null, true)); // All the pages have been processed, put this marker
+          } catch (InterruptedException e) {
+            logger.log(
+                Level.WARNING,
+                "\n" + Thread.currentThread().getName() + " Interrupted @ markLast",
+                e);
+          }
           queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
         }
       };
@@ -890,7 +930,6 @@ private void processRows(

           if (Thread.currentThread().isInterrupted()
               || queryTaskExecutor.isShutdown()) { // do not process and shutdown
-            markLast(buffer); // puts an isLast Row in the buffer for ResultSet to process
             break; // exit the loop, root will be cleared in the finally block
           }
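The two hunks that follow only delete stale comments from `useReadAPI(...)`, but the surviving decision order is worth spelling out. Condensed as a standalone method (names mirror the PR, plain booleans replace the schema/settings lookups, and the result-size threshold applied later in the real method is outside this diff's context, so it is elided):

```java
class UseReadApiSketch {
  static boolean useReadApi(
      Long totalRows,
      Long pageRows,
      boolean schemaHasIntervalType,
      boolean hasQueryParameters,
      boolean userOptedIntoReadApi) {
    // totalRows and pageRows are null while the job is still incomplete; honor an
    // explicit opt-in from the connection settings in that case.
    if ((totalRows == null || pageRows == null) && userOptedIntoReadApi) {
      return true;
    }
    // The Read API does not yet support the Interval type or query parameters.
    if (schemaHasIntervalType || hasQueryParameters) {
      return false;
    }
    return false; // placeholder: the real method applies a size-based check here
  }
}
```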
@@ -981,9 +1020,6 @@ boolean isFastQuerySupported() {

   @VisibleForTesting
   boolean useReadAPI(Long totalRows, Long pageRows, Schema schema, Boolean hasQueryParameters) {
-
-    // TODO(prasmish) get this logic review - totalRows and pageRows are returned null when the job
-    // is not complete
     if ((totalRows == null || pageRows == null)
         && Boolean.TRUE.equals(
             connectionSettings
@@ -992,7 +1028,6 @@ boolean useReadAPI(Long totalRows, Long pageRows, Schema schema, Boolean hasQueryParameters) {
                 .getUseReadAPI())) {
       return true;
     }
-    // Schema schema = Schema.fromPb(tableSchema);
     // Read API does not yet support Interval Type or QueryParameters
     if (containsIntervalType(schema) || hasQueryParameters) {
       logger.log(Level.INFO, "\n Schema has IntervalType, or QueryParameters. Disabling ReadAPI");
diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
index 348749b46..dcfa5265f 100644
--- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
+++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
@@ -2652,12 +2652,11 @@ public void testConnectionClose() throws SQLException {
     int cnt = 0;
     while (rs.next()) {
       ++cnt;
-      if (cnt > 57000) { // breaking at 57K, query reads 300K
+      if (cnt == 57000) { // breaking at the 57000th record; the query reads 300K
         assertTrue(connection.close()); // we should be able to cancel the connection
       }
     }
-    assertTrue(
-        cnt < 60000); // Few extra records are still read (generally ~10) even after canceling, as
+    assertTrue(cnt < 100000); // Extra records are still read even after canceling, as
     // the background threads are still active while the interrupt occurs and the
     // buffer and pageCache are cleared
   }
diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java
index d672967b1..deabba59f 100644
--- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java
+++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java
@@ -332,7 +332,31 @@ public void testIterateAndOrderDefaultConnSettings() throws SQLException {
       ++cnt;
     }
     assertEquals(LIMIT_RECS, cnt); // all the records were retrieved
-    connection.close();
+    assertTrue(connection.close());
+  }
+
+  /*
+  This test interrupts the execution midway and checks whether it was interrupted successfully while using the Read API
+  */
+  @Test
+  public void testConnectionClose() throws SQLException {
+    Connection connection = bigquery.createConnection();
+    BigQueryResult bigQueryResult = connection.executeSelect(QUERY);
+    logger.log(Level.INFO, "Query used: {0}", QUERY);
+    ResultSet rs = bigQueryResult.getResultSet();
+    int cnt = 0;
+    while (rs.next()) {
+      ++cnt;
+      if (cnt == 50000) { // interrupt at the 50000th record
+        assertTrue(connection.close());
+      }
+    }
+    assertTrue(
+        LIMIT_RECS
+            > cnt); // we stopped at 50K, but additional records (typically ~100) may still
+    // be retrieved,
+    // as a number of records should already have been buffered; fewer than
+    // LIMIT_RECS should be retrieved
   }

   @Test
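Taken together, the two tests pin down the user-visible contract of this change: calling `close()` mid-iteration interrupts the background workers, flags end-of-stream, and lets the cursor drain only what was already buffered. A hedged usage sketch of that behavior, assuming the library's beta `Connection` API as exercised in the tests (query string and row counts are illustrative):

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryResult;
import com.google.cloud.bigquery.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;

class EarlyCloseSketch {
  static int readSome(BigQuery bigquery, String query) throws SQLException {
    Connection connection = bigquery.createConnection();
    BigQueryResult bigQueryResult = connection.executeSelect(query);
    ResultSet rs = bigQueryResult.getResultSet();
    int cnt = 0;
    while (rs.next()) {
      if (++cnt == 50_000) {
        connection.close(); // interrupts background workers and flags End of Stream
      }
    }
    return cnt; // slightly above 50_000: rows already buffered may still surface
  }
}
```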