Skip to content

Commit

Permalink
core: allow setting custom Deadline.Ticker to InProcessServerBuilder (#…
Browse files Browse the repository at this point in the history
…6034)

ServerImpl uses that ticker to create incoming Deadlines. This feature is specifically restricted to in-process, as it can also customize ScheduledExecutorService, and them together can fake out the clock which is useful in tests. On the other hand, a fake Ticker won't work with Netty's ScheduledExecutorService.

Also improved mismatch detection, documentation and tests in Deadline.
  • Loading branch information
zhangkun83 committed Aug 1, 2019
1 parent 271cbff commit 4e4d4d8
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 23 deletions.
49 changes: 44 additions & 5 deletions context/src/main/java/io/grpc/Deadline.java
Expand Up @@ -44,6 +44,8 @@ public final class Deadline implements Comparable<Deadline> {
* <p>This is <strong>EXPERIMENTAL</strong> API and may subject to change. If you'd like it to be
* stabilized or have any feedback, please
* <href a="https://github.com/grpc/grpc-java/issues/6030">let us know</a>.
*
* @since 1.24.0
*/
public static Ticker getSystemTicker() {
return SYSTEM_TICKER;
Expand All @@ -64,6 +66,11 @@ public static Deadline after(long duration, TimeUnit units) {
/**
* Create a deadline that will expire at the specified offset based on the given {@link Ticker}.
*
* <p><strong>CAUTION</strong>: Only deadlines created with the same {@link Ticker} instance can
* be compared by methods like {@link #minimum}, {@link #isBefore} and {@link #compareTo}. Custom
* Tickers should only be used in tests where you fake out the clock. Always use the {@link
* #getSystemTicker system ticker} in production, or serious errors may occur.
*
* <p>This is <strong>EXPERIMENTAL</strong> API and may subject to change. If you'd like it to be
* stabilized or have any feedback, please
* <href a="https://github.com/grpc/grpc-java/issues/6030">let us know</a>.
Expand All @@ -72,6 +79,8 @@ public static Deadline after(long duration, TimeUnit units) {
* @param units The time unit for the duration.
* @param ticker Where this deadline refer the current time
* @return A new deadline.
*
* @since 1.24.0
*/
public static Deadline after(long duration, TimeUnit units, Ticker ticker) {
checkNotNull(units, "units");
Expand Down Expand Up @@ -111,19 +120,22 @@ public boolean isExpired() {
}

/**
* Is {@code this} deadline before another.
* Is {@code this} deadline before another. Two deadlines must be created using the same {@link
* Ticker}.
*/
public boolean isBefore(Deadline other) {
assert this.ticker == other.ticker : "Tickers don't match";
checkTicker(other);
return this.deadlineNanos - other.deadlineNanos < 0;
}

/**
* Return the minimum deadline of {@code this} or an other deadline.
* Return the minimum deadline of {@code this} or an other deadline. They must be created using
* the same {@link Ticker}.
*
* @param other deadline to compare with {@code this}.
*/
public Deadline minimum(Deadline other) {
assert this.ticker == other.ticker : "Tickers don't match";
checkTicker(other);
return isBefore(other) ? this : other;
}

Expand Down Expand Up @@ -157,6 +169,11 @@ public long timeRemaining(TimeUnit unit) {

/**
* Schedule a task to be run when the deadline expires.
*
* <p>Note if this deadline was created with a custom {@link Ticker}, the {@code scheduler}'s
* underlying clock should be synchronized with that Ticker. Otherwise the task won't be run at
* the expected point of time.
*
* @param task to run on expiration
* @param scheduler used to execute the task
* @return {@link ScheduledFuture} which can be used to cancel execution of the task
Expand All @@ -182,12 +199,20 @@ public String toString() {
buf.append(String.format(".%09d", nanos));
}
buf.append("s from now");
if (ticker != SYSTEM_TICKER) {
buf.append(" (ticker=" + ticker + ")");
}
return buf.toString();
}

/**
* {@inheritDoc}
*
* <p>Both deadlines must be created with the same {@link Ticker}.
*/
@Override
public int compareTo(Deadline that) {
assert this.ticker == that.ticker : "Tickers don't match";
checkTicker(that);
long diff = this.deadlineNanos - that.deadlineNanos;
if (diff < 0) {
return -1;
Expand All @@ -200,12 +225,18 @@ public int compareTo(Deadline that) {
/**
* Time source representing nanoseconds since fixed but arbitrary point in time.
*
* <p>DO NOT use custom {@link Ticker} implementations in production, because deadlines created
* with custom tickers are incompatible with those created with the system ticker. Always use
* the {@link #getSystemTicker system ticker} whenever you need to provide one in production code.
*
* <p>This is <strong>EXPERIMENTAL</strong> API and may subject to change. If you'd like it to be
* stabilized or have any feedback, please
* <href a="https://github.com/grpc/grpc-java/issues/6030">let us know</a>.
*
* <p>In general implementations should be thread-safe, unless it's implemented and used in a
* localized environment (like unit tests) where you are sure the usages are synchronized.
*
* @since 1.24.0
*/
public abstract static class Ticker {
/** Returns the number of nanoseconds since this source's epoch. */
Expand All @@ -225,4 +256,12 @@ private static <T> T checkNotNull(T reference, Object errorMessage) {
}
return reference;
}

private void checkTicker(Deadline other) {
if (ticker != other.ticker) {
throw new AssertionError(
"Tickers (" + ticker + " and " + other.ticker + ") don't match."
+ " Custom Ticker should only be used in tests!");
}
}
}
70 changes: 62 additions & 8 deletions context/src/test/java/io/grpc/DeadlineTest.java
Expand Up @@ -17,6 +17,7 @@
package io.grpc;

import static com.google.common.truth.Truth.assertAbout;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.testing.DeadlineSubject.deadline;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -68,6 +69,18 @@ public void defaultTickerIsSystemTicker() {
assertAbout(deadline()).that(d).isWithin(1, TimeUnit.SECONDS).of(reference);
}

@Test
public void minimum() {
Deadline d1 = Deadline.after(1, TimeUnit.MINUTES, ticker);
Deadline d2 = Deadline.after(2, TimeUnit.MINUTES, ticker);
Deadline d3 = Deadline.after(3, TimeUnit.MINUTES, ticker);

assertThat(d1.minimum(d2)).isSameInstanceAs(d1);
assertThat(d2.minimum(d1)).isSameInstanceAs(d1);
assertThat(d3.minimum(d2)).isSameInstanceAs(d2);
assertThat(d2.minimum(d3)).isSameInstanceAs(d2);
}

@Test
public void timeCanOverflow() {
ticker.reset(Long.MAX_VALUE);
Expand Down Expand Up @@ -208,28 +221,34 @@ public void run() {
assertTrue(executed.get());
}

@Test
public void toString_systemTickerNotShown() {
Deadline d = Deadline.after(0, TimeUnit.MILLISECONDS);
assertThat(d.toString()).endsWith("s from now");
}

@Test
public void toString_exact() {
Deadline d = Deadline.after(0, TimeUnit.MILLISECONDS, ticker);
assertEquals("0s from now", d.toString());
assertEquals("0s from now (ticker=FAKE_TICKER)", d.toString());
}

@Test
public void toString_after() {
Deadline d;

d = Deadline.after(-1, TimeUnit.MINUTES, ticker);
assertEquals("-60s from now", d.toString());
assertEquals("-60s from now (ticker=FAKE_TICKER)", d.toString());
d = Deadline.after(-1, TimeUnit.MILLISECONDS, ticker);
assertEquals("-0.001000000s from now", d.toString());
assertEquals("-0.001000000s from now (ticker=FAKE_TICKER)", d.toString());
d = Deadline.after(-500, TimeUnit.MILLISECONDS, ticker);
assertEquals("-0.500000000s from now", d.toString());
assertEquals("-0.500000000s from now (ticker=FAKE_TICKER)", d.toString());
d = Deadline.after(-1000, TimeUnit.MILLISECONDS, ticker);
assertEquals("-1s from now", d.toString());
assertEquals("-1s from now (ticker=FAKE_TICKER)", d.toString());
d = Deadline.after(-1500, TimeUnit.MILLISECONDS, ticker);
assertEquals("-1.500000000s from now", d.toString());
assertEquals("-1.500000000s from now (ticker=FAKE_TICKER)", d.toString());
d = Deadline.after(-1023456789, TimeUnit.NANOSECONDS, ticker);
assertEquals("-1.023456789s from now", d.toString());
assertEquals("-1.023456789s from now (ticker=FAKE_TICKER)", d.toString());
}

@Test
Expand All @@ -255,10 +274,40 @@ public void compareTo_same() {
Truth.assertThat(d1).isEquivalentAccordingToCompareTo(d2);
}

@Test
public void tickersDontMatch() {
Deadline d1 = Deadline.after(10, TimeUnit.SECONDS);
Deadline d2 = Deadline.after(10, TimeUnit.SECONDS, ticker);
boolean success = false;
try {
d1.compareTo(d2);
success = true;
} catch (AssertionError e) {
// Expected
}
assertFalse(success);

try {
d1.minimum(d2);
success = true;
} catch (AssertionError e) {
// Expected
}
assertFalse(success);

try {
d1.isBefore(d2);
success = true;
} catch (AssertionError e) {
// Expected
}
assertFalse(success);
}

@Test
public void toString_before() {
Deadline d = Deadline.after(12, TimeUnit.MICROSECONDS, ticker);
assertEquals("0.000012000s from now", d.toString());
assertEquals("0.000012000s from now (ticker=FAKE_TICKER)", d.toString());
}

private static class FakeTicker extends Deadline.Ticker {
Expand All @@ -279,5 +328,10 @@ public void increment(long period, TimeUnit unit) {
}
this.time += unit.toNanos(period);
}

@Override
public String toString() {
return "FAKE_TICKER";
}
}
}
19 changes: 19 additions & 0 deletions core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.Preconditions;
import io.grpc.Deadline;
import io.grpc.ExperimentalApi;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.AbstractServerImplBuilder;
Expand Down Expand Up @@ -125,6 +126,24 @@ public InProcessServerBuilder scheduledExecutorService(
return this;
}

/**
* Provides a custom deadline ticker that this server will use to create incoming {@link
* Deadline}s.
*
* <p>This is intended for unit tests that fake out the clock. You should also have a fake {@link
* ScheduledExecutorService} whose clock is synchronized with this ticker and set it to {@link
* #scheduledExecutorService}. DO NOT use this in production.
*
* @return this
* @see Deadline#after(long, TimeUnit, Deadline.Ticker)
*
* @since 1.24.0
*/
public InProcessServerBuilder deadlineTicker(Deadline.Ticker ticker) {
setDeadlineTicker(ticker);
return this;
}

/**
* Sets the maximum size of metadata allowed to be received. {@code Integer.MAX_VALUE} disables
* the enforcement. Defaults to no limit ({@code Integer.MAX_VALUE}).
Expand Down
Expand Up @@ -25,6 +25,7 @@
import io.grpc.BindableService;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.DecompressorRegistry;
import io.grpc.HandlerRegistry;
import io.grpc.InternalChannelz;
Expand Down Expand Up @@ -78,6 +79,7 @@ public static ServerBuilder<?> forPort(int port) {
DecompressorRegistry decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY;
CompressorRegistry compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY;
long handshakeTimeoutMillis = DEFAULT_HANDSHAKE_TIMEOUT_MILLIS;
Deadline.Ticker ticker = Deadline.getSystemTicker();
@Nullable private CensusStatsModule censusStatsOverride;
private boolean statsEnabled = true;
private boolean recordStartedRpcs = true;
Expand Down Expand Up @@ -216,6 +218,13 @@ protected void setTracingEnabled(boolean value) {
tracingEnabled = value;
}

/**
* Sets a custom deadline ticker. This should only be called from InProcessServerBuilder.
*/
protected void setDeadlineTicker(Deadline.Ticker ticker) {
this.ticker = checkNotNull(ticker, "ticker");
}

@Override
public final Server build() {
ServerImpl server = new ServerImpl(
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Expand Up @@ -16,6 +16,7 @@

package io.grpc.internal;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.grpc.Contexts.statusFromCancelled;
Expand All @@ -33,6 +34,7 @@
import io.grpc.BinaryLog;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
import io.grpc.HandlerRegistry;
Expand Down Expand Up @@ -123,6 +125,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume

private final InternalChannelz channelz;
private final CallTracer serverCallTracer;
private final Deadline.Ticker ticker;

/**
* Construct a server.
Expand Down Expand Up @@ -157,6 +160,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
this.binlog = builder.binlog;
this.channelz = builder.channelz;
this.serverCallTracer = builder.callTracerFactory.create();
this.ticker = checkNotNull(builder.ticker, "ticker");

channelz.addServer(this);
}
Expand Down Expand Up @@ -578,8 +582,10 @@ private Context.CancellableContext createContext(
return baseContext.withCancellation();
}

Context.CancellableContext context = baseContext.withDeadlineAfter(
timeoutNanos, NANOSECONDS, transport.getScheduledExecutorService());
Context.CancellableContext context =
baseContext.withDeadline(
Deadline.after(timeoutNanos, NANOSECONDS, ticker),
transport.getScheduledExecutorService());
final class ServerStreamCancellationListener implements Context.CancellationListener {
@Override
public void cancelled(Context context) {
Expand Down
15 changes: 15 additions & 0 deletions core/src/test/java/io/grpc/internal/FakeClock.java
Expand Up @@ -20,6 +20,7 @@
import com.google.common.base.Supplier;
import com.google.common.base.Ticker;
import com.google.common.util.concurrent.AbstractFuture;
import io.grpc.Deadline;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -61,6 +62,13 @@ public boolean shouldAccept(Runnable command) {
}
};

private final Deadline.Ticker deadlineTicker =
new Deadline.Ticker() {
@Override public long nanoTime() {
return currentTimeNanos;
}
};

private final Supplier<Stopwatch> stopwatchSupplier =
new Supplier<Stopwatch>() {
@Override public Stopwatch get() {
Expand Down Expand Up @@ -229,6 +237,13 @@ public Ticker getTicker() {
return ticker;
}

/**
* Deadline ticker of the FakeClock.
*/
public Deadline.Ticker getDeadlineTicker() {
return deadlineTicker;
}

/**
* Run all due tasks. Immediately due tasks that are queued during the process also get executed.
*
Expand Down

0 comments on commit 4e4d4d8

Please sign in to comment.