diff --git a/api/src/main/java/io/grpc/ClientStreamTracer.java b/api/src/main/java/io/grpc/ClientStreamTracer.java index cb2f5538e34..55751dd3fde 100644 --- a/api/src/main/java/io/grpc/ClientStreamTracer.java +++ b/api/src/main/java/io/grpc/ClientStreamTracer.java @@ -79,6 +79,13 @@ public void inboundHeaders() { public void inboundTrailers(Metadata trailers) { } + /** + * Information providing context to the call became available. + */ + @Internal + public void addOptionalLabel(String key, String value) { + } + /** * Factory class for {@link ClientStreamTracer}. */ diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 128d6589d77..d44fb4e072d 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -490,6 +490,29 @@ public abstract static class PickSubchannelArgs { * @since 1.2.0 */ public abstract MethodDescriptor getMethodDescriptor(); + + /** + * Gets an object that can be informed about what sort of pick was made. + */ + @Internal + public PickDetailsConsumer getPickDetailsConsumer() { + return new PickDetailsConsumer() {}; + } + } + + /** Receives information about the pick being chosen. */ + @Internal + public interface PickDetailsConsumer { + /** + * Optional labels that provide context of how the pick was routed. Particularly helpful for + * per-RPC metrics. + * + * @throws NullPointerException if key or value is {@code null} + */ + default void addOptionalLabel(String key, String value) { + checkNotNull(key, "key"); + checkNotNull(value, "value"); + } } /** diff --git a/api/src/testFixtures/java/io/grpc/PickSubchannelArgsMatcher.java b/api/src/testFixtures/java/io/grpc/PickSubchannelArgsMatcher.java new file mode 100644 index 00000000000..50140649810 --- /dev/null +++ b/api/src/testFixtures/java/io/grpc/PickSubchannelArgsMatcher.java @@ -0,0 +1,59 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +import com.google.common.base.Preconditions; +import io.grpc.CallOptions; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import org.mockito.ArgumentMatcher; +import org.mockito.ArgumentMatchers; + +/** + * Mockito Matcher for {@link PickSubchannelArgs}. + */ +public final class PickSubchannelArgsMatcher implements ArgumentMatcher { + private final MethodDescriptor method; + private final Metadata headers; + private final CallOptions callOptions; + + public PickSubchannelArgsMatcher( + MethodDescriptor method, Metadata headers, CallOptions callOptions) { + this.method = Preconditions.checkNotNull(method, "method"); + this.headers = Preconditions.checkNotNull(headers, "headers"); + this.callOptions = Preconditions.checkNotNull(callOptions, "callOptions"); + } + + @Override + public boolean matches(PickSubchannelArgs args) { + return args != null + && method.equals(args.getMethodDescriptor()) + && headers.equals(args.getHeaders()) + && callOptions.equals(args.getCallOptions()); + } + + @Override + public final String toString() { + return "[method=" + method + " headers=" + headers + " callOptions=" + callOptions + "]"; + } + + public static PickSubchannelArgs eqPickSubchannelArgs( + MethodDescriptor method, Metadata headers, CallOptions callOptions) { + return ArgumentMatchers.argThat(new PickSubchannelArgsMatcher(method, headers, callOptions)); + } +} diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index d71de1f5d53..0be8eec309a 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -137,7 +137,8 @@ public final ClientStream newStream( MethodDescriptor method, Metadata headers, CallOptions callOptions, ClientStreamTracer[] tracers) { try { - PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions); + PickSubchannelArgs args = new PickSubchannelArgsImpl( + method, headers, callOptions, new PickDetailsConsumerImpl(tracers)); SubchannelPicker picker = null; long pickerVersion = -1; while (true) { diff --git a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java index 4740a811f3a..b3e9b216de9 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java +++ b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java @@ -54,6 +54,11 @@ public void inboundTrailers(Metadata trailers) { delegate().inboundTrailers(trailers); } + @Override + public void addOptionalLabel(String key, String value) { + delegate().addOptionalLabel(key, value); + } + @Override public void streamClosed(Status status) { delegate().streamClosed(status); diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index a25886797e0..241fb216c2c 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -158,6 +158,8 @@ public Result selectConfig(PickSubchannelArgs args) { throw new IllegalStateException("Resolution is pending"); } }; + private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER = + new LoadBalancer.PickDetailsConsumer() {}; private final InternalLogId logId; private final String target; @@ -519,11 +521,11 @@ public ClientStream newStream( final Metadata headers, final Context context) { if (!retryEnabled) { - ClientTransport transport = - getTransport(new PickSubchannelArgsImpl(method, headers, callOptions)); - Context origContext = context.attach(); ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( callOptions, headers, 0, /* isTransparentRetry= */ false); + ClientTransport transport = getTransport(new PickSubchannelArgsImpl( + method, headers, callOptions, new PickDetailsConsumerImpl(tracers))); + Context origContext = context.attach(); try { return transport.newStream(method, headers, callOptions, tracers); } finally { @@ -566,8 +568,8 @@ ClientStream newSubstream( CallOptions newOptions = callOptions.withStreamTracerFactory(factory); ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( newOptions, newHeaders, previousAttempts, isTransparentRetry); - ClientTransport transport = - getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions)); + ClientTransport transport = getTransport(new PickSubchannelArgsImpl( + method, newHeaders, newOptions, new PickDetailsConsumerImpl(tracers))); Context origContext = context.attach(); try { return transport.newStream(method, newHeaders, newOptions, tracers); @@ -1207,7 +1209,8 @@ protected ClientCall delegate() { @SuppressWarnings("unchecked") @Override public void start(Listener observer, Metadata headers) { - PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions); + PickSubchannelArgs args = + new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER); InternalConfigSelector.Result result = configSelector.selectConfig(args); Status status = result.getStatus(); if (!status.isOk()) { diff --git a/core/src/main/java/io/grpc/internal/PickDetailsConsumerImpl.java b/core/src/main/java/io/grpc/internal/PickDetailsConsumerImpl.java new file mode 100644 index 00000000000..5c69757afbf --- /dev/null +++ b/core/src/main/java/io/grpc/internal/PickDetailsConsumerImpl.java @@ -0,0 +1,42 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import com.google.common.base.Preconditions; +import io.grpc.ClientStreamTracer; +import io.grpc.LoadBalancer.PickDetailsConsumer; + +/** + * Adapter for tracers into details consumers. + */ +final class PickDetailsConsumerImpl implements PickDetailsConsumer { + private final ClientStreamTracer[] tracers; + + /** Construct a consumer with unchanging tracers array. */ + public PickDetailsConsumerImpl(ClientStreamTracer[] tracers) { + this.tracers = Preconditions.checkNotNull(tracers, "tracers"); + } + + @Override + public void addOptionalLabel(String key, String value) { + Preconditions.checkNotNull(key, "key"); + Preconditions.checkNotNull(value, "value"); + for (ClientStreamTracer tracer : tracers) { + tracer.addOptionalLabel(key, value); + } + } +} diff --git a/core/src/main/java/io/grpc/internal/PickSubchannelArgsImpl.java b/core/src/main/java/io/grpc/internal/PickSubchannelArgsImpl.java index dd938303dee..c61fcac6f69 100644 --- a/core/src/main/java/io/grpc/internal/PickSubchannelArgsImpl.java +++ b/core/src/main/java/io/grpc/internal/PickSubchannelArgsImpl.java @@ -20,6 +20,7 @@ import com.google.common.base.Objects; import io.grpc.CallOptions; +import io.grpc.LoadBalancer.PickDetailsConsumer; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -29,15 +30,18 @@ public final class PickSubchannelArgsImpl extends PickSubchannelArgs { private final CallOptions callOptions; private final Metadata headers; private final MethodDescriptor method; + private final PickDetailsConsumer pickDetailsConsumer; /** * Creates call args object for given method with its call options, metadata. */ public PickSubchannelArgsImpl( - MethodDescriptor method, Metadata headers, CallOptions callOptions) { + MethodDescriptor method, Metadata headers, CallOptions callOptions, + PickDetailsConsumer pickDetailsConsumer) { this.method = checkNotNull(method, "method"); this.headers = checkNotNull(headers, "headers"); this.callOptions = checkNotNull(callOptions, "callOptions"); + this.pickDetailsConsumer = checkNotNull(pickDetailsConsumer, "pickDetailsConsumer"); } @Override @@ -55,6 +59,11 @@ public CallOptions getCallOptions() { return method; } + @Override + public PickDetailsConsumer getPickDetailsConsumer() { + return pickDetailsConsumer; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -66,12 +75,13 @@ public boolean equals(Object o) { PickSubchannelArgsImpl that = (PickSubchannelArgsImpl) o; return Objects.equal(callOptions, that.callOptions) && Objects.equal(headers, that.headers) - && Objects.equal(method, that.method); + && Objects.equal(method, that.method) + && Objects.equal(pickDetailsConsumer, that.pickDetailsConsumer); } @Override public int hashCode() { - return Objects.hashCode(callOptions, headers, method); + return Objects.hashCode(callOptions, headers, method, pickDetailsConsumer); } @Override diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 4cae565a19e..c7ae8c8b4be 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -17,12 +17,14 @@ package io.grpc.internal; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.PickSubchannelArgsMatcher.eqPickSubchannelArgs; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doAnswer; @@ -44,6 +46,7 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; +import io.grpc.PickSubchannelArgsMatcher; import io.grpc.Status; import io.grpc.StringMarshaller; import io.grpc.SynchronizationContext; @@ -344,31 +347,31 @@ public void uncaughtException(Thread t, Throwable e) { method, headers, failFastCallOptions, tracers); ff1.start(mock(ClientStreamListener.class)); ff1.halfClose(); - PickSubchannelArgsImpl ff1args = new PickSubchannelArgsImpl(method, headers, + PickSubchannelArgsMatcher ff1args = new PickSubchannelArgsMatcher(method, headers, failFastCallOptions); verify(transportListener).transportInUse(true); DelayedStream ff2 = (DelayedStream) delayedTransport.newStream( method2, headers2, failFastCallOptions, tracers); - PickSubchannelArgsImpl ff2args = new PickSubchannelArgsImpl(method2, headers2, + PickSubchannelArgsMatcher ff2args = new PickSubchannelArgsMatcher(method2, headers2, failFastCallOptions); DelayedStream ff3 = (DelayedStream) delayedTransport.newStream( method, headers, failFastCallOptions, tracers); - PickSubchannelArgsImpl ff3args = new PickSubchannelArgsImpl(method, headers, + PickSubchannelArgsMatcher ff3args = new PickSubchannelArgsMatcher(method, headers, failFastCallOptions); DelayedStream ff4 = (DelayedStream) delayedTransport.newStream( method2, headers2, failFastCallOptions, tracers); - PickSubchannelArgsImpl ff4args = new PickSubchannelArgsImpl(method2, headers2, + PickSubchannelArgsMatcher ff4args = new PickSubchannelArgsMatcher(method2, headers2, failFastCallOptions); // Wait-for-ready streams FakeClock wfr3Executor = new FakeClock(); DelayedStream wfr1 = (DelayedStream) delayedTransport.newStream( method, headers, waitForReadyCallOptions, tracers); - PickSubchannelArgsImpl wfr1args = new PickSubchannelArgsImpl(method, headers, + PickSubchannelArgsMatcher wfr1args = new PickSubchannelArgsMatcher(method, headers, waitForReadyCallOptions); DelayedStream wfr2 = (DelayedStream) delayedTransport.newStream( method2, headers2, waitForReadyCallOptions, tracers); - PickSubchannelArgsImpl wfr2args = new PickSubchannelArgsImpl(method2, headers2, + PickSubchannelArgsMatcher wfr2args = new PickSubchannelArgsMatcher(method2, headers2, waitForReadyCallOptions); CallOptions wfr3callOptions = waitForReadyCallOptions.withExecutor( wfr3Executor.getScheduledExecutorService()); @@ -376,11 +379,11 @@ public void uncaughtException(Thread t, Throwable e) { method, headers, wfr3callOptions, tracers); wfr3.start(mock(ClientStreamListener.class)); wfr3.halfClose(); - PickSubchannelArgsImpl wfr3args = new PickSubchannelArgsImpl(method, headers, + PickSubchannelArgsMatcher wfr3args = new PickSubchannelArgsMatcher(method, headers, wfr3callOptions); DelayedStream wfr4 = (DelayedStream) delayedTransport.newStream( method2, headers2, waitForReadyCallOptions, tracers); - PickSubchannelArgsImpl wfr4args = new PickSubchannelArgsImpl(method2, headers2, + PickSubchannelArgsMatcher wfr4args = new PickSubchannelArgsMatcher(method2, headers2, waitForReadyCallOptions); assertEquals(8, delayedTransport.getPendingStreamsCount()); @@ -401,14 +404,14 @@ public void uncaughtException(Thread t, Throwable e) { delayedTransport.reprocess(picker); assertEquals(5, delayedTransport.getPendingStreamsCount()); - inOrder.verify(picker).pickSubchannel(ff1args); - inOrder.verify(picker).pickSubchannel(ff2args); - inOrder.verify(picker).pickSubchannel(ff3args); - inOrder.verify(picker).pickSubchannel(ff4args); - inOrder.verify(picker).pickSubchannel(wfr1args); - inOrder.verify(picker).pickSubchannel(wfr2args); - inOrder.verify(picker).pickSubchannel(wfr3args); - inOrder.verify(picker).pickSubchannel(wfr4args); + inOrder.verify(picker).pickSubchannel(argThat(ff1args)); + inOrder.verify(picker).pickSubchannel(argThat(ff2args)); + inOrder.verify(picker).pickSubchannel(argThat(ff3args)); + inOrder.verify(picker).pickSubchannel(argThat(ff4args)); + inOrder.verify(picker).pickSubchannel(argThat(wfr1args)); + inOrder.verify(picker).pickSubchannel(argThat(wfr2args)); + inOrder.verify(picker).pickSubchannel(argThat(wfr3args)); + inOrder.verify(picker).pickSubchannel(argThat(wfr4args)); inOrder.verifyNoMoreInteractions(); // Make sure that streams are created and started immediately, not in any executor. This is @@ -454,11 +457,11 @@ public void uncaughtException(Thread t, Throwable e) { delayedTransport.reprocess(picker); assertEquals(0, delayedTransport.getPendingStreamsCount()); verify(transportListener).transportInUse(false); - inOrder.verify(picker).pickSubchannel(ff3args); // ff3 - inOrder.verify(picker).pickSubchannel(ff4args); // ff4 - inOrder.verify(picker).pickSubchannel(wfr2args); // wfr2 - inOrder.verify(picker).pickSubchannel(wfr3args); // wfr3 - inOrder.verify(picker).pickSubchannel(wfr4args); // wfr4 + inOrder.verify(picker).pickSubchannel(argThat(ff3args)); // ff3 + inOrder.verify(picker).pickSubchannel(argThat(ff4args)); // ff4 + inOrder.verify(picker).pickSubchannel(argThat(wfr2args)); // wfr2 + inOrder.verify(picker).pickSubchannel(argThat(wfr3args)); // wfr3 + inOrder.verify(picker).pickSubchannel(argThat(wfr4args)); // wfr4 inOrder.verifyNoMoreInteractions(); fakeExecutor.runDueTasks(); assertEquals(0, fakeExecutor.numPendingTasks()); @@ -478,7 +481,7 @@ public void uncaughtException(Thread t, Throwable e) { method, headers, waitForReadyCallOptions, tracers); assertNull(wfr5.getRealStream()); inOrder.verify(picker).pickSubchannel( - new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions)); + eqPickSubchannelArgs(method, headers, waitForReadyCallOptions)); inOrder.verifyNoMoreInteractions(); assertEquals(1, delayedTransport.getPendingStreamsCount()); @@ -492,7 +495,7 @@ public void uncaughtException(Thread t, Throwable e) { PickResult.withSubchannel(subchannel1)); delayedTransport.reprocess(picker); verify(picker).pickSubchannel( - new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions)); + eqPickSubchannelArgs(method, headers, waitForReadyCallOptions)); fakeExecutor.runDueTasks(); assertSame(mockRealStream, wfr5.getRealStream()); assertEquals(0, delayedTransport.getPendingStreamsCount()); @@ -517,7 +520,7 @@ public void reprocess_NoPendingStream() { // Though picker was not originally used, it will be saved and serve future streams. ClientStream stream = delayedTransport.newStream( method, headers, CallOptions.DEFAULT, tracers); - verify(picker).pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT)); + verify(picker).pickSubchannel(eqPickSubchannelArgs(method, headers, CallOptions.DEFAULT)); verify(mockInternalSubchannel).obtainActiveTransport(); assertSame(mockRealStream, stream); } @@ -559,16 +562,16 @@ public void run() { }; sideThread.start(); - PickSubchannelArgsImpl args = new PickSubchannelArgsImpl(method, headers, callOptions); - PickSubchannelArgsImpl args2 = new PickSubchannelArgsImpl(method, headers2, callOptions); + PickSubchannelArgsMatcher args = new PickSubchannelArgsMatcher(method, headers, callOptions); + PickSubchannelArgsMatcher args2 = new PickSubchannelArgsMatcher(method, headers2, callOptions); // Is called from sideThread - verify(picker, timeout(5000)).pickSubchannel(args); + verify(picker, timeout(5000)).pickSubchannel(argThat(args)); // Because stream has not been buffered (it's still stuck in newStream()), this will do nothing, // but incrementing the picker version. delayedTransport.reprocess(picker); - verify(picker).pickSubchannel(args); + verify(picker).pickSubchannel(argThat(args)); // Now let the stuck newStream() through barrier.await(5, TimeUnit.SECONDS); @@ -576,7 +579,7 @@ public void run() { sideThread.join(5000); assertFalse("sideThread should've exited", sideThread.isAlive()); // newStream() detects that there has been a new picker while it's stuck, thus will pick again. - verify(picker, times(2)).pickSubchannel(args); + verify(picker, times(2)).pickSubchannel(argThat(args)); barrier.reset(); nextPickShouldWait.set(true); @@ -592,9 +595,9 @@ public void run() { }; sideThread2.start(); // The second stream will see the first picker - verify(picker, timeout(5000)).pickSubchannel(args2); + verify(picker, timeout(5000)).pickSubchannel(argThat(args2)); // While the first stream won't use the first picker any more. - verify(picker, times(2)).pickSubchannel(args); + verify(picker, times(2)).pickSubchannel(argThat(args)); // Now use a different picker SubchannelPicker picker2 = mock(SubchannelPicker.class); @@ -602,9 +605,9 @@ public void run() { .thenReturn(PickResult.withNoResult()); delayedTransport.reprocess(picker2); // The pending first stream uses the new picker - verify(picker2).pickSubchannel(args); + verify(picker2).pickSubchannel(argThat(args)); // The second stream is still pending in creation, doesn't use the new picker. - verify(picker2, never()).pickSubchannel(args2); + verify(picker2, never()).pickSubchannel(argThat(args2)); // Now let the second stream finish creation barrier.await(5, TimeUnit.SECONDS); @@ -612,13 +615,30 @@ public void run() { sideThread2.join(5000); assertFalse("sideThread2 should've exited", sideThread2.isAlive()); // The second stream should see the new picker - verify(picker2, timeout(5000)).pickSubchannel(args2); + verify(picker2, timeout(5000)).pickSubchannel(argThat(args2)); // Wrapping up - verify(picker, times(2)).pickSubchannel(args); - verify(picker).pickSubchannel(args2); - verify(picker2).pickSubchannel(args); - verify(picker2).pickSubchannel(args); + verify(picker, times(2)).pickSubchannel(argThat(args)); + verify(picker).pickSubchannel(argThat(args2)); + verify(picker2).pickSubchannel(argThat(args)); + verify(picker2).pickSubchannel(argThat(args)); + } + + @Test + public void reprocess_addOptionalLabelCallsTracer() throws Exception { + delayedTransport.reprocess(new SubchannelPicker() { + @Override public PickResult pickSubchannel(PickSubchannelArgs args) { + args.getPickDetailsConsumer().addOptionalLabel("routed", "perfectly"); + return PickResult.withError(Status.UNAVAILABLE.withDescription("expected")); + } + }); + + ClientStreamTracer tracer = mock(ClientStreamTracer.class); + ClientStream stream = delayedTransport.newStream( + method, headers, callOptions, new ClientStreamTracer[] {tracer}); + stream.start(streamListener); + + verify(tracer).addOptionalLabel("routed", "perfectly"); } @Test diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 5cd0d40fc94..dc2162bfce2 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -26,6 +26,7 @@ import static io.grpc.ConnectivityState.SHUTDOWN; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE; +import static io.grpc.PickSubchannelArgsMatcher.eqPickSubchannelArgs; import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED; import static junit.framework.TestCase.assertNotSame; import static org.junit.Assert.assertEquals; @@ -617,6 +618,32 @@ public ClientCall interceptCall( TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS)); } + @Test + public void pickSubchannelAddOptionalLabel_callsTracer() { + channelBuilder.directExecutor(); + createChannel(); + + updateBalancingStateSafely(helper, TRANSIENT_FAILURE, new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + args.getPickDetailsConsumer().addOptionalLabel("routed", "perfectly"); + return PickResult.withError(Status.UNAVAILABLE.withDescription("expected")); + } + }); + ClientStreamTracer tracer = mock(ClientStreamTracer.class); + ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() { + @Override + public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { + return tracer; + } + }; + ClientCall call = channel.newCall( + method, CallOptions.DEFAULT.withStreamTracerFactory(tracerFactory)); + call.start(mockCallListener, new Metadata()); + + verify(tracer).addOptionalLabel("routed", "perfectly"); + } + @Test public void shutdownWithNoTransportsEverCreated() { channelBuilder.nameResolverFactory( @@ -808,10 +835,10 @@ private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAft .thenReturn(mockStream2); transportListener.transportReady(); when(mockPicker.pickSubchannel( - new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))).thenReturn( + eqPickSubchannelArgs(method, headers, CallOptions.DEFAULT))).thenReturn( PickResult.withNoResult()); when(mockPicker.pickSubchannel( - new PickSubchannelArgsImpl(method, headers2, CallOptions.DEFAULT))).thenReturn( + eqPickSubchannelArgs(method, headers2, CallOptions.DEFAULT))).thenReturn( PickResult.withSubchannel(subchannel)); updateBalancingStateSafely(helper, READY, mockPicker); @@ -875,7 +902,7 @@ private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAft assertFalse(nameResolverFactory.resolvers.get(0).shutdown); // call and call2 are still alive, and can still be assigned to a real transport SubchannelPicker picker2 = mock(SubchannelPicker.class); - when(picker2.pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))) + when(picker2.pickSubchannel(eqPickSubchannelArgs(method, headers, CallOptions.DEFAULT))) .thenReturn(PickResult.withSubchannel(subchannel)); updateBalancingStateSafely(helper, READY, picker2); executor.runDueTasks(); @@ -4531,4 +4558,4 @@ private static ManagedChannelServiceConfig createManagedChannelServiceConfig( return ManagedChannelServiceConfig .fromServiceConfig(rawServiceConfig, true, 3, 4, policySelection); } -} \ No newline at end of file +} diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelServiceConfigTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelServiceConfigTest.java index c25a0808584..493714dfd41 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelServiceConfigTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelServiceConfigTest.java @@ -27,6 +27,7 @@ import io.grpc.CallOptions; import io.grpc.InternalConfigSelector; import io.grpc.InternalConfigSelector.Result; +import io.grpc.LoadBalancer.PickDetailsConsumer; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo; @@ -209,7 +210,8 @@ public void getDefaultConfigSelectorFromConfig() { InternalConfigSelector configSelector = serviceConfig.getDefaultConfigSelector(); MethodDescriptor method = methodForName("service1", "method1"); Result result = configSelector.selectConfig( - new PickSubchannelArgsImpl(method, new Metadata(), CallOptions.DEFAULT)); + new PickSubchannelArgsImpl( + method, new Metadata(), CallOptions.DEFAULT, new PickDetailsConsumer() {})); MethodInfo methodInfoFromDefaultConfigSelector = ((ManagedChannelServiceConfig) result.getConfig()).getMethodConfig(method); assertThat(methodInfoFromDefaultConfigSelector) diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/RpcBehaviorLoadBalancerProviderTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/RpcBehaviorLoadBalancerProviderTest.java index e19208b8883..02ede46bcdd 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/RpcBehaviorLoadBalancerProviderTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/RpcBehaviorLoadBalancerProviderTest.java @@ -100,7 +100,7 @@ public void helperWrapsPicker() { @Test public void pickerAddsRpcBehaviorMetadata() { PickSubchannelArgsImpl args = new PickSubchannelArgsImpl(TestMethodDescriptors.voidMethod(), - new Metadata(), CallOptions.DEFAULT); + new Metadata(), CallOptions.DEFAULT, new LoadBalancer.PickDetailsConsumer() {}); new RpcBehaviorPicker(mockPicker, "error-code-15").pickSubchannel(args); assertThat(args.getHeaders() diff --git a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java index 8dd99bff320..9fc59e7aa33 100644 --- a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java +++ b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java @@ -43,6 +43,7 @@ import io.grpc.ForwardingChannelBuilder2; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickDetailsConsumer; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancerProvider; @@ -383,7 +384,8 @@ public void get_updatesLbState() throws Exception { .setFullMethodName("doesn/exists") .build(), headers, - CallOptions.DEFAULT)); + CallOptions.DEFAULT, + new PickDetailsConsumer() {})); assertThat(pickResult.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(pickResult.getStatus().getDescription()).contains("fallback not available"); assertThat(fakeThrottler.getNumThrottled()).isEqualTo(1); @@ -438,7 +440,8 @@ private static PickResult getPickResultForCreate(ArgumentCaptor fakeRescueMethod; private RlsLoadBalancer rlsLb; private String defaultTarget = "defaultTarget"; - private PickSubchannelArgsImpl searchSubchannelArgs; - private PickSubchannelArgsImpl rescueSubchannelArgs; + private PickSubchannelArgs searchSubchannelArgs; + private PickSubchannelArgs rescueSubchannelArgs; @Before public void setUp() { @@ -163,12 +165,8 @@ public CachingRlsLbClient.Builder get() { } }; - Metadata headers = new Metadata(); - searchSubchannelArgs = - new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT); - rescueSubchannelArgs = - new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT); - + searchSubchannelArgs = newPickSubchannelArgs(fakeSearchMethod); + rescueSubchannelArgs = newPickSubchannelArgs(fakeRescueMethod); } @After @@ -183,9 +181,7 @@ public void lb_serverStatusCodeConversion() throws Exception { inOrder.verify(helper) .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); - Metadata headers = new Metadata(); - PickSubchannelArgsImpl fakeSearchMethodArgs = - new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT); + PickSubchannelArgs fakeSearchMethodArgs = newPickSubchannelArgs(fakeSearchMethod); // Warm-up pick; will be queued PickResult res = picker.pickSubchannel(fakeSearchMethodArgs); assertThat(res.getStatus().isOk()).isTrue(); @@ -332,7 +328,6 @@ public void lb_working_withoutDefaultTarget() throws Exception { inOrder.verify(helper) .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); - Metadata headers = new Metadata(); // Warm-up pick; will be queued PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); @@ -375,13 +370,11 @@ public void lb_working_withoutDefaultTarget() throws Exception { // search method will fail because there is no fallback target. picker = pickerCaptor.getValue(); - res = picker.pickSubchannel( - new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); + res = picker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod)); assertThat(res.getStatus().isOk()).isFalse(); assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); - res = picker.pickSubchannel( - new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT)); + res = picker.pickSubchannel(newPickSubchannelArgs(fakeRescueMethod)); assertThat(subchannelIsReady(res.getSubchannel())).isTrue(); assertThat(res.getSubchannel().getAddresses()).isEqualTo(rescueSubchannel.getAddresses()); assertThat(res.getSubchannel().getAttributes()).isEqualTo(rescueSubchannel.getAttributes()); @@ -401,10 +394,7 @@ public void lb_nameResolutionFailed() throws Exception { inOrder.verify(helper) .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); - Metadata headers = new Metadata(); - PickResult res = - picker.pickSubchannel( - new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); + PickResult res = picker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod)); assertThat(res.getStatus().isOk()).isTrue(); assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); @@ -418,8 +408,7 @@ public void lb_nameResolutionFailed() throws Exception { SubchannelPicker picker2 = pickerCaptor.getValue(); assertThat(picker2).isEqualTo(picker); - res = picker2.pickSubchannel( - new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); + res = picker2.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod)); // verify success. Subchannel is wrapped, so checking attributes. assertThat(subchannelIsReady(res.getSubchannel())).isTrue(); assertThat(res.getSubchannel().getAddresses()).isEqualTo(searchSubchannel.getAddresses()); @@ -432,14 +421,13 @@ public void lb_nameResolutionFailed() throws Exception { verify(helper) .updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); SubchannelPicker failedPicker = pickerCaptor.getValue(); - res = failedPicker.pickSubchannel( - new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); + res = failedPicker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod)); assertThat(res.getStatus().isOk()).isFalse(); assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); } private PickResult markReadyAndGetPickResult(InOrder inOrder, - PickSubchannelArgsImpl pickSubchannelArgs) { + PickSubchannelArgs pickSubchannelArgs) { subchannels.getLast().updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); inOrder.verify(helper, atLeast(1)) .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); @@ -504,6 +492,11 @@ private String getRlsConfigJsonStr() { + "}"; } + private PickSubchannelArgs newPickSubchannelArgs(MethodDescriptor method) { + return new PickSubchannelArgsImpl( + method, new Metadata(), CallOptions.DEFAULT, new PickDetailsConsumer() {}); + } + private final class FakeHelper extends Helper { @Override diff --git a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java index 7317917887a..b173b3f5e26 100644 --- a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java +++ b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java @@ -53,6 +53,11 @@ public void inboundTrailers(Metadata trailers) { delegate().inboundTrailers(trailers); } + @Override + public void addOptionalLabel(String key, String value) { + delegate().addOptionalLabel(key, value); + } + @Override public void streamClosed(Status status) { delegate().streamClosed(status); diff --git a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java index 786962d0f1d..f55b0d73f79 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java @@ -37,6 +37,7 @@ import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickDetailsConsumer; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.ResolvedAddresses; @@ -310,7 +311,8 @@ private static PickResult pickSubchannel(SubchannelPicker picker, String cluster .build(), new Metadata(), CallOptions.DEFAULT.withOption( - XdsNameResolver.CLUSTER_SELECTION_KEY, clusterName)); + XdsNameResolver.CLUSTER_SELECTION_KEY, clusterName), + new PickDetailsConsumer() {}); return picker.pickSubchannel(args); } diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index d4ed83ae2cf..339779cf5e0 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -50,6 +50,7 @@ import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer.CreateSubchannelArgs; import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickDetailsConsumer; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.ResolvedAddresses; @@ -461,13 +462,14 @@ public void skipFailingHosts_pickNextNonFailingHost() { assertThat(result.getSubchannel().getAddresses()).isEqualTo(servers.get(1)); } - private PickSubchannelArgsImpl getDefaultPickSubchannelArgs(long rpcHash) { + private PickSubchannelArgs getDefaultPickSubchannelArgs(long rpcHash) { return new PickSubchannelArgsImpl( TestMethodDescriptors.voidMethod(), new Metadata(), - CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, rpcHash)); + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, rpcHash), + new PickDetailsConsumer() {}); } - private PickSubchannelArgsImpl getDefaultPickSubchannelArgsForServer(int serverid) { + private PickSubchannelArgs getDefaultPickSubchannelArgsForServer(int serverid) { long rpcHash = hashFunc.hashAsciiString("FakeSocketAddress-server" + serverid + "_0"); return getDefaultPickSubchannelArgs(rpcHash); } diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index cca2d84373c..28871850e72 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -50,6 +50,8 @@ import io.grpc.InsecureChannelCredentials; import io.grpc.InternalConfigSelector; import io.grpc.InternalConfigSelector.Result; +import io.grpc.LoadBalancer.PickDetailsConsumer; +import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; @@ -662,7 +664,7 @@ public void retryPolicyInPerMethodConfigGeneratedByResolverIsValid() { ResolutionResult result = resolutionResultCaptor.getValue(); InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); Result selectResult = configSelector.selectConfig( - new PickSubchannelArgsImpl(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); + newPickSubchannelArgs(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); Object config = selectResult.getConfig(); // Purely validating the data (io.grpc.internal.RetryPolicy). @@ -693,7 +695,7 @@ public void resolved_simpleCallFailedToRoute_noMatchingRoute() { InternalConfigSelector configSelector = resolveToClusters(); CallInfo call = new CallInfo("FooService", "barMethod"); Result selectResult = configSelector.selectConfig( - new PickSubchannelArgsImpl(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); + newPickSubchannelArgs(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); Status status = selectResult.getStatus(); assertThat(status.isOk()).isFalse(); assertThat(status.getCode()).isEqualTo(Code.UNAVAILABLE); @@ -727,7 +729,7 @@ public void resolved_simpleCallFailedToRoute_routeWithNonForwardingAction() { InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); // Simulates making a call1 RPC. Result selectResult = configSelector.selectConfig( - new PickSubchannelArgsImpl(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); + newPickSubchannelArgs(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); Status status = selectResult.getStatus(); assertThat(status.isOk()).isFalse(); assertThat(status.getCode()).isEqualTo(Code.UNAVAILABLE); @@ -1166,7 +1168,7 @@ private void assertEmptyResolutionResult(String resource) { assertThat((Map) result.getServiceConfig().getConfig()).isEmpty(); InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); Result configResult = configSelector.selectConfig( - new PickSubchannelArgsImpl(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); + newPickSubchannelArgs(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); assertThat(configResult.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE); assertThat(configResult.getStatus().getDescription()).contains(resource); } @@ -1175,7 +1177,7 @@ private void assertCallSelectClusterResult( CallInfo call, InternalConfigSelector configSelector, String expectedCluster, @Nullable Double expectedTimeoutSec) { Result result = configSelector.selectConfig( - new PickSubchannelArgsImpl(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); + newPickSubchannelArgs(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); assertThat(result.getStatus().isOk()).isTrue(); ClientInterceptor interceptor = result.getInterceptor(); ClientCall clientCall = interceptor.interceptCall( @@ -1203,7 +1205,7 @@ private void assertCallSelectRlsPluginResult( CallInfo call, InternalConfigSelector configSelector, String expectedPluginName, Double expectedTimeoutSec) { Result result = configSelector.selectConfig( - new PickSubchannelArgsImpl(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); + newPickSubchannelArgs(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); assertThat(result.getStatus().isOk()).isTrue(); ClientInterceptor interceptor = result.getInterceptor(); ClientCall clientCall = interceptor.interceptCall( @@ -1850,8 +1852,7 @@ private ClientCall.Listener startNewCall( } @SuppressWarnings("unchecked") ClientCall.Listener listener = mock(ClientCall.Listener.class); - Result result = selector.selectConfig(new PickSubchannelArgsImpl( - method, metadata, callOptions)); + Result result = selector.selectConfig(newPickSubchannelArgs(method, metadata, callOptions)); ClientCall call = ClientInterceptors.intercept(channel, result.getInterceptor()).newCall(method, callOptions); call.start(listener, metadata); @@ -1889,6 +1890,11 @@ private void verifyRpcDelayedThenAborted( verifyRpcFailed(listener, expectedStatus); } + private PickSubchannelArgs newPickSubchannelArgs( + MethodDescriptor method, Metadata headers, CallOptions callOptions) { + return new PickSubchannelArgsImpl(method, headers, callOptions, new PickDetailsConsumer() {}); + } + private final class FakeXdsClientPoolFactory implements XdsClientPoolFactory { @Override public void setBootstrapOverride(Map bootstrap) {}