Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
feat: implemented watchdog as BackgroundResource (#838)
Browse files Browse the repository at this point in the history
* feat: implemented watchdog as BackgroundResource

This change updates watchdog to BackgroundResource, Also, watchdog unschedules itself explicitly when shutdown.

closes #828

* updated test cases to increase coverage percentage.

* Moved Watchdog scheduling to start() method

* added javadoc for Watchdog.create()
  • Loading branch information
rahulKQL authored and igorbernstein2 committed Jan 7, 2020
1 parent f18673a commit 90f3336
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 20 deletions.
3 changes: 3 additions & 0 deletions gax/src/main/java/com/google/api/gax/rpc/ClientContext.java
Expand Up @@ -185,6 +185,9 @@ public static ClientContext create(StubSettings settings) throws IOException {
if (executorProvider.shouldAutoClose()) {
backgroundResources.add(new ExecutorAsBackgroundResource(executor));
}
if (watchdogProvider != null && watchdogProvider.shouldAutoClose()) {
backgroundResources.add(watchdog);
}

return newBuilder()
.setBackgroundResources(backgroundResources.build())
Expand Down
Expand Up @@ -82,4 +82,9 @@ public WatchdogProvider withExecutor(ScheduledExecutorService executor) {
public Watchdog getWatchdog() {
return watchdog;
}

@Override
public boolean shouldAutoClose() {
return false;
}
}
Expand Up @@ -33,7 +33,6 @@
import com.google.api.core.BetaApi;
import com.google.common.base.Preconditions;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
Expand Down Expand Up @@ -103,10 +102,11 @@ public Watchdog getWatchdog() {
return null;
}

Watchdog watchdog = new Watchdog(clock);
executor.scheduleAtFixedRate(
watchdog, checkInterval.toMillis(), checkInterval.toMillis(), TimeUnit.MILLISECONDS);
return Watchdog.create(clock, checkInterval, executor);
}

return watchdog;
@Override
public boolean shouldAutoClose() {
return true;
}
}
59 changes: 57 additions & 2 deletions gax/src/main/java/com/google/api/gax/rpc/Watchdog.java
Expand Up @@ -31,11 +31,15 @@

import com.google.api.core.ApiClock;
import com.google.api.core.InternalApi;
import com.google.api.gax.core.BackgroundResource;
import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.threeten.bp.Duration;
Expand All @@ -56,15 +60,34 @@
* </ul>
*/
@InternalApi
public class Watchdog implements Runnable {
public class Watchdog implements Runnable, BackgroundResource {
// Dummy value to convert the ConcurrentHashMap into a Set
private static Object PRESENT = new Object();
private final ConcurrentHashMap<WatchdogStream, Object> openStreams = new ConcurrentHashMap<>();

private final ApiClock clock;
private final Duration scheduleInterval;
private final ScheduledExecutorService executor;
private ScheduledFuture<?> future;

/** returns a Watchdog which is scheduled at the provided interval. */
public static Watchdog create(
ApiClock clock, Duration scheduleInterval, ScheduledExecutorService executor) {
Watchdog watchdog = new Watchdog(clock, scheduleInterval, executor);
watchdog.start();
return watchdog;
}

public Watchdog(ApiClock clock) {
private Watchdog(ApiClock clock, Duration scheduleInterval, ScheduledExecutorService executor) {
this.clock = Preconditions.checkNotNull(clock, "clock can't be null");
this.scheduleInterval = scheduleInterval;
this.executor = executor;
}

private void start() {
future =
executor.scheduleAtFixedRate(
this, scheduleInterval.toMillis(), scheduleInterval.toMillis(), TimeUnit.MILLISECONDS);
}

/** Wraps the target observer with timing constraints. */
Expand Down Expand Up @@ -98,6 +121,38 @@ public void run() {
}
}

@Override
public void shutdown() {
future.cancel(false);
executor.shutdown();
}

@Override
public boolean isShutdown() {
return executor.isShutdown();
}

@Override
public boolean isTerminated() {
return executor.isTerminated();
}

@Override
public void shutdownNow() {
future.cancel(true);
executor.shutdownNow();
}

@Override
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
return executor.awaitTermination(duration, unit);
}

@Override
public void close() {
shutdown();
}

enum State {
/** Stream has been started, but doesn't have any outstanding requests. */
IDLE,
Expand Down
Expand Up @@ -50,4 +50,6 @@ public interface WatchdogProvider {
WatchdogProvider withExecutor(ScheduledExecutorService executor);

Watchdog getWatchdog();

boolean shouldAutoClose();
}
39 changes: 39 additions & 0 deletions gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java
Expand Up @@ -29,6 +29,8 @@
*/
package com.google.api.gax.rpc;

import static com.google.common.truth.Truth.assertThat;

import com.google.api.core.ApiClock;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.ExecutorProvider;
Expand Down Expand Up @@ -287,4 +289,41 @@ private void runTest(
Truth.assertThat(executor.shutdownCalled).isEqualTo(shouldAutoClose);
}
}

@Test
public void testWatchdogProvider() throws IOException {
FakeClientSettings.Builder builder = new FakeClientSettings.Builder();

InterceptingExecutor executor = new InterceptingExecutor(1);
FakeTransportChannel transportChannel = FakeTransportChannel.create(new FakeChannel());
FakeTransportProvider transportProvider =
new FakeTransportProvider(transportChannel, executor, true, null, null);
ApiClock clock = Mockito.mock(ApiClock.class);

builder.setClock(clock);
builder.setCredentialsProvider(
FixedCredentialsProvider.create(Mockito.mock(Credentials.class)));
builder.setExecutorProvider(new FakeExecutorProvider(executor, true));
builder.setTransportChannelProvider(transportProvider);

Duration watchdogCheckInterval = Duration.ofSeconds(11);
builder.setWatchdogProvider(
InstantiatingWatchdogProvider.create()
.withClock(clock)
.withCheckInterval(watchdogCheckInterval)
.withExecutor(executor));
builder.setWatchdogCheckInterval(watchdogCheckInterval);

HeaderProvider headerProvider = Mockito.mock(HeaderProvider.class);
Mockito.when(headerProvider.getHeaders()).thenReturn(ImmutableMap.of("k1", "v1"));
HeaderProvider internalHeaderProvider = Mockito.mock(HeaderProvider.class);

Mockito.when(internalHeaderProvider.getHeaders()).thenReturn(ImmutableMap.of("k2", "v2"));
builder.setHeaderProvider(headerProvider);
builder.setInternalHeaderProvider(internalHeaderProvider);

ClientContext context = ClientContext.create(builder.build());
List<BackgroundResource> resources = context.getBackgroundResources();
assertThat(resources.get(2)).isInstanceOf(Watchdog.class);
}
}
Expand Up @@ -61,6 +61,7 @@ public void testNoModifications() {
assertThat(provider.needsCheckInterval()).isFalse();
assertThat(provider.needsClock()).isFalse();
assertThat(provider.needsExecutor()).isFalse();
assertThat(provider.shouldAutoClose()).isFalse();

Throwable actualError = null;
try {
Expand Down
Expand Up @@ -60,6 +60,8 @@ public void happyPath() {
assertThat(provider.needsCheckInterval()).isTrue();
provider = provider.withCheckInterval(checkInterval);

assertThat(provider.shouldAutoClose()).isTrue();

Watchdog watchdog = provider.getWatchdog();
Mockito.verify(executor)
.scheduleAtFixedRate(
Expand Down
64 changes: 51 additions & 13 deletions gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java
Expand Up @@ -29,25 +29,34 @@
*/
package com.google.api.gax.rpc;

import static com.google.common.truth.Truth.assertThat;

import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.FakeApiClock;
import com.google.api.gax.rpc.testing.MockStreamingApi.MockServerStreamingCall;
import com.google.api.gax.rpc.testing.MockStreamingApi.MockServerStreamingCallable;
import com.google.common.collect.Queues;
import com.google.common.truth.Truth;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
public class WatchdogTest {
private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(1);

private FakeApiClock clock;
private final Duration checkInterval = Duration.ofMillis(1000);
private Duration waitTime = Duration.ofSeconds(10);
private Duration idleTime = Duration.ofMinutes(5);

Expand All @@ -59,7 +68,7 @@ public class WatchdogTest {
@Before
public void setUp() {
clock = new FakeApiClock(0);
watchdog = new Watchdog(clock);
watchdog = Watchdog.create(clock, checkInterval, EXECUTOR);

callable = new MockServerStreamingCallable<>();
innerObserver = new AccumulatingObserver<>();
Expand All @@ -70,7 +79,7 @@ public void setUp() {
@Test
public void testRequestPassthrough() throws Exception {
innerObserver.controller.get().request(1);
Truth.assertThat(call.getController().popLastPull()).isEqualTo(1);
assertThat(call.getController().popLastPull()).isEqualTo(1);
}

@Test
Expand All @@ -79,11 +88,11 @@ public void testWaitTimeout() throws Exception {

clock.incrementNanoTime(waitTime.toNanos() - 1);
watchdog.run();
Truth.assertThat(call.getController().isCancelled()).isFalse();
assertThat(call.getController().isCancelled()).isFalse();

clock.incrementNanoTime(1);
watchdog.run();
Truth.assertThat(call.getController().isCancelled()).isTrue();
assertThat(call.getController().isCancelled()).isTrue();
call.getController()
.getObserver()
.onError(new RuntimeException("Some upstream exception representing cancellation"));
Expand All @@ -94,18 +103,18 @@ public void testWaitTimeout() throws Exception {
} catch (ExecutionException t) {
actualError = t.getCause();
}
Truth.assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
}

@Test
public void testIdleTimeout() throws InterruptedException {
clock.incrementNanoTime(idleTime.toNanos() - 1);
watchdog.run();
Truth.assertThat(call.getController().isCancelled()).isFalse();
assertThat(call.getController().isCancelled()).isFalse();

clock.incrementNanoTime(1);
watchdog.run();
Truth.assertThat(call.getController().isCancelled()).isTrue();
assertThat(call.getController().isCancelled()).isTrue();
call.getController()
.getObserver()
.onError(new RuntimeException("Some upstream exception representing cancellation"));
Expand All @@ -116,7 +125,7 @@ public void testIdleTimeout() throws InterruptedException {
} catch (ExecutionException t) {
actualError = t.getCause();
}
Truth.assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
}

@Test
Expand All @@ -141,20 +150,49 @@ public void testMultiple() throws Exception {
watchdog.run();

// Call1 should be ok
Truth.assertThat(call1.getController().isCancelled()).isFalse();
assertThat(call1.getController().isCancelled()).isFalse();
// Should not throw
Truth.assertThat(downstreamObserver1.done.isDone()).isFalse();
assertThat(downstreamObserver1.done.isDone()).isFalse();

// Call2 should be timed out
Truth.assertThat(call2.getController().isCancelled()).isTrue();
assertThat(call2.getController().isCancelled()).isTrue();
call2.getController().getObserver().onError(new CancellationException("User cancelled"));
Throwable error = null;
try {
downstreamObserver2.done.get();
} catch (ExecutionException t) {
error = t.getCause();
}
Truth.assertThat(error).isInstanceOf(WatchdogTimeoutException.class);
assertThat(error).isInstanceOf(WatchdogTimeoutException.class);
}

@Test
@SuppressWarnings("unchecked")
public void testWatchdogBeingClosed() {
ScheduledFuture future = Mockito.mock(ScheduledFuture.class);
ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class);
Mockito.when(
mockExecutor.scheduleAtFixedRate(
Mockito.any(Watchdog.class),
Mockito.anyLong(),
Mockito.anyLong(),
Mockito.any(TimeUnit.class)))
.thenReturn(future);
Watchdog underTest = Watchdog.create(clock, checkInterval, mockExecutor);
assertThat(underTest).isInstanceOf(BackgroundResource.class);

underTest.close();
underTest.shutdown();

Mockito.verify(mockExecutor)
.scheduleAtFixedRate(
underTest, checkInterval.toMillis(), checkInterval.toMillis(), TimeUnit.MILLISECONDS);
Mockito.verify(future, Mockito.times(2)).cancel(false);
Mockito.verify(mockExecutor, Mockito.times(2)).shutdown();

underTest.shutdownNow();
Mockito.verify(future).cancel(true);
Mockito.verify(mockExecutor).shutdownNow();
}

static class AccumulatingObserver<T> implements ResponseObserver<T> {
Expand Down

0 comments on commit 90f3336

Please sign in to comment.