Skip to content

Commit

Permalink
core: fix old ClientStreamTracer.Factory creating tracers twice (grpc…
Browse files Browse the repository at this point in the history
…#8381)

Fix a bug introduced in grpc#8355 : old ClientStreamTracer.Factory implementation creates tracers twice.
  • Loading branch information
dapengzhang0 committed Aug 4, 2021
1 parent aeb3720 commit c3b26d3
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
14 changes: 10 additions & 4 deletions core/src/main/java/io/grpc/internal/GrpcUtil.java
Expand Up @@ -65,7 +65,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -785,15 +784,22 @@ static ClientStreamTracer newClientStreamTracer(
} else {
streamTracer = new ForwardingClientStreamTracer() {
final ClientStreamTracer noop = new ClientStreamTracer() {};
AtomicReference<ClientStreamTracer> delegate = new AtomicReference<>(noop);
volatile ClientStreamTracer delegate = noop;

void maybeInit(StreamInfo info, Metadata headers) {
delegate.compareAndSet(noop, streamTracerFactory.newClientStreamTracer(info, headers));
if (delegate != noop) {
return;
}
synchronized (this) {
if (delegate == noop) {
delegate = streamTracerFactory.newClientStreamTracer(info, headers);
}
}
}

@Override
protected ClientStreamTracer delegate() {
return delegate.get();
return delegate;
}

@SuppressWarnings("deprecation")
Expand Down
9 changes: 8 additions & 1 deletion core/src/test/java/io/grpc/internal/GrpcUtilTest.java
Expand Up @@ -38,6 +38,7 @@
import io.grpc.internal.ClientStreamListener.RpcProgress;
import io.grpc.internal.GrpcUtil.Http2Error;
import io.grpc.testing.TestMethodDescriptors;
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -301,12 +302,14 @@ public void clientStreamTracerFactoryBackwardCompatibility() {
final AtomicReference<Attributes> transportAttrsRef = new AtomicReference<>();
final ClientStreamTracer mockTracer = mock(ClientStreamTracer.class);
final Metadata.Key<String> key = Metadata.Key.of("fake-key", Metadata.ASCII_STRING_MARSHALLER);
final ArrayDeque<ClientStreamTracer> tracers = new ArrayDeque<>();
ClientStreamTracer.Factory oldFactoryImpl = new ClientStreamTracer.Factory() {
@SuppressWarnings("deprecation")
@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
transportAttrsRef.set(info.getTransportAttrs());
headers.put(key, "fake-value");
tracers.offer(mockTracer);
return mockTracer;
}
};
Expand All @@ -318,8 +321,12 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata header
Attributes.newBuilder().set(Attributes.Key.<String>create("foo"), "bar").build();
ClientStreamTracer tracer = GrpcUtil.newClientStreamTracer(oldFactoryImpl, info, metadata);
tracer.streamCreated(transAttrs, metadata);

assertThat(tracers.poll()).isSameInstanceAs(mockTracer);
assertThat(transportAttrsRef.get()).isEqualTo(transAttrs);
assertThat(metadata.get(key)).isEqualTo("fake-value");

tracer.streamClosed(Status.UNAVAILABLE);
// verify that newClientStreamTracer() is called no more than once
assertThat(tracers).isEmpty();
}
}

0 comments on commit c3b26d3

Please sign in to comment.