Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
Track number of failure for Batcher when Batcher#close is called.
Browse files Browse the repository at this point in the history
Adding test case for BatchingException.
  • Loading branch information
rahulKQL committed Oct 8, 2019
1 parent 664f671 commit 9930063
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 1 deletion.
44 changes: 44 additions & 0 deletions gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand All @@ -47,14 +49,17 @@
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Queues up the elements until {@link #flush()} is called; once batching is over, returned future
* resolves.
Expand Down Expand Up @@ -83,9 +88,14 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0);
private final Object flushLock = new Object();
private final Object elementLock = new Object();
private final Object errorLock = new Object();
private final Future<?> scheduledFuture;
private volatile boolean isClosed = false;

private AtomicLong numOfFailure = new AtomicLong();
private final Map<Class, AtomicInteger> failuresTypeCount = new ConcurrentHashMap<>();
private final Map<StatusCode, AtomicInteger> failureStatusCodeCount = new ConcurrentHashMap<>();

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response.
Expand Down Expand Up @@ -176,6 +186,7 @@ public void onSuccess(ResponseT response) {
@Override
public void onFailure(Throwable throwable) {
try {
addException(throwable);
accumulatedBatch.onBatchFailure(throwable);
} finally {
onBatchCompletion();
Expand All @@ -201,6 +212,36 @@ private void awaitAllOutstandingBatches() throws InterruptedException {
}
}

/**
* It keeps the count of number of failed RPCs. This method also tracks the count for exception
* type along with counts for different failed {@link StatusCode}s.
*/
private void addException(Throwable throwable) {
numOfFailure.incrementAndGet();
Class exceptionClass = throwable.getClass();

if (throwable instanceof ApiException) {
StatusCode code = ((ApiException) throwable).getStatusCode();
exceptionClass = ApiException.class;

synchronized (errorLock) {
if (failureStatusCodeCount.containsKey(code)) {
failureStatusCodeCount.get(code).get();
} else {
failureStatusCodeCount.put(code, new AtomicInteger(1));
}
}
}

synchronized (errorLock) {
if (failuresTypeCount.containsKey(exceptionClass)) {
failuresTypeCount.get(exceptionClass).incrementAndGet();
} else {
failuresTypeCount.put(exceptionClass, new AtomicInteger(1));
}
}
}

/** {@inheritDoc} */
@Override
public void close() throws InterruptedException {
Expand All @@ -210,6 +251,9 @@ public void close() throws InterruptedException {
flush();
scheduledFuture.cancel(true);
isClosed = true;
if (numOfFailure.get() > 0) {
throw new BatchingException(numOfFailure.get(), failuresTypeCount, failureStatusCodeCount);
}
currentBatcherReference.closed = true;
currentBatcherReference.clear();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2019 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
* This class represents the number of failed exceptions while performing Batching. It also provides
* the count of exceptions types and count of each failed statusCodes occurred in the Batching
* process.
*/
public class BatchingException extends RuntimeException {

private final long numOfFailure;
private final Map<Class, AtomicInteger> exceptionCount;
private final Map<StatusCode, AtomicInteger> statusCodeCount;

BatchingException(
long numOfFailure,
Map<Class, AtomicInteger> exceptionCount,
Map<StatusCode, AtomicInteger> statusCodeCount) {
this.numOfFailure = numOfFailure;
this.exceptionCount = exceptionCount;
this.statusCodeCount = statusCodeCount;
}

public long getTotalFailureCount() {
return numOfFailure;
}

public Map<Class, AtomicInteger> getFailureTypesCount() {
return exceptionCount;
}

public Map<StatusCode, AtomicInteger> getFailureStatusCodeCount() {
return statusCodeCount;
}

@Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("Failed to commit ")
.append(numOfFailure)
.append(" mutations\n")
.append("Mutations failed for Exception types: ")
.append(exceptionCount.entrySet())
.append("\n");

if (!exceptionCount.isEmpty()) {
sb.append("Total ApiException failure are: ")
.append(exceptionCount.get(ApiException.class))
.append(" with Status Code as: ")
.append(statusCodeCount.entrySet());
}
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,14 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatcherImpl.BatcherReference;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.rpc.UnimplementedException;
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList;
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable;
import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2;
import com.google.api.gax.rpc.testing.FakeStatusCode;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -196,20 +200,32 @@ public ApiFuture<List<Integer>> futureCall(
return ApiFutures.immediateFailedFuture(fakeError);
}
};

underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR);
Future<Integer> failedResult = underTest.add(5);
underTest.flush();
assertThat(failedResult.isDone()).isTrue();

Throwable actualError = null;
try {
failedResult.get();
} catch (InterruptedException | ExecutionException ex) {
actualError = ex;
}

assertThat(actualError).hasCauseThat().isSameInstanceAs(fakeError);

try {
underTest.close();
} catch (RuntimeException e) {
actualError = e;
}
assertThat(actualError).isInstanceOf(BatchingException.class);
BatchingException batchingEx = (BatchingException) actualError;
assertThat(batchingEx.getTotalFailureCount()).isEqualTo(1);
assertThat(batchingEx.getFailureTypesCount()).containsKey(RuntimeException.class);
assertThat(batchingEx.getFailureStatusCodeCount()).isEmpty();
}

/** Resolves future results when {@link BatchingDescriptor#splitResponse} throws exception. */
Expand Down Expand Up @@ -515,6 +531,37 @@ public boolean isLoggable(LogRecord record) {
}
}

@Test
public void testExceptionWhileBatching() {
final Exception fakeError = new RuntimeException();
UnaryCallable<LabeledIntList, List<Integer>> unaryCallable =
new UnaryCallable<LabeledIntList, List<Integer>>() {
@Override
public ApiFuture<List<Integer>> futureCall(
LabeledIntList request, ApiCallContext context) {
return ApiFutures.immediateFailedFuture(
new UnimplementedException(
fakeError, FakeStatusCode.of(StatusCode.Code.FAILED_PRECONDITION), false));
}
};
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR);
underTest.add(2);
Exception actualError = null;
try {
underTest.close();
} catch (Exception e) {
actualError = e;
}
assertThat(actualError).isInstanceOf(BatchingException.class);
BatchingException batchingEx = (BatchingException) actualError;
assertThat(batchingEx.getTotalFailureCount()).isEqualTo(1);
assertThat(batchingEx.getFailureTypesCount()).containsKey(ApiException.class);
assertThat(batchingEx.getFailureStatusCodeCount())
.containsKey(FakeStatusCode.of(StatusCode.Code.FAILED_PRECONDITION));
}

private void testElementTriggers(BatchingSettings settings) throws Exception {
underTest =
new BatcherImpl<>(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2019 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.testing.FakeStatusCode;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class BatchingExceptionTest {

@Test
public void testBatchingException() {
Map<Class, AtomicInteger> failureCounts = new ConcurrentHashMap<>();
failureCounts.put(RuntimeException.class, new AtomicInteger(6));
failureCounts.put(IOException.class, new AtomicInteger(3));

Map<StatusCode, AtomicInteger> statusCounts = new ConcurrentHashMap<>();
statusCounts.put(FakeStatusCode.of(StatusCode.Code.UNIMPLEMENTED), new AtomicInteger(34));
statusCounts.put(FakeStatusCode.of(StatusCode.Code.INVALID_ARGUMENT), new AtomicInteger(324));

BatchingException underTest = new BatchingException(10, failureCounts, statusCounts);
assertThat(underTest).isInstanceOf(RuntimeException.class);
assertThat(underTest.getTotalFailureCount()).isEqualTo(10);
assertThat(underTest.getFailureTypesCount()).isEqualTo(failureCounts);
assertThat(underTest.getFailureStatusCodeCount()).isEqualTo(statusCounts);
}
}

0 comments on commit 9930063

Please sign in to comment.