Skip to content

Commit

Permalink
alts: create handshaker RPC lazily (grpc#7630)
Browse files Browse the repository at this point in the history
* alts: create handshaker RPC lazily

* alts: address review comments
  • Loading branch information
Jiangtao Li authored and dfawley committed Jan 15, 2021
1 parent cd01c95 commit 2f4df5c
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions alts/src/main/java/io/grpc/alts/internal/AltsHandshakerStub.java
Expand Up @@ -29,28 +29,27 @@
/** An interface to the ALTS handshaker service. */
class AltsHandshakerStub {
private final StreamObserver<HandshakerResp> reader = new Reader();
private final StreamObserver<HandshakerReq> writer;
private StreamObserver<HandshakerReq> writer;
private final HandshakerServiceStub serviceStub;
private final ArrayBlockingQueue<Optional<HandshakerResp>> responseQueue =
new ArrayBlockingQueue<>(1);
private final AtomicReference<String> exceptionMessage = new AtomicReference<>();

private static final long HANDSHAKE_RPC_DEADLINE_SECS = 20;

AltsHandshakerStub(HandshakerServiceStub serviceStub) {
this.writer =
serviceStub
.withDeadlineAfter(HANDSHAKE_RPC_DEADLINE_SECS, SECONDS)
.doHandshake(this.reader);
this.serviceStub = serviceStub;
}

@VisibleForTesting
AltsHandshakerStub() {
writer = null;
serviceStub = null;
}

@VisibleForTesting
AltsHandshakerStub(StreamObserver<HandshakerReq> writer) {
this.writer = writer;
serviceStub = null;
}

@VisibleForTesting
Expand All @@ -60,6 +59,7 @@ StreamObserver<HandshakerResp> getReaderForTest() {

/** Send a handshaker request and return the handshaker response. */
public HandshakerResp send(HandshakerReq req) throws InterruptedException, IOException {
createWriterIfNull();
maybeThrowIoException();
if (!responseQueue.isEmpty()) {
throw new IOException("Received an unexpected response.");
Expand All @@ -72,6 +72,14 @@ public HandshakerResp send(HandshakerReq req) throws InterruptedException, IOExc
return result.get();
}

/** Create a new writer if the writer is null. */
private void createWriterIfNull() {
if (writer == null) {
writer =
serviceStub.withDeadlineAfter(HANDSHAKE_RPC_DEADLINE_SECS, SECONDS).doHandshake(reader);
}
}

/** Throw exception if there is an outstanding exception. */
private void maybeThrowIoException() throws IOException {
if (exceptionMessage.get() != null) {
Expand All @@ -81,7 +89,9 @@ private void maybeThrowIoException() throws IOException {

/** Close the connection. */
public void close() {
writer.onCompleted();
if (writer != null) {
writer.onCompleted();
}
}

private class Reader implements StreamObserver<HandshakerResp> {
Expand Down

0 comments on commit 2f4df5c

Please sign in to comment.