Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

alts: create handshaker RPC lazily #7630

Merged
merged 2 commits into from Nov 18, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 17 additions & 7 deletions alts/src/main/java/io/grpc/alts/internal/AltsHandshakerStub.java
Expand Up @@ -29,28 +29,27 @@
/** An interface to the ALTS handshaker service. */
class AltsHandshakerStub {
private final StreamObserver<HandshakerResp> reader = new Reader();
private final StreamObserver<HandshakerReq> writer;
private StreamObserver<HandshakerReq> writer;
private final HandshakerServiceStub serviceStub;
private final ArrayBlockingQueue<Optional<HandshakerResp>> responseQueue =
new ArrayBlockingQueue<>(1);
private final AtomicReference<String> exceptionMessage = new AtomicReference<>();

private static final long HANDSHAKE_RPC_DEADLINE_SECS = 20;

AltsHandshakerStub(HandshakerServiceStub serviceStub) {
this.writer =
serviceStub
.withDeadlineAfter(HANDSHAKE_RPC_DEADLINE_SECS, SECONDS)
.doHandshake(this.reader);
this.serviceStub = serviceStub;
}

@VisibleForTesting
AltsHandshakerStub() {
writer = null;
serviceStub = null;
}

@VisibleForTesting
AltsHandshakerStub(StreamObserver<HandshakerReq> writer) {
this.writer = writer;
serviceStub = null;
}

@VisibleForTesting
Expand All @@ -60,6 +59,7 @@ StreamObserver<HandshakerResp> getReaderForTest() {

/** Send a handshaker request and return the handshaker response. */
public HandshakerResp send(HandshakerReq req) throws InterruptedException, IOException {
createWriterIfNull();
maybeThrowIoException();
if (!responseQueue.isEmpty()) {
throw new IOException("Received an unexpected response.");
Expand All @@ -72,6 +72,14 @@ public HandshakerResp send(HandshakerReq req) throws InterruptedException, IOExc
return result.get();
}

/** Create a new writer if the writer is null. */
private void createWriterIfNull() {
if (writer == null) {
writer =
serviceStub.withDeadlineAfter(HANDSHAKE_RPC_DEADLINE_SECS, SECONDS).doHandshake(reader);
}
}

/** Throw exception if there is an outstanding exception. */
private void maybeThrowIoException() throws IOException {
if (exceptionMessage.get() != null) {
Expand All @@ -81,7 +89,9 @@ private void maybeThrowIoException() throws IOException {

/** Close the connection. */
public void close() {
writer.onCompleted();
if (writer != null) {
writer.onCompleted();
}
}

private class Reader implements StreamObserver<HandshakerResp> {
Expand Down