From 50bf647c4fcf73fd5593db2d46d53cd97384e66f Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Thu, 2 Jan 2020 18:08:00 +0530 Subject: [PATCH 1/4] feat: implemented watchdog as BackgroundResource This change updates watchdog to BackgroundResource, Also, watchdog unschedules itself explicitly when shutdown. closes #828 --- .../com/google/api/gax/rpc/ClientContext.java | 20 ++++--- .../api/gax/rpc/FixedWatchdogProvider.java | 5 ++ .../rpc/InstantiatingWatchdogProvider.java | 10 ++-- .../java/com/google/api/gax/rpc/Watchdog.java | 46 ++++++++++++++- .../google/api/gax/rpc/WatchdogProvider.java | 2 + .../google/api/gax/rpc/ClientContextTest.java | 37 ++++++++++++ .../com/google/api/gax/rpc/WatchdogTest.java | 58 ++++++++++++++----- 7 files changed, 150 insertions(+), 28 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java b/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java index 391f3d959..3d30dcd41 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java +++ b/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java @@ -160,6 +160,15 @@ public static ClientContext create(StubSettings settings) throws IOException { defaultCallContext = defaultCallContext.withCredentials(credentials); } + ImmutableList.Builder backgroundResources = ImmutableList.builder(); + + if (transportChannelProvider.shouldAutoClose()) { + backgroundResources.add(transportChannel); + } + if (executorProvider.shouldAutoClose()) { + backgroundResources.add(new ExecutorAsBackgroundResource(executor)); + } + WatchdogProvider watchdogProvider = settings.getStreamWatchdogProvider(); @Nullable Watchdog watchdog = null; @@ -175,15 +184,10 @@ public static ClientContext create(StubSettings settings) throws IOException { watchdogProvider = watchdogProvider.withExecutor(executor); } watchdog = watchdogProvider.getWatchdog(); - } - - ImmutableList.Builder backgroundResources = ImmutableList.builder(); - if (transportChannelProvider.shouldAutoClose()) { - backgroundResources.add(transportChannel); - } - if (executorProvider.shouldAutoClose()) { - backgroundResources.add(new ExecutorAsBackgroundResource(executor)); + if (watchdogProvider.shouldAutoClose()) { + backgroundResources.add(watchdog); + } } return newBuilder() diff --git a/gax/src/main/java/com/google/api/gax/rpc/FixedWatchdogProvider.java b/gax/src/main/java/com/google/api/gax/rpc/FixedWatchdogProvider.java index 1533df71e..e22f14122 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/FixedWatchdogProvider.java +++ b/gax/src/main/java/com/google/api/gax/rpc/FixedWatchdogProvider.java @@ -82,4 +82,9 @@ public WatchdogProvider withExecutor(ScheduledExecutorService executor) { public Watchdog getWatchdog() { return watchdog; } + + @Override + public boolean shouldAutoClose() { + return false; + } } diff --git a/gax/src/main/java/com/google/api/gax/rpc/InstantiatingWatchdogProvider.java b/gax/src/main/java/com/google/api/gax/rpc/InstantiatingWatchdogProvider.java index b4f7c4f57..a0fe416ea 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/InstantiatingWatchdogProvider.java +++ b/gax/src/main/java/com/google/api/gax/rpc/InstantiatingWatchdogProvider.java @@ -33,7 +33,6 @@ import com.google.api.core.BetaApi; import com.google.common.base.Preconditions; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.threeten.bp.Duration; @@ -103,10 +102,11 @@ public Watchdog getWatchdog() { return null; } - Watchdog watchdog = new Watchdog(clock); - executor.scheduleAtFixedRate( - watchdog, checkInterval.toMillis(), checkInterval.toMillis(), TimeUnit.MILLISECONDS); + return new Watchdog(clock, checkInterval, executor); + } - return watchdog; + @Override + public boolean shouldAutoClose() { + return true; } } diff --git a/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java b/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java index 9ffff65e9..ea6d4974d 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java +++ b/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java @@ -31,11 +31,15 @@ import com.google.api.core.ApiClock; import com.google.api.core.InternalApi; +import com.google.api.gax.core.BackgroundResource; import com.google.common.base.Preconditions; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; import org.threeten.bp.Duration; @@ -56,15 +60,21 @@ * */ @InternalApi -public class Watchdog implements Runnable { +public class Watchdog implements Runnable, BackgroundResource { // Dummy value to convert the ConcurrentHashMap into a Set private static Object PRESENT = new Object(); private final ConcurrentHashMap openStreams = new ConcurrentHashMap<>(); private final ApiClock clock; + private final ScheduledExecutorService executor; + private final ScheduledFuture future; - public Watchdog(ApiClock clock) { + public Watchdog(ApiClock clock, Duration interval, ScheduledExecutorService executor) { this.clock = Preconditions.checkNotNull(clock, "clock can't be null"); + this.executor = executor; + this.future = + executor.scheduleAtFixedRate( + this, interval.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS); } /** Wraps the target observer with timing constraints. */ @@ -98,6 +108,38 @@ public void run() { } } + @Override + public void shutdown() { + future.cancel(true); + executor.shutdown(); + } + + @Override + public boolean isShutdown() { + return executor.isShutdown(); + } + + @Override + public boolean isTerminated() { + return executor.isTerminated(); + } + + @Override + public void shutdownNow() { + future.cancel(true); + executor.shutdownNow(); + } + + @Override + public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { + return executor.awaitTermination(duration, unit); + } + + @Override + public void close() throws Exception { + shutdown(); + } + enum State { /** Stream has been started, but doesn't have any outstanding requests. */ IDLE, diff --git a/gax/src/main/java/com/google/api/gax/rpc/WatchdogProvider.java b/gax/src/main/java/com/google/api/gax/rpc/WatchdogProvider.java index 14126c26d..a1fa227e8 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/WatchdogProvider.java +++ b/gax/src/main/java/com/google/api/gax/rpc/WatchdogProvider.java @@ -50,4 +50,6 @@ public interface WatchdogProvider { WatchdogProvider withExecutor(ScheduledExecutorService executor); Watchdog getWatchdog(); + + boolean shouldAutoClose(); } diff --git a/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java b/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java index 2d0830a5e..ac01cc6ca 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java @@ -287,4 +287,41 @@ private void runTest( Truth.assertThat(executor.shutdownCalled).isEqualTo(shouldAutoClose); } } + + @Test + public void testWatchdogProvider() throws IOException { + FakeClientSettings.Builder builder = new FakeClientSettings.Builder(); + + InterceptingExecutor executor = new InterceptingExecutor(1); + FakeTransportChannel transportChannel = FakeTransportChannel.create(new FakeChannel()); + FakeTransportProvider transportProvider = + new FakeTransportProvider(transportChannel, executor, true, null, null); + ApiClock clock = Mockito.mock(ApiClock.class); + + builder.setClock(clock); + builder.setCredentialsProvider( + FixedCredentialsProvider.create(Mockito.mock(Credentials.class))); + builder.setExecutorProvider(new FakeExecutorProvider(executor, true)); + builder.setTransportChannelProvider(transportProvider); + + Duration watchdogCheckInterval = Duration.ofSeconds(11); + builder.setWatchdogProvider( + InstantiatingWatchdogProvider.create() + .withClock(clock) + .withCheckInterval(watchdogCheckInterval) + .withExecutor(executor)); + builder.setWatchdogCheckInterval(watchdogCheckInterval); + + HeaderProvider headerProvider = Mockito.mock(HeaderProvider.class); + Mockito.when(headerProvider.getHeaders()).thenReturn(ImmutableMap.of("k1", "v1")); + HeaderProvider internalHeaderProvider = Mockito.mock(HeaderProvider.class); + + Mockito.when(internalHeaderProvider.getHeaders()).thenReturn(ImmutableMap.of("k2", "v2")); + builder.setHeaderProvider(headerProvider); + builder.setInternalHeaderProvider(internalHeaderProvider); + + ClientContext context = ClientContext.create(builder.build()); + List resources = context.getBackgroundResources(); + Truth.assertThat(resources.get(2)).isInstanceOf(Watchdog.class); + } } diff --git a/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java b/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java index f1d4be8ff..a63a6719d 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java @@ -29,25 +29,34 @@ */ package com.google.api.gax.rpc; +import static com.google.common.truth.Truth.assertThat; + import com.google.api.core.SettableApiFuture; +import com.google.api.gax.core.BackgroundResource; import com.google.api.gax.core.FakeApiClock; import com.google.api.gax.rpc.testing.MockStreamingApi.MockServerStreamingCall; import com.google.api.gax.rpc.testing.MockStreamingApi.MockServerStreamingCallable; import com.google.common.collect.Queues; -import com.google.common.truth.Truth; import java.util.Queue; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mockito; import org.threeten.bp.Duration; @RunWith(JUnit4.class) public class WatchdogTest { + private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(1); + private FakeApiClock clock; + private final Duration checkInterval = Duration.ofMillis(1000); private Duration waitTime = Duration.ofSeconds(10); private Duration idleTime = Duration.ofMinutes(5); @@ -59,7 +68,7 @@ public class WatchdogTest { @Before public void setUp() { clock = new FakeApiClock(0); - watchdog = new Watchdog(clock); + watchdog = new Watchdog(clock, checkInterval, EXECUTOR); callable = new MockServerStreamingCallable<>(); innerObserver = new AccumulatingObserver<>(); @@ -70,7 +79,7 @@ public void setUp() { @Test public void testRequestPassthrough() throws Exception { innerObserver.controller.get().request(1); - Truth.assertThat(call.getController().popLastPull()).isEqualTo(1); + assertThat(call.getController().popLastPull()).isEqualTo(1); } @Test @@ -79,11 +88,11 @@ public void testWaitTimeout() throws Exception { clock.incrementNanoTime(waitTime.toNanos() - 1); watchdog.run(); - Truth.assertThat(call.getController().isCancelled()).isFalse(); + assertThat(call.getController().isCancelled()).isFalse(); clock.incrementNanoTime(1); watchdog.run(); - Truth.assertThat(call.getController().isCancelled()).isTrue(); + assertThat(call.getController().isCancelled()).isTrue(); call.getController() .getObserver() .onError(new RuntimeException("Some upstream exception representing cancellation")); @@ -94,18 +103,18 @@ public void testWaitTimeout() throws Exception { } catch (ExecutionException t) { actualError = t.getCause(); } - Truth.assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class); + assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class); } @Test public void testIdleTimeout() throws InterruptedException { clock.incrementNanoTime(idleTime.toNanos() - 1); watchdog.run(); - Truth.assertThat(call.getController().isCancelled()).isFalse(); + assertThat(call.getController().isCancelled()).isFalse(); clock.incrementNanoTime(1); watchdog.run(); - Truth.assertThat(call.getController().isCancelled()).isTrue(); + assertThat(call.getController().isCancelled()).isTrue(); call.getController() .getObserver() .onError(new RuntimeException("Some upstream exception representing cancellation")); @@ -116,7 +125,7 @@ public void testIdleTimeout() throws InterruptedException { } catch (ExecutionException t) { actualError = t.getCause(); } - Truth.assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class); + assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class); } @Test @@ -141,12 +150,12 @@ public void testMultiple() throws Exception { watchdog.run(); // Call1 should be ok - Truth.assertThat(call1.getController().isCancelled()).isFalse(); + assertThat(call1.getController().isCancelled()).isFalse(); // Should not throw - Truth.assertThat(downstreamObserver1.done.isDone()).isFalse(); + assertThat(downstreamObserver1.done.isDone()).isFalse(); // Call2 should be timed out - Truth.assertThat(call2.getController().isCancelled()).isTrue(); + assertThat(call2.getController().isCancelled()).isTrue(); call2.getController().getObserver().onError(new CancellationException("User cancelled")); Throwable error = null; try { @@ -154,7 +163,30 @@ public void testMultiple() throws Exception { } catch (ExecutionException t) { error = t.getCause(); } - Truth.assertThat(error).isInstanceOf(WatchdogTimeoutException.class); + assertThat(error).isInstanceOf(WatchdogTimeoutException.class); + } + + @Test + public void testWatchdogClosed() throws Exception { + ScheduledFuture future = Mockito.mock(ScheduledFuture.class); + ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class); + Mockito.when( + mockExecutor.scheduleAtFixedRate( + Mockito.any(Watchdog.class), + Mockito.anyLong(), + Mockito.anyLong(), + Mockito.any(TimeUnit.class))) + .thenReturn(future); + Watchdog underTest = new Watchdog(clock, checkInterval, mockExecutor); + assertThat(underTest).isInstanceOf(BackgroundResource.class); + + underTest.close(); + underTest.shutdown(); + + Mockito.verify(mockExecutor) + .scheduleAtFixedRate( + underTest, checkInterval.toMillis(), checkInterval.toMillis(), TimeUnit.MILLISECONDS); + Mockito.verify(mockExecutor, Mockito.times(2)).shutdown(); } static class AccumulatingObserver implements ResponseObserver { From 141e4d946215ad9cc81f3ffe78e0e67631d93e25 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Fri, 3 Jan 2020 17:52:49 +0530 Subject: [PATCH 2/4] updated test cases to increase coverage percentage. --- gax/src/main/java/com/google/api/gax/rpc/Watchdog.java | 8 ++++---- .../java/com/google/api/gax/rpc/ClientContextTest.java | 4 +++- .../com/google/api/gax/rpc/FixedWatchdogProviderTest.java | 1 + .../api/gax/rpc/InstantiatingWatchdogProviderTest.java | 2 ++ .../test/java/com/google/api/gax/rpc/WatchdogTest.java | 7 ++++++- 5 files changed, 16 insertions(+), 6 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java b/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java index ea6d4974d..15d34ffef 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java +++ b/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java @@ -69,12 +69,12 @@ public class Watchdog implements Runnable, BackgroundResource { private final ScheduledExecutorService executor; private final ScheduledFuture future; - public Watchdog(ApiClock clock, Duration interval, ScheduledExecutorService executor) { + public Watchdog(ApiClock clock, Duration scheduleInterval, ScheduledExecutorService executor) { this.clock = Preconditions.checkNotNull(clock, "clock can't be null"); this.executor = executor; this.future = executor.scheduleAtFixedRate( - this, interval.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS); + this, scheduleInterval.toMillis(), scheduleInterval.toMillis(), TimeUnit.MILLISECONDS); } /** Wraps the target observer with timing constraints. */ @@ -110,7 +110,7 @@ public void run() { @Override public void shutdown() { - future.cancel(true); + future.cancel(false); executor.shutdown(); } @@ -136,7 +136,7 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted } @Override - public void close() throws Exception { + public void close() { shutdown(); } diff --git a/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java b/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java index ac01cc6ca..17529fcb2 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java @@ -29,6 +29,8 @@ */ package com.google.api.gax.rpc; +import static com.google.common.truth.Truth.assertThat; + import com.google.api.core.ApiClock; import com.google.api.gax.core.BackgroundResource; import com.google.api.gax.core.ExecutorProvider; @@ -322,6 +324,6 @@ public void testWatchdogProvider() throws IOException { ClientContext context = ClientContext.create(builder.build()); List resources = context.getBackgroundResources(); - Truth.assertThat(resources.get(2)).isInstanceOf(Watchdog.class); + assertThat(resources.get(2)).isInstanceOf(Watchdog.class); } } diff --git a/gax/src/test/java/com/google/api/gax/rpc/FixedWatchdogProviderTest.java b/gax/src/test/java/com/google/api/gax/rpc/FixedWatchdogProviderTest.java index ba7d476b9..de1fd43ab 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/FixedWatchdogProviderTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/FixedWatchdogProviderTest.java @@ -61,6 +61,7 @@ public void testNoModifications() { assertThat(provider.needsCheckInterval()).isFalse(); assertThat(provider.needsClock()).isFalse(); assertThat(provider.needsExecutor()).isFalse(); + assertThat(provider.shouldAutoClose()).isFalse(); Throwable actualError = null; try { diff --git a/gax/src/test/java/com/google/api/gax/rpc/InstantiatingWatchdogProviderTest.java b/gax/src/test/java/com/google/api/gax/rpc/InstantiatingWatchdogProviderTest.java index b7c87a620..0607dc717 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/InstantiatingWatchdogProviderTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/InstantiatingWatchdogProviderTest.java @@ -60,6 +60,8 @@ public void happyPath() { assertThat(provider.needsCheckInterval()).isTrue(); provider = provider.withCheckInterval(checkInterval); + assertThat(provider.shouldAutoClose()).isTrue(); + Watchdog watchdog = provider.getWatchdog(); Mockito.verify(executor) .scheduleAtFixedRate( diff --git a/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java b/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java index a63a6719d..3efc80d74 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java @@ -167,7 +167,7 @@ public void testMultiple() throws Exception { } @Test - public void testWatchdogClosed() throws Exception { + public void testWatchdogBeingClosed() { ScheduledFuture future = Mockito.mock(ScheduledFuture.class); ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class); Mockito.when( @@ -186,7 +186,12 @@ public void testWatchdogClosed() throws Exception { Mockito.verify(mockExecutor) .scheduleAtFixedRate( underTest, checkInterval.toMillis(), checkInterval.toMillis(), TimeUnit.MILLISECONDS); + Mockito.verify(future, Mockito.times(2)).cancel(false); Mockito.verify(mockExecutor, Mockito.times(2)).shutdown(); + + underTest.shutdownNow(); + Mockito.verify(future).cancel(true); + Mockito.verify(mockExecutor).shutdownNow(); } static class AccumulatingObserver implements ResponseObserver { From 44948d417ae797450fbac00fbcbead0b26272d27 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Tue, 7 Jan 2020 13:24:52 +0530 Subject: [PATCH 3/4] Moved Watchdog scheduling to start() method --- .../com/google/api/gax/rpc/ClientContext.java | 23 +++++++++---------- .../rpc/InstantiatingWatchdogProvider.java | 2 +- .../java/com/google/api/gax/rpc/Watchdog.java | 18 ++++++++++++--- .../com/google/api/gax/rpc/WatchdogTest.java | 5 ++-- 4 files changed, 30 insertions(+), 18 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java b/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java index 3d30dcd41..82e06736a 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java +++ b/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java @@ -160,15 +160,6 @@ public static ClientContext create(StubSettings settings) throws IOException { defaultCallContext = defaultCallContext.withCredentials(credentials); } - ImmutableList.Builder backgroundResources = ImmutableList.builder(); - - if (transportChannelProvider.shouldAutoClose()) { - backgroundResources.add(transportChannel); - } - if (executorProvider.shouldAutoClose()) { - backgroundResources.add(new ExecutorAsBackgroundResource(executor)); - } - WatchdogProvider watchdogProvider = settings.getStreamWatchdogProvider(); @Nullable Watchdog watchdog = null; @@ -184,10 +175,18 @@ public static ClientContext create(StubSettings settings) throws IOException { watchdogProvider = watchdogProvider.withExecutor(executor); } watchdog = watchdogProvider.getWatchdog(); + } - if (watchdogProvider.shouldAutoClose()) { - backgroundResources.add(watchdog); - } + ImmutableList.Builder backgroundResources = ImmutableList.builder(); + + if (transportChannelProvider.shouldAutoClose()) { + backgroundResources.add(transportChannel); + } + if (executorProvider.shouldAutoClose()) { + backgroundResources.add(new ExecutorAsBackgroundResource(executor)); + } + if (watchdogProvider != null && watchdogProvider.shouldAutoClose()) { + backgroundResources.add(watchdog); } return newBuilder() diff --git a/gax/src/main/java/com/google/api/gax/rpc/InstantiatingWatchdogProvider.java b/gax/src/main/java/com/google/api/gax/rpc/InstantiatingWatchdogProvider.java index a0fe416ea..c68362de9 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/InstantiatingWatchdogProvider.java +++ b/gax/src/main/java/com/google/api/gax/rpc/InstantiatingWatchdogProvider.java @@ -102,7 +102,7 @@ public Watchdog getWatchdog() { return null; } - return new Watchdog(clock, checkInterval, executor); + return Watchdog.createWatchdog(clock, checkInterval, executor); } @Override diff --git a/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java b/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java index 15d34ffef..a42e3cdf5 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java +++ b/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java @@ -66,13 +66,25 @@ public class Watchdog implements Runnable, BackgroundResource { private final ConcurrentHashMap openStreams = new ConcurrentHashMap<>(); private final ApiClock clock; + private final Duration scheduleInterval; private final ScheduledExecutorService executor; - private final ScheduledFuture future; + private ScheduledFuture future; - public Watchdog(ApiClock clock, Duration scheduleInterval, ScheduledExecutorService executor) { + private Watchdog(ApiClock clock, Duration scheduleInterval, ScheduledExecutorService executor) { this.clock = Preconditions.checkNotNull(clock, "clock can't be null"); + this.scheduleInterval = scheduleInterval; this.executor = executor; - this.future = + } + + public static Watchdog createWatchdog( + ApiClock clock, Duration scheduleInterval, ScheduledExecutorService executor) { + Watchdog watchdog = new Watchdog(clock, scheduleInterval, executor); + watchdog.start(); + return watchdog; + } + + private void start() { + future = executor.scheduleAtFixedRate( this, scheduleInterval.toMillis(), scheduleInterval.toMillis(), TimeUnit.MILLISECONDS); } diff --git a/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java b/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java index 3efc80d74..e89e717df 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java @@ -68,7 +68,7 @@ public class WatchdogTest { @Before public void setUp() { clock = new FakeApiClock(0); - watchdog = new Watchdog(clock, checkInterval, EXECUTOR); + watchdog = Watchdog.createWatchdog(clock, checkInterval, EXECUTOR); callable = new MockServerStreamingCallable<>(); innerObserver = new AccumulatingObserver<>(); @@ -167,6 +167,7 @@ public void testMultiple() throws Exception { } @Test + @SuppressWarnings("unchecked") public void testWatchdogBeingClosed() { ScheduledFuture future = Mockito.mock(ScheduledFuture.class); ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class); @@ -177,7 +178,7 @@ public void testWatchdogBeingClosed() { Mockito.anyLong(), Mockito.any(TimeUnit.class))) .thenReturn(future); - Watchdog underTest = new Watchdog(clock, checkInterval, mockExecutor); + Watchdog underTest = Watchdog.createWatchdog(clock, checkInterval, mockExecutor); assertThat(underTest).isInstanceOf(BackgroundResource.class); underTest.close(); From 76f3d5fd3f4c39947411071925b4f75506174354 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Tue, 7 Jan 2020 13:31:39 +0530 Subject: [PATCH 4/4] added javadoc for Watchdog.create() --- .../gax/rpc/InstantiatingWatchdogProvider.java | 2 +- .../java/com/google/api/gax/rpc/Watchdog.java | 15 ++++++++------- .../java/com/google/api/gax/rpc/WatchdogTest.java | 4 ++-- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/rpc/InstantiatingWatchdogProvider.java b/gax/src/main/java/com/google/api/gax/rpc/InstantiatingWatchdogProvider.java index c68362de9..b33c57048 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/InstantiatingWatchdogProvider.java +++ b/gax/src/main/java/com/google/api/gax/rpc/InstantiatingWatchdogProvider.java @@ -102,7 +102,7 @@ public Watchdog getWatchdog() { return null; } - return Watchdog.createWatchdog(clock, checkInterval, executor); + return Watchdog.create(clock, checkInterval, executor); } @Override diff --git a/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java b/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java index a42e3cdf5..ddf399ff7 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java +++ b/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java @@ -70,19 +70,20 @@ public class Watchdog implements Runnable, BackgroundResource { private final ScheduledExecutorService executor; private ScheduledFuture future; - private Watchdog(ApiClock clock, Duration scheduleInterval, ScheduledExecutorService executor) { - this.clock = Preconditions.checkNotNull(clock, "clock can't be null"); - this.scheduleInterval = scheduleInterval; - this.executor = executor; - } - - public static Watchdog createWatchdog( + /** returns a Watchdog which is scheduled at the provided interval. */ + public static Watchdog create( ApiClock clock, Duration scheduleInterval, ScheduledExecutorService executor) { Watchdog watchdog = new Watchdog(clock, scheduleInterval, executor); watchdog.start(); return watchdog; } + private Watchdog(ApiClock clock, Duration scheduleInterval, ScheduledExecutorService executor) { + this.clock = Preconditions.checkNotNull(clock, "clock can't be null"); + this.scheduleInterval = scheduleInterval; + this.executor = executor; + } + private void start() { future = executor.scheduleAtFixedRate( diff --git a/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java b/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java index e89e717df..b63e80cc6 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java @@ -68,7 +68,7 @@ public class WatchdogTest { @Before public void setUp() { clock = new FakeApiClock(0); - watchdog = Watchdog.createWatchdog(clock, checkInterval, EXECUTOR); + watchdog = Watchdog.create(clock, checkInterval, EXECUTOR); callable = new MockServerStreamingCallable<>(); innerObserver = new AccumulatingObserver<>(); @@ -178,7 +178,7 @@ public void testWatchdogBeingClosed() { Mockito.anyLong(), Mockito.any(TimeUnit.class))) .thenReturn(future); - Watchdog underTest = Watchdog.createWatchdog(clock, checkInterval, mockExecutor); + Watchdog underTest = Watchdog.create(clock, checkInterval, mockExecutor); assertThat(underTest).isInstanceOf(BackgroundResource.class); underTest.close();