diff --git a/context/src/main/java/io/grpc/Deadline.java b/context/src/main/java/io/grpc/Deadline.java index fadbe50874d..b330081857b 100644 --- a/context/src/main/java/io/grpc/Deadline.java +++ b/context/src/main/java/io/grpc/Deadline.java @@ -44,6 +44,8 @@ public final class Deadline implements Comparable { *

This is EXPERIMENTAL API and may subject to change. If you'd like it to be * stabilized or have any feedback, please * let us know. + * + * @since 1.24.0 */ public static Ticker getSystemTicker() { return SYSTEM_TICKER; @@ -64,6 +66,11 @@ public static Deadline after(long duration, TimeUnit units) { /** * Create a deadline that will expire at the specified offset based on the given {@link Ticker}. * + *

CAUTION: Only deadlines created with the same {@link Ticker} instance can + * be compared by methods like {@link #minimum}, {@link #isBefore} and {@link #compareTo}. Custom + * Tickers should only be used in tests where you fake out the clock. Always use the {@link + * #getSystemTicker system ticker} in production, or serious errors may occur. + * *

This is EXPERIMENTAL API and may subject to change. If you'd like it to be * stabilized or have any feedback, please * let us know. @@ -72,6 +79,8 @@ public static Deadline after(long duration, TimeUnit units) { * @param units The time unit for the duration. * @param ticker Where this deadline refer the current time * @return A new deadline. + * + * @since 1.24.0 */ public static Deadline after(long duration, TimeUnit units, Ticker ticker) { checkNotNull(units, "units"); @@ -111,19 +120,22 @@ public boolean isExpired() { } /** - * Is {@code this} deadline before another. + * Is {@code this} deadline before another. Two deadlines must be created using the same {@link + * Ticker}. */ public boolean isBefore(Deadline other) { - assert this.ticker == other.ticker : "Tickers don't match"; + checkTicker(other); return this.deadlineNanos - other.deadlineNanos < 0; } /** - * Return the minimum deadline of {@code this} or an other deadline. + * Return the minimum deadline of {@code this} or an other deadline. They must be created using + * the same {@link Ticker}. + * * @param other deadline to compare with {@code this}. */ public Deadline minimum(Deadline other) { - assert this.ticker == other.ticker : "Tickers don't match"; + checkTicker(other); return isBefore(other) ? this : other; } @@ -157,6 +169,11 @@ public long timeRemaining(TimeUnit unit) { /** * Schedule a task to be run when the deadline expires. + * + *

Note if this deadline was created with a custom {@link Ticker}, the {@code scheduler}'s + * underlying clock should be synchronized with that Ticker. Otherwise the task won't be run at + * the expected point of time. + * * @param task to run on expiration * @param scheduler used to execute the task * @return {@link ScheduledFuture} which can be used to cancel execution of the task @@ -182,12 +199,20 @@ public String toString() { buf.append(String.format(".%09d", nanos)); } buf.append("s from now"); + if (ticker != SYSTEM_TICKER) { + buf.append(" (ticker=" + ticker + ")"); + } return buf.toString(); } + /** + * {@inheritDoc} + * + *

Both deadlines must be created with the same {@link Ticker}. + */ @Override public int compareTo(Deadline that) { - assert this.ticker == that.ticker : "Tickers don't match"; + checkTicker(that); long diff = this.deadlineNanos - that.deadlineNanos; if (diff < 0) { return -1; @@ -200,12 +225,18 @@ public int compareTo(Deadline that) { /** * Time source representing nanoseconds since fixed but arbitrary point in time. * + *

DO NOT use custom {@link Ticker} implementations in production, because deadlines created + * with custom tickers are incompatible with those created with the system ticker. Always use + * the {@link #getSystemTicker system ticker} whenever you need to provide one in production code. + * *

This is EXPERIMENTAL API and may subject to change. If you'd like it to be * stabilized or have any feedback, please * let us know. * *

In general implementations should be thread-safe, unless it's implemented and used in a * localized environment (like unit tests) where you are sure the usages are synchronized. + * + * @since 1.24.0 */ public abstract static class Ticker { /** Returns the number of nanoseconds since this source's epoch. */ @@ -225,4 +256,12 @@ private static T checkNotNull(T reference, Object errorMessage) { } return reference; } + + private void checkTicker(Deadline other) { + if (ticker != other.ticker) { + throw new AssertionError( + "Tickers (" + ticker + " and " + other.ticker + ") don't match." + + " Custom Ticker should only be used in tests!"); + } + } } diff --git a/context/src/test/java/io/grpc/DeadlineTest.java b/context/src/test/java/io/grpc/DeadlineTest.java index 48e172dfd32..5f35918796d 100644 --- a/context/src/test/java/io/grpc/DeadlineTest.java +++ b/context/src/test/java/io/grpc/DeadlineTest.java @@ -17,6 +17,7 @@ package io.grpc; import static com.google.common.truth.Truth.assertAbout; +import static com.google.common.truth.Truth.assertThat; import static io.grpc.testing.DeadlineSubject.deadline; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -68,6 +69,18 @@ public void defaultTickerIsSystemTicker() { assertAbout(deadline()).that(d).isWithin(1, TimeUnit.SECONDS).of(reference); } + @Test + public void minimum() { + Deadline d1 = Deadline.after(1, TimeUnit.MINUTES, ticker); + Deadline d2 = Deadline.after(2, TimeUnit.MINUTES, ticker); + Deadline d3 = Deadline.after(3, TimeUnit.MINUTES, ticker); + + assertThat(d1.minimum(d2)).isSameInstanceAs(d1); + assertThat(d2.minimum(d1)).isSameInstanceAs(d1); + assertThat(d3.minimum(d2)).isSameInstanceAs(d2); + assertThat(d2.minimum(d3)).isSameInstanceAs(d2); + } + @Test public void timeCanOverflow() { ticker.reset(Long.MAX_VALUE); @@ -208,10 +221,16 @@ public void run() { assertTrue(executed.get()); } + @Test + public void toString_systemTickerNotShown() { + Deadline d = Deadline.after(0, TimeUnit.MILLISECONDS); + assertThat(d.toString()).endsWith("s from now"); + } + @Test public void toString_exact() { Deadline d = Deadline.after(0, TimeUnit.MILLISECONDS, ticker); - assertEquals("0s from now", d.toString()); + assertEquals("0s from now (ticker=FAKE_TICKER)", d.toString()); } @Test @@ -219,17 +238,17 @@ public void toString_after() { Deadline d; d = Deadline.after(-1, TimeUnit.MINUTES, ticker); - assertEquals("-60s from now", d.toString()); + assertEquals("-60s from now (ticker=FAKE_TICKER)", d.toString()); d = Deadline.after(-1, TimeUnit.MILLISECONDS, ticker); - assertEquals("-0.001000000s from now", d.toString()); + assertEquals("-0.001000000s from now (ticker=FAKE_TICKER)", d.toString()); d = Deadline.after(-500, TimeUnit.MILLISECONDS, ticker); - assertEquals("-0.500000000s from now", d.toString()); + assertEquals("-0.500000000s from now (ticker=FAKE_TICKER)", d.toString()); d = Deadline.after(-1000, TimeUnit.MILLISECONDS, ticker); - assertEquals("-1s from now", d.toString()); + assertEquals("-1s from now (ticker=FAKE_TICKER)", d.toString()); d = Deadline.after(-1500, TimeUnit.MILLISECONDS, ticker); - assertEquals("-1.500000000s from now", d.toString()); + assertEquals("-1.500000000s from now (ticker=FAKE_TICKER)", d.toString()); d = Deadline.after(-1023456789, TimeUnit.NANOSECONDS, ticker); - assertEquals("-1.023456789s from now", d.toString()); + assertEquals("-1.023456789s from now (ticker=FAKE_TICKER)", d.toString()); } @Test @@ -255,10 +274,40 @@ public void compareTo_same() { Truth.assertThat(d1).isEquivalentAccordingToCompareTo(d2); } + @Test + public void tickersDontMatch() { + Deadline d1 = Deadline.after(10, TimeUnit.SECONDS); + Deadline d2 = Deadline.after(10, TimeUnit.SECONDS, ticker); + boolean success = false; + try { + d1.compareTo(d2); + success = true; + } catch (AssertionError e) { + // Expected + } + assertFalse(success); + + try { + d1.minimum(d2); + success = true; + } catch (AssertionError e) { + // Expected + } + assertFalse(success); + + try { + d1.isBefore(d2); + success = true; + } catch (AssertionError e) { + // Expected + } + assertFalse(success); + } + @Test public void toString_before() { Deadline d = Deadline.after(12, TimeUnit.MICROSECONDS, ticker); - assertEquals("0.000012000s from now", d.toString()); + assertEquals("0.000012000s from now (ticker=FAKE_TICKER)", d.toString()); } private static class FakeTicker extends Deadline.Ticker { @@ -279,5 +328,10 @@ public void increment(long period, TimeUnit unit) { } this.time += unit.toNanos(period); } + + @Override + public String toString() { + return "FAKE_TICKER"; + } } } diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java index ed27f3f9e63..603a794e3db 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.Preconditions; +import io.grpc.Deadline; import io.grpc.ExperimentalApi; import io.grpc.ServerStreamTracer; import io.grpc.internal.AbstractServerImplBuilder; @@ -125,6 +126,24 @@ public InProcessServerBuilder scheduledExecutorService( return this; } + /** + * Provides a custom deadline ticker that this server will use to create incoming {@link + * Deadline}s. + * + *

This is intended for unit tests that fake out the clock. You should also have a fake {@link + * ScheduledExecutorService} whose clock is synchronized with this ticker and set it to {@link + * #scheduledExecutorService}. DO NOT use this in production. + * + * @return this + * @see Deadline#after(long, TimeUnit, Deadline.Ticker) + * + * @since 1.24.0 + */ + public InProcessServerBuilder deadlineTicker(Deadline.Ticker ticker) { + setDeadlineTicker(ticker); + return this; + } + /** * Sets the maximum size of metadata allowed to be received. {@code Integer.MAX_VALUE} disables * the enforcement. Defaults to no limit ({@code Integer.MAX_VALUE}). diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java index 04e3fbcc54b..c53c79a564e 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java @@ -25,6 +25,7 @@ import io.grpc.BindableService; import io.grpc.CompressorRegistry; import io.grpc.Context; +import io.grpc.Deadline; import io.grpc.DecompressorRegistry; import io.grpc.HandlerRegistry; import io.grpc.InternalChannelz; @@ -78,6 +79,7 @@ public static ServerBuilder forPort(int port) { DecompressorRegistry decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY; CompressorRegistry compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY; long handshakeTimeoutMillis = DEFAULT_HANDSHAKE_TIMEOUT_MILLIS; + Deadline.Ticker ticker = Deadline.getSystemTicker(); @Nullable private CensusStatsModule censusStatsOverride; private boolean statsEnabled = true; private boolean recordStartedRpcs = true; @@ -216,6 +218,13 @@ protected void setTracingEnabled(boolean value) { tracingEnabled = value; } + /** + * Sets a custom deadline ticker. This should only be called from InProcessServerBuilder. + */ + protected void setDeadlineTicker(Deadline.Ticker ticker) { + this.ticker = checkNotNull(ticker, "ticker"); + } + @Override public final Server build() { ServerImpl server = new ServerImpl( diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index abbd4ec4780..471507eaee2 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -16,6 +16,7 @@ package io.grpc.internal; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.grpc.Contexts.statusFromCancelled; @@ -33,6 +34,7 @@ import io.grpc.BinaryLog; import io.grpc.CompressorRegistry; import io.grpc.Context; +import io.grpc.Deadline; import io.grpc.Decompressor; import io.grpc.DecompressorRegistry; import io.grpc.HandlerRegistry; @@ -123,6 +125,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume private final InternalChannelz channelz; private final CallTracer serverCallTracer; + private final Deadline.Ticker ticker; /** * Construct a server. @@ -157,6 +160,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume this.binlog = builder.binlog; this.channelz = builder.channelz; this.serverCallTracer = builder.callTracerFactory.create(); + this.ticker = checkNotNull(builder.ticker, "ticker"); channelz.addServer(this); } @@ -578,8 +582,10 @@ private Context.CancellableContext createContext( return baseContext.withCancellation(); } - Context.CancellableContext context = baseContext.withDeadlineAfter( - timeoutNanos, NANOSECONDS, transport.getScheduledExecutorService()); + Context.CancellableContext context = + baseContext.withDeadline( + Deadline.after(timeoutNanos, NANOSECONDS, ticker), + transport.getScheduledExecutorService()); final class ServerStreamCancellationListener implements Context.CancellationListener { @Override public void cancelled(Context context) { diff --git a/core/src/test/java/io/grpc/internal/FakeClock.java b/core/src/test/java/io/grpc/internal/FakeClock.java index ae8a41802ce..d708af5f25d 100644 --- a/core/src/test/java/io/grpc/internal/FakeClock.java +++ b/core/src/test/java/io/grpc/internal/FakeClock.java @@ -20,6 +20,7 @@ import com.google.common.base.Supplier; import com.google.common.base.Ticker; import com.google.common.util.concurrent.AbstractFuture; +import io.grpc.Deadline; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -61,6 +62,13 @@ public boolean shouldAccept(Runnable command) { } }; + private final Deadline.Ticker deadlineTicker = + new Deadline.Ticker() { + @Override public long nanoTime() { + return currentTimeNanos; + } + }; + private final Supplier stopwatchSupplier = new Supplier() { @Override public Stopwatch get() { @@ -229,6 +237,13 @@ public Ticker getTicker() { return ticker; } + /** + * Deadline ticker of the FakeClock. + */ + public Deadline.Ticker getDeadlineTicker() { + return deadlineTicker; + } + /** * Run all due tasks. Immediately due tasks that are queued during the process also get executed. * diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index dd6aa46ee30..1a79f93ee3a 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -19,6 +19,7 @@ import static com.google.common.truth.Truth.assertThat; import static io.grpc.InternalChannelz.id; import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; +import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -199,6 +200,7 @@ public List getServices() { public void startUp() throws IOException { MockitoAnnotations.initMocks(this); builder.channelz = channelz; + builder.ticker = timer.getDeadlineTicker(); streamTracerFactories = Arrays.asList(streamTracerFactory); when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService()); when(streamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class))) @@ -977,10 +979,11 @@ private void checkContext() { verify(stream, times(0)).close(isA(Status.class), ArgumentMatchers.isNotNull()); } - private ServerStreamListener testClientClose_setup( + private ServerStreamListener testStreamClose_setup( final AtomicReference> callReference, final AtomicReference context, - final AtomicBoolean contextCancelled) throws Exception { + final AtomicBoolean contextCancelled, + @Nullable Long timeoutNanos) throws Exception { createAndStartServer(); callListener = new ServerCall.Listener() { @Override @@ -1011,6 +1014,9 @@ public ServerCall.Listener startCall( = transportServer.registerNewServerTransport(new SimpleServerTransport()); transportListener.transportReady(Attributes.EMPTY); Metadata requestHeaders = new Metadata(); + if (timeoutNanos != null) { + requestHeaders.put(TIMEOUT_KEY, timeoutNanos); + } StatsTraceContext statsTraceCtx = StatsTraceContext.newServerContext(streamTracerFactories, "Waitier/serve", requestHeaders); when(stream.statsTraceContext()).thenReturn(statsTraceCtx); @@ -1025,14 +1031,14 @@ public ServerCall.Listener startCall( } @Test - public void testClientClose_cancelTriggersImmediateCancellation() throws Exception { + public void testStreamClose_clientCancelTriggersImmediateCancellation() throws Exception { AtomicBoolean contextCancelled = new AtomicBoolean(false); AtomicReference context = new AtomicReference<>(); AtomicReference> callReference = new AtomicReference<>(); - ServerStreamListener streamListener = testClientClose_setup(callReference, - context, contextCancelled); + ServerStreamListener streamListener = testStreamClose_setup(callReference, + context, contextCancelled, null); // For close status being non OK: // isCancelled is expected to be true immediately after calling closed(), without needing @@ -1048,14 +1054,14 @@ public void testClientClose_cancelTriggersImmediateCancellation() throws Excepti } @Test - public void testClientClose_OkTriggersDelayedCancellation() throws Exception { + public void testStreamClose_clientOkTriggersDelayedCancellation() throws Exception { AtomicBoolean contextCancelled = new AtomicBoolean(false); AtomicReference context = new AtomicReference<>(); AtomicReference> callReference = new AtomicReference<>(); - ServerStreamListener streamListener = testClientClose_setup(callReference, - context, contextCancelled); + ServerStreamListener streamListener = testStreamClose_setup(callReference, + context, contextCancelled, null); // For close status OK: // isCancelled is expected to be true after all pending work is done @@ -1071,6 +1077,27 @@ public void testClientClose_OkTriggersDelayedCancellation() throws Exception { assertTrue(contextCancelled.get()); } + @Test + public void testStreamClose_deadlineExceededTriggersImmediateCancellation() throws Exception { + AtomicBoolean contextCancelled = new AtomicBoolean(false); + AtomicReference context = new AtomicReference<>(); + AtomicReference> callReference + = new AtomicReference<>(); + + testStreamClose_setup(callReference, context, contextCancelled, 50L); + + timer.forwardNanos(49); + + assertFalse(callReference.get().isCancelled()); + assertFalse(context.get().isCancelled()); + + assertEquals(1, timer.forwardNanos(1)); + + assertTrue(callReference.get().isCancelled()); + assertTrue(context.get().isCancelled()); + assertTrue(contextCancelled.get()); + } + @Test public void getPort() throws Exception { final InetSocketAddress addr = new InetSocketAddress(65535);