Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
Expose Batcher#sendOutstanding to provide async batch flushing (#786)
Browse files Browse the repository at this point in the history
Now `Batcher` supports asynchronously batching. 

This is needed to support HBase client's [BulkMutation#sendUnsent](https://github.com/googleapis/java-bigtable-hbase/blob/master/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/core/IBulkMutation.java#L37-L38) through GCJ client.
  • Loading branch information
rahulKQL committed Sep 9, 2019
1 parent a4aec0f commit 7637f9c
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 13 deletions.
8 changes: 8 additions & 0 deletions gax/src/main/java/com/google/api/gax/batching/Batcher.java
Expand Up @@ -62,6 +62,14 @@ public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {
*/
void flush() throws InterruptedException;

/**
* Sends accumulated elements asynchronously for batching.
*
* <p>Note: This method can be invoked concurrently unlike {@link #add} and {@link #close}, which
* can only be called from a single user thread. Please take caution to avoid race condition.
*/
void sendOutstanding();

/**
* Closes this Batcher by preventing new elements from being added and flushing the existing
* elements.
Expand Down
20 changes: 8 additions & 12 deletions gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Expand Up @@ -132,27 +132,23 @@ public ApiFuture<ElementResultT> add(ElementT element) {
}

if (currentOpenBatch.hasAnyThresholdReached()) {
sendBatch();
sendOutstanding();
}
return result;
}

/** {@inheritDoc} */
@Override
public void flush() throws InterruptedException {
sendBatch();
sendOutstanding();
awaitAllOutstandingBatches();
}

/**
* Sends accumulated elements asynchronously for batching.
*
* <p>Note: This method can be invoked concurrently unlike {@link #add} and {@link #close}, which
* can only be called from a single user thread. Please take caution to avoid race condition.
*/
private void sendBatch() {

/** {@inheritDoc} */
@Override
public void sendOutstanding() {
final Batch<ElementT, ElementResultT, RequestT, ResponseT> accumulatedBatch;

synchronized (elementLock) {
if (currentOpenBatch.isEmpty()) {
return;
Expand Down Expand Up @@ -280,7 +276,7 @@ boolean hasAnyThresholdReached() {
}

/**
* Executes {@link #sendBatch()} on a periodic interval.
* Executes {@link #sendOutstanding()} on a periodic interval.
*
* <p>This class holds a weak reference to the Batcher instance and cancels polling if the target
* Batcher has been garbage collected.
Expand All @@ -303,7 +299,7 @@ public void run() {
if (batcher == null) {
scheduledFuture.cancel(true);
} else {
batcher.sendBatch();
batcher.sendOutstanding();
}
}

Expand Down
Expand Up @@ -41,6 +41,7 @@
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList;
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable;
import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -55,6 +56,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Filter;
import java.util.logging.Level;
import java.util.logging.LogRecord;
Expand Down Expand Up @@ -113,6 +115,35 @@ public void testResultsAreResolvedAfterFlush() throws Exception {
assertThat(anotherResult.isDone()).isFalse();
}

@Test
public void testSendOutstanding() {
final AtomicInteger callableCounter = new AtomicInteger();

underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2,
new LabeledIntSquarerCallable() {
@Override
public ApiFuture<List<Integer>> futureCall(LabeledIntList request) {
callableCounter.incrementAndGet();
return super.futureCall(request);
}
},
labeledIntList,
batchingSettings,
EXECUTOR);

// Empty Batcher
underTest.sendOutstanding();
assertThat(callableCounter.get()).isEqualTo(0);

underTest.add(2);
underTest.add(3);
underTest.add(4);
underTest.sendOutstanding();
assertThat(callableCounter.get()).isEqualTo(1);
}

/** Element results are resolved after batch is closed. */
@Test
public void testWhenBatcherIsClose() throws Exception {
Expand Down Expand Up @@ -490,7 +521,7 @@ private void testElementTriggers(BatchingSettings settings) throws Exception {
SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings, EXECUTOR);
Future<Integer> result = underTest.add(4);
assertThat(result.isDone()).isFalse();
// After this element is added, the batch triggers sendBatch().
// After this element is added, the batch triggers sendOutstanding().
Future<Integer> anotherResult = underTest.add(5);
// Both the elements should be resolved now.
assertThat(result.isDone()).isTrue();
Expand Down

0 comments on commit 7637f9c

Please sign in to comment.