From 2f4df5c48e0202f2888435aeb4862e77725698fb Mon Sep 17 00:00:00 2001 From: Jiangtao Li Date: Tue, 17 Nov 2020 17:36:09 -0800 Subject: [PATCH] alts: create handshaker RPC lazily (#7630) * alts: create handshaker RPC lazily * alts: address review comments --- .../alts/internal/AltsHandshakerStub.java | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/alts/src/main/java/io/grpc/alts/internal/AltsHandshakerStub.java b/alts/src/main/java/io/grpc/alts/internal/AltsHandshakerStub.java index 7a6178459926..61d9fd2f8947 100644 --- a/alts/src/main/java/io/grpc/alts/internal/AltsHandshakerStub.java +++ b/alts/src/main/java/io/grpc/alts/internal/AltsHandshakerStub.java @@ -29,7 +29,8 @@ /** An interface to the ALTS handshaker service. */ class AltsHandshakerStub { private final StreamObserver reader = new Reader(); - private final StreamObserver writer; + private StreamObserver writer; + private final HandshakerServiceStub serviceStub; private final ArrayBlockingQueue> responseQueue = new ArrayBlockingQueue<>(1); private final AtomicReference exceptionMessage = new AtomicReference<>(); @@ -37,20 +38,18 @@ class AltsHandshakerStub { private static final long HANDSHAKE_RPC_DEADLINE_SECS = 20; AltsHandshakerStub(HandshakerServiceStub serviceStub) { - this.writer = - serviceStub - .withDeadlineAfter(HANDSHAKE_RPC_DEADLINE_SECS, SECONDS) - .doHandshake(this.reader); + this.serviceStub = serviceStub; } @VisibleForTesting AltsHandshakerStub() { - writer = null; + serviceStub = null; } @VisibleForTesting AltsHandshakerStub(StreamObserver writer) { this.writer = writer; + serviceStub = null; } @VisibleForTesting @@ -60,6 +59,7 @@ StreamObserver getReaderForTest() { /** Send a handshaker request and return the handshaker response. */ public HandshakerResp send(HandshakerReq req) throws InterruptedException, IOException { + createWriterIfNull(); maybeThrowIoException(); if (!responseQueue.isEmpty()) { throw new IOException("Received an unexpected response."); @@ -72,6 +72,14 @@ public HandshakerResp send(HandshakerReq req) throws InterruptedException, IOExc return result.get(); } + /** Create a new writer if the writer is null. */ + private void createWriterIfNull() { + if (writer == null) { + writer = + serviceStub.withDeadlineAfter(HANDSHAKE_RPC_DEADLINE_SECS, SECONDS).doHandshake(reader); + } + } + /** Throw exception if there is an outstanding exception. */ private void maybeThrowIoException() throws IOException { if (exceptionMessage.get() != null) { @@ -81,7 +89,9 @@ private void maybeThrowIoException() throws IOException { /** Close the connection. */ public void close() { - writer.onCompleted(); + if (writer != null) { + writer.onCompleted(); + } } private class Reader implements StreamObserver {