Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue: CallOptions is not thread-safe. #9689

Merged
merged 9 commits into from
Nov 17, 2022
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
184 changes: 127 additions & 57 deletions api/src/main/java/io/grpc/CallOptions.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015 The gRPC Authors
* Copyright 2015, 2022 The gRPC Authors
pandaapo marked this conversation as resolved.
Show resolved Hide resolved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;

pandaapo marked this conversation as resolved.
Show resolved Hide resolved
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -28,55 +29,129 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

/**
* The collection of runtime options for a new RPC call.
*
* <p>A field that is not set is {@code null}.
*/
@Immutable
pandaapo marked this conversation as resolved.
Show resolved Hide resolved
@CheckReturnValue
public final class CallOptions {
/**
* A blank {@code CallOptions} that all fields are not set.
*/
public static final CallOptions DEFAULT = new CallOptions();
public static final CallOptions DEFAULT = new Builder().build();

// Although {@code CallOptions} is immutable, its fields are not final, so that we can initialize
// them outside of constructor. Otherwise the constructor will have a potentially long list of
// unnamed arguments, which is undesirable.
@Nullable
private Deadline deadline;
private final Deadline deadline;

@Nullable
private Executor executor;
private final Executor executor;

@Nullable
private String authority;
private final String authority;

@Nullable
private CallCredentials credentials;
private final CallCredentials credentials;

@Nullable
private String compressorName;
private final String compressorName;

private Object[][] customOptions;
private final Object[][] customOptions;

// Unmodifiable list
private List<ClientStreamTracer.Factory> streamTracerFactories = Collections.emptyList();
private final List<ClientStreamTracer.Factory> streamTracerFactories;

/**
* Opposite to fail fast.
*/
@Nullable
private Boolean waitForReady;
private final Boolean waitForReady;

@Nullable
private Integer maxInboundMessageSize;
private final Integer maxInboundMessageSize;
@Nullable
private Integer maxOutboundMessageSize;
private final Integer maxOutboundMessageSize;

private CallOptions(Builder builder) {
this.deadline = builder.deadline;
this.executor = builder.executor;
this.authority = builder.authority;
this.credentials = builder.credentials;
this.compressorName = builder.compressorName;
this.customOptions = builder.customOptions;
this.streamTracerFactories = builder.streamTracerFactories;
this.waitForReady = builder.waitForReady;
this.maxInboundMessageSize = builder.maxInboundMessageSize;
this.maxOutboundMessageSize = builder.maxOutboundMessageSize;
}

private static class Builder {
pandaapo marked this conversation as resolved.
Show resolved Hide resolved
private Deadline deadline;
private Executor executor;
private String authority;
private CallCredentials credentials;
private String compressorName;
private Object[][] customOptions = new Object[0][2];
// Unmodifiable list
private List<ClientStreamTracer.Factory> streamTracerFactories = Collections.emptyList();
pandaapo marked this conversation as resolved.
Show resolved Hide resolved
private Boolean waitForReady;
private Integer maxInboundMessageSize;
private Integer maxOutboundMessageSize;

private Builder deadline(Deadline deadline) {
this.deadline = deadline;
return this;
}

private Builder executor(Executor executor) {
this.executor = executor;
return this;
}

private Builder authority(String authority) {
this.authority = authority;
return this;
}

private Builder credentials(CallCredentials credentials) {
this.credentials = credentials;
return this;
}

private Builder compressorName(String compressorName) {
this.compressorName = compressorName;
return this;
}

private Builder customOptions(Object[][] customOptions) {
this.customOptions = customOptions;
return this;
}

private Builder waitForReady(Boolean waitForReady) {
this.waitForReady = waitForReady;
return this;
}

private Builder maxInboundMessageSize(Integer maxInboundMessageSize) {
this.maxInboundMessageSize = maxInboundMessageSize;
return this;
}

private Builder maxOutboundMessageSize(Integer maxOutboundMessageSize) {
this.maxOutboundMessageSize = maxOutboundMessageSize;
return this;
}

private Builder streamTracerFactories(List<ClientStreamTracer.Factory> streamTracerFactories) {
this.streamTracerFactories = streamTracerFactories;
return this;
}

private CallOptions build() {
return new CallOptions(this);
}
}

/**
* Override the HTTP/2 authority the channel claims to be connecting to. <em>This is not
Expand All @@ -89,17 +164,15 @@ public final class CallOptions {
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1767")
public CallOptions withAuthority(@Nullable String authority) {
CallOptions newOptions = new CallOptions(this);
newOptions.authority = authority;
CallOptions newOptions = fullBuild(this).authority(authority).build();
pandaapo marked this conversation as resolved.
Show resolved Hide resolved
return newOptions;
}

/**
* Returns a new {@code CallOptions} with the given call credentials.
*/
public CallOptions withCallCredentials(@Nullable CallCredentials credentials) {
CallOptions newOptions = new CallOptions(this);
newOptions.credentials = credentials;
CallOptions newOptions = fullBuild(this).credentials(credentials).build();
return newOptions;
}

Expand All @@ -113,8 +186,7 @@ public CallOptions withCallCredentials(@Nullable CallCredentials credentials) {
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1704")
public CallOptions withCompression(@Nullable String compressorName) {
CallOptions newOptions = new CallOptions(this);
newOptions.compressorName = compressorName;
CallOptions newOptions = fullBuild(this).compressorName(compressorName).build();
return newOptions;
}

Expand All @@ -127,8 +199,7 @@ public CallOptions withCompression(@Nullable String compressorName) {
* @param deadline the deadline or {@code null} for unsetting the deadline.
*/
public CallOptions withDeadline(@Nullable Deadline deadline) {
CallOptions newOptions = new CallOptions(this);
newOptions.deadline = deadline;
CallOptions newOptions = fullBuild(this).deadline(deadline).build();
return newOptions;
}

Expand Down Expand Up @@ -156,8 +227,7 @@ public Deadline getDeadline() {
* fails RPCs without sending them if unable to connect.
*/
public CallOptions withWaitForReady() {
CallOptions newOptions = new CallOptions(this);
newOptions.waitForReady = Boolean.TRUE;
CallOptions newOptions = fullBuild(this).waitForReady(Boolean.TRUE).build();
return newOptions;
}

Expand All @@ -166,8 +236,7 @@ public CallOptions withWaitForReady() {
* This method should be rarely used because the default is without 'wait for ready'.
*/
public CallOptions withoutWaitForReady() {
CallOptions newOptions = new CallOptions(this);
newOptions.waitForReady = Boolean.FALSE;
CallOptions newOptions = new Builder().waitForReady(Boolean.FALSE).build();
return newOptions;
}

Expand Down Expand Up @@ -208,8 +277,7 @@ public CallCredentials getCredentials() {
* executor specified with {@link ManagedChannelBuilder#executor}.
*/
public CallOptions withExecutor(@Nullable Executor executor) {
CallOptions newOptions = new CallOptions(this);
newOptions.executor = executor;
CallOptions newOptions = fullBuild(this).executor(executor).build();
return newOptions;
}

Expand All @@ -221,12 +289,13 @@ public CallOptions withExecutor(@Nullable Executor executor) {
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2861")
public CallOptions withStreamTracerFactory(ClientStreamTracer.Factory factory) {
CallOptions newOptions = new CallOptions(this);
ArrayList<ClientStreamTracer.Factory> newList =
new ArrayList<>(streamTracerFactories.size() + 1);
newList.addAll(streamTracerFactories);
newList.add(factory);
newOptions.streamTracerFactories = Collections.unmodifiableList(newList);
CallOptions newOptions = fullBuild(this)
.streamTracerFactories(Collections.unmodifiableList(newList))
.build();
return newOptions;
}

Expand Down Expand Up @@ -319,7 +388,7 @@ public <T> CallOptions withOption(Key<T> key, T value) {
Preconditions.checkNotNull(key, "key");
Preconditions.checkNotNull(value, "value");

CallOptions newOptions = new CallOptions(this);
Builder builder = fullBuild(this);
int existingIdx = -1;
for (int i = 0; i < customOptions.length; i++) {
if (key.equals(customOptions[i][0])) {
Expand All @@ -328,7 +397,9 @@ public <T> CallOptions withOption(Key<T> key, T value) {
}
}

newOptions.customOptions = new Object[customOptions.length + (existingIdx == -1 ? 1 : 0)][2];
CallOptions newOptions = builder
.customOptions(new Object[customOptions.length + (existingIdx == -1 ? 1 : 0)][2])
.build();
System.arraycopy(customOptions, 0, newOptions.customOptions, 0, customOptions.length);

if (existingIdx == -1) {
Expand Down Expand Up @@ -368,10 +439,6 @@ public Executor getExecutor() {
return executor;
}

private CallOptions() {
customOptions = new Object[0][2];
}

/**
* Returns whether <a href="https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md">
* 'wait for ready'</a> option is enabled for the call. 'Fail fast' is the default option for gRPC
Expand All @@ -392,8 +459,9 @@ Boolean getWaitForReady() {
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563")
public CallOptions withMaxInboundMessageSize(int maxSize) {
checkArgument(maxSize >= 0, "invalid maxsize %s", maxSize);
CallOptions newOptions = new CallOptions(this);
newOptions.maxInboundMessageSize = maxSize;
CallOptions newOptions = fullBuild(this)
.maxInboundMessageSize(maxSize)
.build();
return newOptions;
}

Expand All @@ -403,8 +471,9 @@ public CallOptions withMaxInboundMessageSize(int maxSize) {
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563")
public CallOptions withMaxOutboundMessageSize(int maxSize) {
checkArgument(maxSize >= 0, "invalid maxsize %s", maxSize);
CallOptions newOptions = new CallOptions(this);
newOptions.maxOutboundMessageSize = maxSize;
CallOptions newOptions = fullBuild(this)
.maxOutboundMessageSize(maxSize)
.build();
return newOptions;
pandaapo marked this conversation as resolved.
Show resolved Hide resolved
}

Expand All @@ -427,19 +496,20 @@ public Integer getMaxOutboundMessageSize() {
}

/**
* Copy constructor.
* Copy CallOptions.
*/
private CallOptions(CallOptions other) {
deadline = other.deadline;
authority = other.authority;
credentials = other.credentials;
executor = other.executor;
compressorName = other.compressorName;
customOptions = other.customOptions;
waitForReady = other.waitForReady;
maxInboundMessageSize = other.maxInboundMessageSize;
maxOutboundMessageSize = other.maxOutboundMessageSize;
streamTracerFactories = other.streamTracerFactories;
private Builder fullBuild(CallOptions other) {
pandaapo marked this conversation as resolved.
Show resolved Hide resolved
return new Builder()
.deadline(other.deadline)
.executor(other.executor)
.authority(other.authority)
.credentials(other.credentials)
.compressorName(other.compressorName)
.customOptions(other.customOptions)
.streamTracerFactories(other.streamTracerFactories)
.waitForReady(other.waitForReady)
.maxInboundMessageSize(other.maxInboundMessageSize)
.maxOutboundMessageSize(other.maxOutboundMessageSize);
}

@Override
Expand Down