Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Expose Batcher#sendOutstanding to provide async batch flushing #786

Merged
merged 1 commit into from Sep 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions gax/src/main/java/com/google/api/gax/batching/Batcher.java
Expand Up @@ -62,6 +62,14 @@ public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {
*/
void flush() throws InterruptedException;

/**
* Sends accumulated elements asynchronously for batching.
*
* <p>Note: This method can be invoked concurrently unlike {@link #add} and {@link #close}, which
* can only be called from a single user thread. Please take caution to avoid race condition.
*/
void sendOutstanding();

/**
* Closes this Batcher by preventing new elements from being added and flushing the existing
* elements.
Expand Down
20 changes: 8 additions & 12 deletions gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Expand Up @@ -132,27 +132,23 @@ public ApiFuture<ElementResultT> add(ElementT element) {
}

if (currentOpenBatch.hasAnyThresholdReached()) {
sendBatch();
sendOutstanding();
}
return result;
}

/** {@inheritDoc} */
@Override
public void flush() throws InterruptedException {
sendBatch();
sendOutstanding();
awaitAllOutstandingBatches();
}

/**
* Sends accumulated elements asynchronously for batching.
*
* <p>Note: This method can be invoked concurrently unlike {@link #add} and {@link #close}, which
* can only be called from a single user thread. Please take caution to avoid race condition.
*/
private void sendBatch() {

/** {@inheritDoc} */
@Override
public void sendOutstanding() {
final Batch<ElementT, ElementResultT, RequestT, ResponseT> accumulatedBatch;

synchronized (elementLock) {
if (currentOpenBatch.isEmpty()) {
return;
Expand Down Expand Up @@ -280,7 +276,7 @@ boolean hasAnyThresholdReached() {
}

/**
* Executes {@link #sendBatch()} on a periodic interval.
* Executes {@link #sendOutstanding()} on a periodic interval.
*
* <p>This class holds a weak reference to the Batcher instance and cancels polling if the target
* Batcher has been garbage collected.
Expand All @@ -303,7 +299,7 @@ public void run() {
if (batcher == null) {
scheduledFuture.cancel(true);
} else {
batcher.sendBatch();
batcher.sendOutstanding();
}
}

Expand Down
Expand Up @@ -41,6 +41,7 @@
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList;
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable;
import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -55,6 +56,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Filter;
import java.util.logging.Level;
import java.util.logging.LogRecord;
Expand Down Expand Up @@ -113,6 +115,35 @@ public void testResultsAreResolvedAfterFlush() throws Exception {
assertThat(anotherResult.isDone()).isFalse();
}

@Test
public void testSendOutstanding() {
final AtomicInteger callableCounter = new AtomicInteger();

underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2,
new LabeledIntSquarerCallable() {
@Override
public ApiFuture<List<Integer>> futureCall(LabeledIntList request) {
callableCounter.incrementAndGet();
return super.futureCall(request);
}
},
labeledIntList,
batchingSettings,
EXECUTOR);

// Empty Batcher
underTest.sendOutstanding();
assertThat(callableCounter.get()).isEqualTo(0);

underTest.add(2);
underTest.add(3);
underTest.add(4);
underTest.sendOutstanding();
assertThat(callableCounter.get()).isEqualTo(1);
}

/** Element results are resolved after batch is closed. */
@Test
public void testWhenBatcherIsClose() throws Exception {
Expand Down Expand Up @@ -490,7 +521,7 @@ private void testElementTriggers(BatchingSettings settings) throws Exception {
SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings, EXECUTOR);
Future<Integer> result = underTest.add(4);
assertThat(result.isDone()).isFalse();
// After this element is added, the batch triggers sendBatch().
// After this element is added, the batch triggers sendOutstanding().
Future<Integer> anotherResult = underTest.add(5);
// Both the elements should be resolved now.
assertThat(result.isDone()).isTrue();
Expand Down