Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: allow setting custom Deadline.Ticker to InProcessServerBuilder #6034

Merged
merged 3 commits into from Aug 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 44 additions & 5 deletions context/src/main/java/io/grpc/Deadline.java
Expand Up @@ -44,6 +44,8 @@ public final class Deadline implements Comparable<Deadline> {
* <p>This is <strong>EXPERIMENTAL</strong> API and may subject to change. If you'd like it to be
* stabilized or have any feedback, please
* <href a="https://github.com/grpc/grpc-java/issues/6030">let us know</a>.
*
* @since 1.24.0
*/
public static Ticker getSystemTicker() {
return SYSTEM_TICKER;
Expand All @@ -64,6 +66,11 @@ public static Deadline after(long duration, TimeUnit units) {
/**
* Create a deadline that will expire at the specified offset based on the given {@link Ticker}.
*
* <p><strong>CAUTION</strong>: Only deadlines created with the same {@link Ticker} instance can
* be compared by methods like {@link #minimum}, {@link #isBefore} and {@link #compareTo}. Custom
* Tickers should only be used in tests where you fake out the clock. Always use the {@link
* #getSystemTicker system ticker} in production, or serious errors may occur.
*
* <p>This is <strong>EXPERIMENTAL</strong> API and may subject to change. If you'd like it to be
* stabilized or have any feedback, please
* <href a="https://github.com/grpc/grpc-java/issues/6030">let us know</a>.
Expand All @@ -72,6 +79,8 @@ public static Deadline after(long duration, TimeUnit units) {
* @param units The time unit for the duration.
* @param ticker Where this deadline refer the current time
* @return A new deadline.
*
* @since 1.24.0
*/
public static Deadline after(long duration, TimeUnit units, Ticker ticker) {
checkNotNull(units, "units");
Expand Down Expand Up @@ -111,19 +120,22 @@ public boolean isExpired() {
}

/**
* Is {@code this} deadline before another.
* Is {@code this} deadline before another. Two deadlines must be created using the same {@link
* Ticker}.
*/
public boolean isBefore(Deadline other) {
assert this.ticker == other.ticker : "Tickers don't match";
checkTicker(other);
return this.deadlineNanos - other.deadlineNanos < 0;
}

/**
* Return the minimum deadline of {@code this} or an other deadline.
* Return the minimum deadline of {@code this} or an other deadline. They must be created using
* the same {@link Ticker}.
*
* @param other deadline to compare with {@code this}.
*/
public Deadline minimum(Deadline other) {
assert this.ticker == other.ticker : "Tickers don't match";
checkTicker(other);
return isBefore(other) ? this : other;
}

Expand Down Expand Up @@ -157,6 +169,11 @@ public long timeRemaining(TimeUnit unit) {

/**
* Schedule a task to be run when the deadline expires.
*
* <p>Note if this deadline was created with a custom {@link Ticker}, the {@code scheduler}'s
* underlying clock should be synchronized with that Ticker. Otherwise the task won't be run at
* the expected point of time.
*
* @param task to run on expiration
* @param scheduler used to execute the task
* @return {@link ScheduledFuture} which can be used to cancel execution of the task
Expand All @@ -182,12 +199,20 @@ public String toString() {
buf.append(String.format(".%09d", nanos));
}
buf.append("s from now");
if (ticker != SYSTEM_TICKER) {
buf.append(" (ticker=" + ticker + ")");
}
return buf.toString();
}

/**
* {@inheritDoc}
*
* <p>Both deadlines must be created with the same {@link Ticker}.
*/
@Override
public int compareTo(Deadline that) {
assert this.ticker == that.ticker : "Tickers don't match";
checkTicker(that);
long diff = this.deadlineNanos - that.deadlineNanos;
if (diff < 0) {
return -1;
Expand All @@ -200,12 +225,18 @@ public int compareTo(Deadline that) {
/**
* Time source representing nanoseconds since fixed but arbitrary point in time.
*
* <p>DO NOT use custom {@link Ticker} implementations in production, because deadlines created
* with custom tickers are incompatible with those created with the system ticker. Always use
* the {@link #getSystemTicker system ticker} whenever you need to provide one in production code.
*
* <p>This is <strong>EXPERIMENTAL</strong> API and may subject to change. If you'd like it to be
* stabilized or have any feedback, please
* <href a="https://github.com/grpc/grpc-java/issues/6030">let us know</a>.
*
* <p>In general implementations should be thread-safe, unless it's implemented and used in a
* localized environment (like unit tests) where you are sure the usages are synchronized.
*
* @since 1.24.0
*/
public abstract static class Ticker {
/** Returns the number of nanoseconds since this source's epoch. */
Expand All @@ -225,4 +256,12 @@ private static <T> T checkNotNull(T reference, Object errorMessage) {
}
return reference;
}

private void checkTicker(Deadline other) {
if (ticker != other.ticker) {
throw new AssertionError(
"Tickers (" + ticker + " and " + other.ticker + ") don't match."
+ " Custom Ticker should only be used in tests!");
}
}
}
70 changes: 62 additions & 8 deletions context/src/test/java/io/grpc/DeadlineTest.java
Expand Up @@ -17,6 +17,7 @@
package io.grpc;

import static com.google.common.truth.Truth.assertAbout;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.testing.DeadlineSubject.deadline;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -68,6 +69,18 @@ public void defaultTickerIsSystemTicker() {
assertAbout(deadline()).that(d).isWithin(1, TimeUnit.SECONDS).of(reference);
}

@Test
public void minimum() {
Deadline d1 = Deadline.after(1, TimeUnit.MINUTES, ticker);
Deadline d2 = Deadline.after(2, TimeUnit.MINUTES, ticker);
Deadline d3 = Deadline.after(3, TimeUnit.MINUTES, ticker);

assertThat(d1.minimum(d2)).isSameInstanceAs(d1);
assertThat(d2.minimum(d1)).isSameInstanceAs(d1);
assertThat(d3.minimum(d2)).isSameInstanceAs(d2);
assertThat(d2.minimum(d3)).isSameInstanceAs(d2);
}

@Test
public void timeCanOverflow() {
ticker.reset(Long.MAX_VALUE);
Expand Down Expand Up @@ -208,28 +221,34 @@ public void run() {
assertTrue(executed.get());
}

@Test
public void toString_systemTickerNotShown() {
Deadline d = Deadline.after(0, TimeUnit.MILLISECONDS);
assertThat(d.toString()).endsWith("s from now");
}

@Test
public void toString_exact() {
Deadline d = Deadline.after(0, TimeUnit.MILLISECONDS, ticker);
assertEquals("0s from now", d.toString());
assertEquals("0s from now (ticker=FAKE_TICKER)", d.toString());
}

@Test
public void toString_after() {
Deadline d;

d = Deadline.after(-1, TimeUnit.MINUTES, ticker);
assertEquals("-60s from now", d.toString());
assertEquals("-60s from now (ticker=FAKE_TICKER)", d.toString());
d = Deadline.after(-1, TimeUnit.MILLISECONDS, ticker);
assertEquals("-0.001000000s from now", d.toString());
assertEquals("-0.001000000s from now (ticker=FAKE_TICKER)", d.toString());
d = Deadline.after(-500, TimeUnit.MILLISECONDS, ticker);
assertEquals("-0.500000000s from now", d.toString());
assertEquals("-0.500000000s from now (ticker=FAKE_TICKER)", d.toString());
d = Deadline.after(-1000, TimeUnit.MILLISECONDS, ticker);
assertEquals("-1s from now", d.toString());
assertEquals("-1s from now (ticker=FAKE_TICKER)", d.toString());
d = Deadline.after(-1500, TimeUnit.MILLISECONDS, ticker);
assertEquals("-1.500000000s from now", d.toString());
assertEquals("-1.500000000s from now (ticker=FAKE_TICKER)", d.toString());
d = Deadline.after(-1023456789, TimeUnit.NANOSECONDS, ticker);
assertEquals("-1.023456789s from now", d.toString());
assertEquals("-1.023456789s from now (ticker=FAKE_TICKER)", d.toString());
}

@Test
Expand All @@ -255,10 +274,40 @@ public void compareTo_same() {
Truth.assertThat(d1).isEquivalentAccordingToCompareTo(d2);
}

@Test
public void tickersDontMatch() {
Deadline d1 = Deadline.after(10, TimeUnit.SECONDS);
Deadline d2 = Deadline.after(10, TimeUnit.SECONDS, ticker);
boolean success = false;
try {
d1.compareTo(d2);
success = true;
} catch (AssertionError e) {
// Expected
}
assertFalse(success);

try {
d1.minimum(d2);
success = true;
} catch (AssertionError e) {
// Expected
}
assertFalse(success);

try {
d1.isBefore(d2);
success = true;
} catch (AssertionError e) {
// Expected
}
assertFalse(success);
}

@Test
public void toString_before() {
Deadline d = Deadline.after(12, TimeUnit.MICROSECONDS, ticker);
assertEquals("0.000012000s from now", d.toString());
assertEquals("0.000012000s from now (ticker=FAKE_TICKER)", d.toString());
}

private static class FakeTicker extends Deadline.Ticker {
Expand All @@ -279,5 +328,10 @@ public void increment(long period, TimeUnit unit) {
}
this.time += unit.toNanos(period);
}

@Override
public String toString() {
return "FAKE_TICKER";
}
}
}
19 changes: 19 additions & 0 deletions core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.Preconditions;
import io.grpc.Deadline;
import io.grpc.ExperimentalApi;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.AbstractServerImplBuilder;
Expand Down Expand Up @@ -125,6 +126,24 @@ public InProcessServerBuilder scheduledExecutorService(
return this;
}

/**
* Provides a custom deadline ticker that this server will use to create incoming {@link
* Deadline}s.
*
* <p>This is intended for unit tests that fake out the clock. You should also have a fake {@link
* ScheduledExecutorService} whose clock is synchronized with this ticker and set it to {@link
* #scheduledExecutorService}. DO NOT use this in production.
*
* @return this
* @see Deadline#after(long, TimeUnit, Deadline.Ticker)
*
* @since 1.24.0
*/
public InProcessServerBuilder deadlineTicker(Deadline.Ticker ticker) {
setDeadlineTicker(ticker);
return this;
}

/**
* Sets the maximum size of metadata allowed to be received. {@code Integer.MAX_VALUE} disables
* the enforcement. Defaults to no limit ({@code Integer.MAX_VALUE}).
Expand Down
Expand Up @@ -25,6 +25,7 @@
import io.grpc.BindableService;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.DecompressorRegistry;
import io.grpc.HandlerRegistry;
import io.grpc.InternalChannelz;
Expand Down Expand Up @@ -78,6 +79,7 @@ public static ServerBuilder<?> forPort(int port) {
DecompressorRegistry decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY;
CompressorRegistry compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY;
long handshakeTimeoutMillis = DEFAULT_HANDSHAKE_TIMEOUT_MILLIS;
Deadline.Ticker ticker = Deadline.getSystemTicker();
@Nullable private CensusStatsModule censusStatsOverride;
private boolean statsEnabled = true;
private boolean recordStartedRpcs = true;
Expand Down Expand Up @@ -216,6 +218,13 @@ protected void setTracingEnabled(boolean value) {
tracingEnabled = value;
}

/**
* Sets a custom deadline ticker. This should only be called from InProcessServerBuilder.
*/
protected void setDeadlineTicker(Deadline.Ticker ticker) {
this.ticker = checkNotNull(ticker, "ticker");
}

@Override
public final Server build() {
ServerImpl server = new ServerImpl(
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Expand Up @@ -16,6 +16,7 @@

package io.grpc.internal;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.grpc.Contexts.statusFromCancelled;
Expand All @@ -33,6 +34,7 @@
import io.grpc.BinaryLog;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
import io.grpc.HandlerRegistry;
Expand Down Expand Up @@ -123,6 +125,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume

private final InternalChannelz channelz;
private final CallTracer serverCallTracer;
private final Deadline.Ticker ticker;

/**
* Construct a server.
Expand Down Expand Up @@ -157,6 +160,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
this.binlog = builder.binlog;
this.channelz = builder.channelz;
this.serverCallTracer = builder.callTracerFactory.create();
this.ticker = checkNotNull(builder.ticker, "ticker");

channelz.addServer(this);
}
Expand Down Expand Up @@ -578,8 +582,10 @@ private Context.CancellableContext createContext(
return baseContext.withCancellation();
}

Context.CancellableContext context = baseContext.withDeadlineAfter(
timeoutNanos, NANOSECONDS, transport.getScheduledExecutorService());
Context.CancellableContext context =
baseContext.withDeadline(
Deadline.after(timeoutNanos, NANOSECONDS, ticker),
transport.getScheduledExecutorService());
final class ServerStreamCancellationListener implements Context.CancellationListener {
@Override
public void cancelled(Context context) {
Expand Down
15 changes: 15 additions & 0 deletions core/src/test/java/io/grpc/internal/FakeClock.java
Expand Up @@ -20,6 +20,7 @@
import com.google.common.base.Supplier;
import com.google.common.base.Ticker;
import com.google.common.util.concurrent.AbstractFuture;
import io.grpc.Deadline;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -61,6 +62,13 @@ public boolean shouldAccept(Runnable command) {
}
};

private final Deadline.Ticker deadlineTicker =
new Deadline.Ticker() {
@Override public long nanoTime() {
return currentTimeNanos;
}
};

private final Supplier<Stopwatch> stopwatchSupplier =
new Supplier<Stopwatch>() {
@Override public Stopwatch get() {
Expand Down Expand Up @@ -229,6 +237,13 @@ public Ticker getTicker() {
return ticker;
}

/**
* Deadline ticker of the FakeClock.
*/
public Deadline.Ticker getDeadlineTicker() {
return deadlineTicker;
}

/**
* Run all due tasks. Immediately due tasks that are queued during the process also get executed.
*
Expand Down