From 44a4e4d8c70bf910f96d2d2c181d1f23b70f3e52 Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Wed, 30 Nov 2022 10:56:57 -0800 Subject: [PATCH] feat: Change one thread per retry to use a thread pool (#1898) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destinationt * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting thre response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: wire multiplexing connection pool to stream writer * feat: some fixes for multiplexing client * feat: fix some todos, and reject the mixed behavior of passed in client or not * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * feat: add getInflightWaitSeconds implementation * feat: Add schema comparision in connection loop to ensure schema update for the same stream name can be notified * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: add schema update support to multiplexing * fix: fix windows build bug: windows Instant resolution is different with linux * fix: fix another failing tests for windows build * fix: fix another test failure for Windows build * feat: Change new thread for each retry to be a thread pool to avoid create/tear down too much threads if lots of retries happens * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot --- .../bigquerystorage/WriteToDefaultStream.java | 46 ++++++++++--------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index a95388b47f..1913360b8a 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -41,6 +41,8 @@ import io.grpc.Status.Code; import java.io.IOException; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import javax.annotation.concurrent.GuardedBy; import org.json.JSONArray; @@ -193,6 +195,8 @@ static class AppendCompleteCallback implements ApiFutureCallback { - try { - // Since default stream appends are not ordered, we can simply retry the - // appends. - // Retrying with exclusive streams requires more careful consideration. - this.parent.append(appendContext); - } catch (Exception e) { - // Fall through to return error. - System.out.format("Failed to retry append: %s%n", e); - } - }) - .start(); + pool.submit( + () -> { + try { + // Since default stream appends are not ordered, we can simply retry the + // appends. + // Retrying with exclusive streams requires more careful consideration. + this.parent.append(appendContext); + } catch (Exception e) { + // Fall through to return error. + System.out.format("Failed to retry append: %s%n", e); + } + }); // Mark the existing attempt as done since it's being retried. done(); return; @@ -251,15 +254,14 @@ public void onFailure(Throwable throwable) { // Retry the remaining valid rows, but using a separate thread to // avoid potentially blocking while we are in a callback. if (dataNew.length() > 0) { - new Thread( - () -> { - try { - this.parent.append(new AppendContext(dataNew, 0)); - } catch (Exception e2) { - System.out.format("Failed to retry append with filtered rows: %s%n", e2); - } - }) - .start(); + pool.submit( + () -> { + try { + this.parent.append(new AppendContext(dataNew, 0)); + } catch (Exception e2) { + System.out.format("Failed to retry append with filtered rows: %s%n", e2); + } + }); } return; }