From 37063ef0d2f480d4675b5638c6d2f97bdad0647f Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Tue, 8 Oct 2019 21:35:22 +0530 Subject: [PATCH 01/11] Track number of failure for Batcher when Batcher#close is called. Adding test case for BatchingException. --- .../google/api/gax/batching/BatcherImpl.java | 44 ++++++++++ .../api/gax/batching/BatchingException.java | 87 +++++++++++++++++++ .../api/gax/batching/BatcherImplTest.java | 49 ++++++++++- .../gax/batching/BatchingExceptionTest.java | 63 ++++++++++++++ 4 files changed, 242 insertions(+), 1 deletion(-) create mode 100644 gax/src/main/java/com/google/api/gax/batching/BatchingException.java create mode 100644 gax/src/test/java/com/google/api/gax/batching/BatchingExceptionTest.java diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 5ca1b0139..60a158ca5 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -37,6 +37,8 @@ import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.UnaryCallable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -47,14 +49,17 @@ import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; + /** * Queues up the elements until {@link #flush()} is called; once batching is over, returned future * resolves. @@ -83,9 +88,14 @@ public class BatcherImpl private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0); private final Object flushLock = new Object(); private final Object elementLock = new Object(); + private final Object errorLock = new Object(); private final Future scheduledFuture; private volatile boolean isClosed = false; + private AtomicLong numOfFailure = new AtomicLong(); + private final Map failuresTypeCount = new ConcurrentHashMap<>(); + private final Map failureStatusCodeCount = new ConcurrentHashMap<>(); + /** * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements * into wrappers request and response. @@ -176,6 +186,7 @@ public void onSuccess(ResponseT response) { @Override public void onFailure(Throwable throwable) { try { + addException(throwable); accumulatedBatch.onBatchFailure(throwable); } finally { onBatchCompletion(); @@ -201,6 +212,36 @@ private void awaitAllOutstandingBatches() throws InterruptedException { } } + /** + * It keeps the count of number of failed RPCs. This method also tracks the count for exception + * type along with counts for different failed {@link StatusCode}s. + */ + private void addException(Throwable throwable) { + numOfFailure.incrementAndGet(); + Class exceptionClass = throwable.getClass(); + + if (throwable instanceof ApiException) { + StatusCode code = ((ApiException) throwable).getStatusCode(); + exceptionClass = ApiException.class; + + synchronized (errorLock) { + if (failureStatusCodeCount.containsKey(code)) { + failureStatusCodeCount.get(code).get(); + } else { + failureStatusCodeCount.put(code, new AtomicInteger(1)); + } + } + } + + synchronized (errorLock) { + if (failuresTypeCount.containsKey(exceptionClass)) { + failuresTypeCount.get(exceptionClass).incrementAndGet(); + } else { + failuresTypeCount.put(exceptionClass, new AtomicInteger(1)); + } + } + } + /** {@inheritDoc} */ @Override public void close() throws InterruptedException { @@ -210,6 +251,9 @@ public void close() throws InterruptedException { flush(); scheduledFuture.cancel(true); isClosed = true; + if (numOfFailure.get() > 0) { + throw new BatchingException(numOfFailure.get(), failuresTypeCount, failureStatusCodeCount); + } currentBatcherReference.closed = true; currentBatcherReference.clear(); } diff --git a/gax/src/main/java/com/google/api/gax/batching/BatchingException.java b/gax/src/main/java/com/google/api/gax/batching/BatchingException.java new file mode 100644 index 000000000..a065a9340 --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/batching/BatchingException.java @@ -0,0 +1,87 @@ +/* + * Copyright 2019 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.StatusCode; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This class represents the number of failed exceptions while performing Batching. It also provides + * the count of exceptions types and count of each failed statusCodes occurred in the Batching + * process. + */ +public class BatchingException extends RuntimeException { + + private final long numOfFailure; + private final Map exceptionCount; + private final Map statusCodeCount; + + BatchingException( + long numOfFailure, + Map exceptionCount, + Map statusCodeCount) { + this.numOfFailure = numOfFailure; + this.exceptionCount = exceptionCount; + this.statusCodeCount = statusCodeCount; + } + + public long getTotalFailureCount() { + return numOfFailure; + } + + public Map getFailureTypesCount() { + return exceptionCount; + } + + public Map getFailureStatusCodeCount() { + return statusCodeCount; + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("Failed to commit ") + .append(numOfFailure) + .append(" mutations\n") + .append("Mutations failed for Exception types: ") + .append(exceptionCount.entrySet()) + .append("\n"); + + if (!exceptionCount.isEmpty()) { + sb.append("Total ApiException failure are: ") + .append(exceptionCount.get(ApiException.class)) + .append(" with Status Code as: ") + .append(statusCodeCount.entrySet()); + } + return sb.toString(); + } +} diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 7a8ee2c1a..1d5fffe9d 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -39,10 +39,14 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.BatcherImpl.BatcherReference; import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.rpc.UnimplementedException; import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList; import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable; import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2; +import com.google.api.gax.rpc.testing.FakeStatusCode; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -196,20 +200,32 @@ public ApiFuture> futureCall( return ApiFutures.immediateFailedFuture(fakeError); } }; + underTest = new BatcherImpl<>( SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR); Future failedResult = underTest.add(5); underTest.flush(); assertThat(failedResult.isDone()).isTrue(); + Throwable actualError = null; try { failedResult.get(); } catch (InterruptedException | ExecutionException ex) { actualError = ex; } - assertThat(actualError).hasCauseThat().isSameInstanceAs(fakeError); + + try { + underTest.close(); + } catch (RuntimeException e) { + actualError = e; + } + assertThat(actualError).isInstanceOf(BatchingException.class); + BatchingException batchingEx = (BatchingException) actualError; + assertThat(batchingEx.getTotalFailureCount()).isEqualTo(1); + assertThat(batchingEx.getFailureTypesCount()).containsKey(RuntimeException.class); + assertThat(batchingEx.getFailureStatusCodeCount()).isEmpty(); } /** Resolves future results when {@link BatchingDescriptor#splitResponse} throws exception. */ @@ -515,6 +531,37 @@ public boolean isLoggable(LogRecord record) { } } + @Test + public void testExceptionWhileBatching() { + final Exception fakeError = new RuntimeException(); + UnaryCallable> unaryCallable = + new UnaryCallable>() { + @Override + public ApiFuture> futureCall( + LabeledIntList request, ApiCallContext context) { + return ApiFutures.immediateFailedFuture( + new UnimplementedException( + fakeError, FakeStatusCode.of(StatusCode.Code.FAILED_PRECONDITION), false)); + } + }; + underTest = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR); + underTest.add(2); + Exception actualError = null; + try { + underTest.close(); + } catch (Exception e) { + actualError = e; + } + assertThat(actualError).isInstanceOf(BatchingException.class); + BatchingException batchingEx = (BatchingException) actualError; + assertThat(batchingEx.getTotalFailureCount()).isEqualTo(1); + assertThat(batchingEx.getFailureTypesCount()).containsKey(ApiException.class); + assertThat(batchingEx.getFailureStatusCodeCount()) + .containsKey(FakeStatusCode.of(StatusCode.Code.FAILED_PRECONDITION)); + } + private void testElementTriggers(BatchingSettings settings) throws Exception { underTest = new BatcherImpl<>( diff --git a/gax/src/test/java/com/google/api/gax/batching/BatchingExceptionTest.java b/gax/src/test/java/com/google/api/gax/batching/BatchingExceptionTest.java new file mode 100644 index 000000000..0135f1d10 --- /dev/null +++ b/gax/src/test/java/com/google/api/gax/batching/BatchingExceptionTest.java @@ -0,0 +1,63 @@ +/* + * Copyright 2019 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.testing.FakeStatusCode; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class BatchingExceptionTest { + + @Test + public void testBatchingException() { + Map failureCounts = new ConcurrentHashMap<>(); + failureCounts.put(RuntimeException.class, new AtomicInteger(6)); + failureCounts.put(IOException.class, new AtomicInteger(3)); + + Map statusCounts = new ConcurrentHashMap<>(); + statusCounts.put(FakeStatusCode.of(StatusCode.Code.UNIMPLEMENTED), new AtomicInteger(34)); + statusCounts.put(FakeStatusCode.of(StatusCode.Code.INVALID_ARGUMENT), new AtomicInteger(324)); + + BatchingException underTest = new BatchingException(10, failureCounts, statusCounts); + assertThat(underTest).isInstanceOf(RuntimeException.class); + assertThat(underTest.getTotalFailureCount()).isEqualTo(10); + assertThat(underTest.getFailureTypesCount()).isEqualTo(failureCounts); + assertThat(underTest.getFailureStatusCodeCount()).isEqualTo(statusCounts); + } +} From e30e82a73ec4f352a675ae7d250f6e058f4a6a44 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Wed, 9 Oct 2019 17:43:16 +0530 Subject: [PATCH 02/11] Removed `BatchingException#toString()` to only log total failure count --- .../google/api/gax/batching/BatcherImpl.java | 2 +- .../api/gax/batching/BatchingException.java | 22 ++----------------- 2 files changed, 3 insertions(+), 21 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 60a158ca5..3a080f7ea 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -226,7 +226,7 @@ private void addException(Throwable throwable) { synchronized (errorLock) { if (failureStatusCodeCount.containsKey(code)) { - failureStatusCodeCount.get(code).get(); + failureStatusCodeCount.get(code).incrementAndGet(); } else { failureStatusCodeCount.put(code, new AtomicInteger(1)); } diff --git a/gax/src/main/java/com/google/api/gax/batching/BatchingException.java b/gax/src/main/java/com/google/api/gax/batching/BatchingException.java index a065a9340..ff246daf7 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatchingException.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatchingException.java @@ -29,7 +29,6 @@ */ package com.google.api.gax.batching; -import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -49,6 +48,8 @@ public class BatchingException extends RuntimeException { long numOfFailure, Map exceptionCount, Map statusCodeCount) { + super("Failed to commit " + numOfFailure + " mutations"); + this.numOfFailure = numOfFailure; this.exceptionCount = exceptionCount; this.statusCodeCount = statusCodeCount; @@ -65,23 +66,4 @@ public Map getFailureTypesCount() { public Map getFailureStatusCodeCount() { return statusCodeCount; } - - @Override - public String toString() { - StringBuffer sb = new StringBuffer(); - sb.append("Failed to commit ") - .append(numOfFailure) - .append(" mutations\n") - .append("Mutations failed for Exception types: ") - .append(exceptionCount.entrySet()) - .append("\n"); - - if (!exceptionCount.isEmpty()) { - sb.append("Total ApiException failure are: ") - .append(exceptionCount.get(ApiException.class)) - .append(" with Status Code as: ") - .append(statusCodeCount.entrySet()); - } - return sb.toString(); - } } From 45960f14df635cf336d6f0c09df9f45965bc822a Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Mon, 21 Oct 2019 21:10:58 +0530 Subject: [PATCH 03/11] Introduced `BatchStats` for failures occurred during batching Now BatchStats will keep the counter for each type of exception happened at RPC as well as ElementT/entry object level. Also refactored exception message to be more detailed. --- .../google/api/gax/batching/BatchStats.java | 190 ++++++++++++++++++ .../google/api/gax/batching/BatcherImpl.java | 53 +---- .../api/gax/batching/BatchingException.java | 40 +--- .../api/gax/batching/BatchStatsTest.java | 104 ++++++++++ .../api/gax/batching/BatcherImplTest.java | 136 +++++++++---- .../gax/batching/BatchingExceptionTest.java | 63 ------ 6 files changed, 405 insertions(+), 181 deletions(-) create mode 100644 gax/src/main/java/com/google/api/gax/batching/BatchStats.java create mode 100644 gax/src/test/java/com/google/api/gax/batching/BatchStatsTest.java delete mode 100644 gax/src/test/java/com/google/api/gax/batching/BatchingExceptionTest.java diff --git a/gax/src/main/java/com/google/api/gax/batching/BatchStats.java b/gax/src/main/java/com/google/api/gax/batching/BatchStats.java new file mode 100644 index 000000000..eae517087 --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/batching/BatchStats.java @@ -0,0 +1,190 @@ +/* + * Copyright 2019 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation + * /or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import com.google.api.core.ApiFutureCallback; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.StatusCode; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; + +/** + * This class keeps the statistics about failed operations(both at RPC and ElementT level) in {@link + * Batcher}. This provides the count of individual exception failure and count of each failed {@link + * StatusCode.Code} occurred in the batching process. + */ +class BatchStats { + + private final Map requestExceptionCounts = new ConcurrentHashMap<>(); + private final Map requestStatusCounts = new ConcurrentHashMap<>(); + private final AtomicInteger partialBatchFailures = new AtomicInteger(0); + private final Map entryExceptionCounts = new ConcurrentHashMap<>(); + private final Map entryStatusCounts = new ConcurrentHashMap<>(); + + private final Object errorLock = new Object(); + private final Object statusLock = new Object(); + + ApiFutureCallback getRequestCallback() { + return new ApiFutureCallback() { + public void onFailure(Throwable t) { + recordRequestException(t); + } + + @Override + public void onSuccess(T result) {} + }; + } + + ApiFutureCallback getEntryCallback() { + return new ApiFutureCallback() { + public void onFailure(Throwable t) { + recordEntryException(t); + } + + @Override + public void onSuccess(T result) {} + }; + } + + private void recordRequestException(Throwable throwable) { + Class exceptionClass = throwable.getClass(); + + if (throwable instanceof ApiException) { + StatusCode.Code code = ((ApiException) throwable).getStatusCode().getCode(); + exceptionClass = ApiException.class; + + synchronized (statusLock) { + if (requestStatusCounts.containsKey(code)) { + requestStatusCounts.get(code).incrementAndGet(); + } else { + requestStatusCounts.put(code, new AtomicInteger(1)); + } + } + } + + synchronized (errorLock) { + if (requestExceptionCounts.containsKey(exceptionClass)) { + requestExceptionCounts.get(exceptionClass).incrementAndGet(); + } else { + synchronized (errorLock) { + requestExceptionCounts.put(exceptionClass, new AtomicInteger(1)); + } + } + } + } + + private void recordEntryException(Throwable throwable) { + Class exceptionClass = throwable.getClass(); + + if (throwable instanceof ApiException) { + StatusCode.Code code = ((ApiException) throwable).getStatusCode().getCode(); + exceptionClass = ApiException.class; + + synchronized (statusLock) { + if (entryStatusCounts.containsKey(code)) { + entryStatusCounts.get(code).incrementAndGet(); + } else { + entryStatusCounts.put(code, new AtomicInteger(1)); + } + } + } + + synchronized (errorLock) { + if (entryExceptionCounts.containsKey(exceptionClass)) { + entryExceptionCounts.get(exceptionClass).incrementAndGet(); + } else { + partialBatchFailures.incrementAndGet(); + entryExceptionCounts.put(exceptionClass, new AtomicInteger(1)); + } + } + } + + /** Calculates and formats the message with request and entry failure count. */ + @Nullable + BatchingException asException() { + if (requestExceptionCounts.isEmpty() && partialBatchFailures.get() == 0) { + return null; + } + + StringBuilder sb = new StringBuilder(); + int batchFailures = requestExceptionCounts.size(); + + if (requestExceptionCounts.isEmpty()) { + sb.append("Batching finished with "); + } else { + sb.append(String.format("%d batches failed to apply due to: ", batchFailures)); + + // compose the exception and return it + for (Class req : requestExceptionCounts.keySet()) { + sb.append( + String.format("%d %s ", requestExceptionCounts.get(req).get(), req.getSimpleName())); + if (req.equals(ApiException.class)) { + sb.append("("); + for (StatusCode.Code statusCode : requestStatusCounts.keySet()) { + sb.append( + String.format("%d %s ", requestStatusCounts.get(statusCode).get(), statusCode)); + } + sb.append(") "); + } + } + } + + if (partialBatchFailures.get() > 0) { + sb.append(String.format("%d partial failures.", partialBatchFailures.get())); + + int totalEntriesEx = 0; + for (AtomicInteger ai : entryExceptionCounts.values()) { + totalEntriesEx += ai.get(); + } + + sb.append( + String.format( + " The %d partial failures contained %d entries that failed with: ", + partialBatchFailures.get(), totalEntriesEx)); + + for (Class entry : entryExceptionCounts.keySet()) { + sb.append( + String.format("%d %s ", entryExceptionCounts.get(entry).get(), entry.getSimpleName())); + if (entry.equals(ApiException.class)) { + sb.append("("); + for (StatusCode.Code code : entryStatusCounts.keySet()) { + sb.append(String.format("%d %s ", entryStatusCounts.get(code).get(), code)); + } + sb.append(") "); + } + } + } + sb.append("."); + return new BatchingException(sb.toString()); + } +} diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 3a080f7ea..cb17ce620 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -37,8 +37,6 @@ import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; import com.google.api.core.SettableApiFuture; -import com.google.api.gax.rpc.ApiException; -import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.UnaryCallable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -49,17 +47,14 @@ import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; - /** * Queues up the elements until {@link #flush()} is called; once batching is over, returned future * resolves. @@ -88,13 +83,9 @@ public class BatcherImpl private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0); private final Object flushLock = new Object(); private final Object elementLock = new Object(); - private final Object errorLock = new Object(); private final Future scheduledFuture; private volatile boolean isClosed = false; - - private AtomicLong numOfFailure = new AtomicLong(); - private final Map failuresTypeCount = new ConcurrentHashMap<>(); - private final Map failureStatusCodeCount = new ConcurrentHashMap<>(); + private final BatchStats batchStats = new BatchStats(); /** * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements @@ -137,6 +128,7 @@ public ApiFuture add(ElementT element) { Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher"); SettableApiFuture result = SettableApiFuture.create(); + ApiFutures.addCallback(result, batchStats.getEntryCallback(), directExecutor()); synchronized (elementLock) { currentOpenBatch.add(element, result); } @@ -171,6 +163,9 @@ public void sendOutstanding() { unaryCallable.futureCall(accumulatedBatch.builder.build()); numOfOutstandingBatches.incrementAndGet(); + + ApiFutures.addCallback( + batchResponse, batchStats.getRequestCallback(), directExecutor()); ApiFutures.addCallback( batchResponse, new ApiFutureCallback() { @@ -186,7 +181,6 @@ public void onSuccess(ResponseT response) { @Override public void onFailure(Throwable throwable) { try { - addException(throwable); accumulatedBatch.onBatchFailure(throwable); } finally { onBatchCompletion(); @@ -212,36 +206,6 @@ private void awaitAllOutstandingBatches() throws InterruptedException { } } - /** - * It keeps the count of number of failed RPCs. This method also tracks the count for exception - * type along with counts for different failed {@link StatusCode}s. - */ - private void addException(Throwable throwable) { - numOfFailure.incrementAndGet(); - Class exceptionClass = throwable.getClass(); - - if (throwable instanceof ApiException) { - StatusCode code = ((ApiException) throwable).getStatusCode(); - exceptionClass = ApiException.class; - - synchronized (errorLock) { - if (failureStatusCodeCount.containsKey(code)) { - failureStatusCodeCount.get(code).incrementAndGet(); - } else { - failureStatusCodeCount.put(code, new AtomicInteger(1)); - } - } - } - - synchronized (errorLock) { - if (failuresTypeCount.containsKey(exceptionClass)) { - failuresTypeCount.get(exceptionClass).incrementAndGet(); - } else { - failuresTypeCount.put(exceptionClass, new AtomicInteger(1)); - } - } - } - /** {@inheritDoc} */ @Override public void close() throws InterruptedException { @@ -251,11 +215,12 @@ public void close() throws InterruptedException { flush(); scheduledFuture.cancel(true); isClosed = true; - if (numOfFailure.get() > 0) { - throw new BatchingException(numOfFailure.get(), failuresTypeCount, failureStatusCodeCount); - } currentBatcherReference.closed = true; currentBatcherReference.clear(); + BatchingException exception = batchStats.asException(); + if (exception != null) { + throw exception; + } } /** diff --git a/gax/src/main/java/com/google/api/gax/batching/BatchingException.java b/gax/src/main/java/com/google/api/gax/batching/BatchingException.java index ff246daf7..d3a535add 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatchingException.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatchingException.java @@ -29,41 +29,15 @@ */ package com.google.api.gax.batching; -import com.google.api.gax.rpc.StatusCode; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; +import com.google.api.core.BetaApi; +import com.google.api.core.InternalExtensionOnly; -/** - * This class represents the number of failed exceptions while performing Batching. It also provides - * the count of exceptions types and count of each failed statusCodes occurred in the Batching - * process. - */ +/** Represents exception occurred during batching. */ +@BetaApi("The surface for batching is not stable yet and may change in the future.") +@InternalExtensionOnly("For google-cloud-java client use only.") public class BatchingException extends RuntimeException { - private final long numOfFailure; - private final Map exceptionCount; - private final Map statusCodeCount; - - BatchingException( - long numOfFailure, - Map exceptionCount, - Map statusCodeCount) { - super("Failed to commit " + numOfFailure + " mutations"); - - this.numOfFailure = numOfFailure; - this.exceptionCount = exceptionCount; - this.statusCodeCount = statusCodeCount; - } - - public long getTotalFailureCount() { - return numOfFailure; - } - - public Map getFailureTypesCount() { - return exceptionCount; - } - - public Map getFailureStatusCodeCount() { - return statusCodeCount; + BatchingException(String message) { + super(message); } } diff --git a/gax/src/test/java/com/google/api/gax/batching/BatchStatsTest.java b/gax/src/test/java/com/google/api/gax/batching/BatchStatsTest.java new file mode 100644 index 000000000..c81d58798 --- /dev/null +++ b/gax/src/test/java/com/google/api/gax/batching/BatchStatsTest.java @@ -0,0 +1,104 @@ +/* + * Copyright 2019 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + +import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.ApiExceptionFactory; +import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.testing.FakeStatusCode; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class BatchStatsTest { + + @Test + public void testWhenNoException() { + BatchStats batchStats = new BatchStats(); + assertThat(batchStats.asException()).isNull(); + } + + @Test + public void testRequestFailuresOnly() { + BatchStats batchStats = new BatchStats(); + + ApiFutures.addCallback( + ApiFutures.immediateFailedFuture( + ApiExceptionFactory.createException( + new RuntimeException(), + FakeStatusCode.of(StatusCode.Code.INVALID_ARGUMENT), + false)), + batchStats.getRequestCallback(), + directExecutor()); + + ApiFutures.addCallback( + ApiFutures.immediateFailedFuture(new RuntimeException("Request failed")), + batchStats.getRequestCallback(), + directExecutor()); + + BatchingException exception = batchStats.asException(); + assertThat(exception).isNotNull(); + assertThat(exception.getMessage()).contains("2 batches failed to apply"); + assertThat(exception.getMessage()).contains("1 RuntimeException"); + assertThat(exception.getMessage()).contains("1 ApiException (1 INVALID_ARGUMENT ) "); + } + + @Test + public void testRequestAndEntryFailures() { + BatchStats batchStats = new BatchStats(); + ApiFutures.addCallback( + ApiFutures.immediateFailedFuture(new RuntimeException("Request failed")), + batchStats.getRequestCallback(), + directExecutor()); + + ApiFutures.addCallback( + ApiFutures.immediateFailedFuture(new NullPointerException()), + batchStats.getEntryCallback(), + directExecutor()); + + ApiFutures.addCallback( + ApiFutures.immediateFailedFuture( + ApiExceptionFactory.createException( + new RuntimeException(), FakeStatusCode.of(StatusCode.Code.UNAVAILABLE), false)), + batchStats.getEntryCallback(), + directExecutor()); + + BatchingException ex = batchStats.asException(); + assertThat(ex).isNotNull(); + assertThat(ex.getMessage()) + .contains("1 batches failed to apply due to: 1 RuntimeException 2 partial failures."); + assertThat(ex.getMessage()).contains("1 NullPointerException"); + assertThat(ex.getMessage()).contains("1 ApiException (1 UNAVAILABLE )"); + } +} diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 1d5fffe9d..b6ca1a74c 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -39,14 +39,10 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.BatcherImpl.BatcherReference; import com.google.api.gax.rpc.ApiCallContext; -import com.google.api.gax.rpc.ApiException; -import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.UnaryCallable; -import com.google.api.gax.rpc.UnimplementedException; import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList; import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable; import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2; -import com.google.api.gax.rpc.testing.FakeStatusCode; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -200,14 +196,14 @@ public ApiFuture> futureCall( return ApiFutures.immediateFailedFuture(fakeError); } }; - underTest = new BatcherImpl<>( SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR); Future failedResult = underTest.add(5); + underTest.add(6); + underTest.add(7); underTest.flush(); assertThat(failedResult.isDone()).isTrue(); - Throwable actualError = null; try { failedResult.get(); @@ -221,11 +217,12 @@ public ApiFuture> futureCall( } catch (RuntimeException e) { actualError = e; } + + assertThat(actualError).isNotNull(); assertThat(actualError).isInstanceOf(BatchingException.class); - BatchingException batchingEx = (BatchingException) actualError; - assertThat(batchingEx.getTotalFailureCount()).isEqualTo(1); - assertThat(batchingEx.getFailureTypesCount()).containsKey(RuntimeException.class); - assertThat(batchingEx.getFailureStatusCodeCount()).isEmpty(); + assertThat(actualError.getMessage()) + .contains("1 batches failed to apply due to: 1 RuntimeException"); + assertThat(actualError.getMessage()).contains("3 entries that failed with: 3 RuntimeException"); } /** Resolves future results when {@link BatchingDescriptor#splitResponse} throws exception. */ @@ -254,6 +251,13 @@ public void splitResponse( } assertThat(actualError).hasCauseThat().isSameInstanceAs(fakeError); + try { + underTest.close(); + } catch (Exception batchingEx) { + actualError = batchingEx; + } + assertThat(actualError).isInstanceOf(BatchingException.class); + assertThat(actualError.getMessage()).contains("Batching finished with 1 partial failures."); } /** Resolves future results when {@link BatchingDescriptor#splitException} throws exception */ @@ -287,6 +291,12 @@ public void splitException(Throwable throwable, List> } assertThat(actualError).hasCauseThat().isSameInstanceAs(fakeError); + try { + underTest.close(); + } catch (Exception ex) { + actualError = ex; + } + assertThat(actualError).isInstanceOf(BatchingException.class); } @Test @@ -446,6 +456,81 @@ public ApiFuture> futureCall( underTest.flush(); } + /** To confirm the partial failures in Batching does not mark whole batch failed */ + @Test + public void testPartialFailureWithSplitResponse() throws Exception { + SquarerBatchingDescriptorV2 descriptor = + new SquarerBatchingDescriptorV2() { + @Override + public void splitResponse( + List batchResponse, List> batch) { + for (int i = 0; i < batchResponse.size(); i++) { + if (batchResponse.get(i) > 10_000) { + batch.get(i).setException(new ArithmeticException()); + } else { + batch.get(i).set(batchResponse.get(i)); + } + } + } + }; + + underTest = + new BatcherImpl<>( + descriptor, callLabeledIntSquarer, labeledIntList, batchingSettings, EXECUTOR); + underTest.add(10); + // This will cause partial failure + underTest.add(200); + underTest.flush(); + + underTest.add(40); + underTest.add(50); + underTest.flush(); + + // This will cause partial failure + underTest.add(500); + Exception actualError = null; + try { + underTest.close(); + } catch (Exception e) { + actualError = e; + } + assertThat(actualError).isInstanceOf(BatchingException.class); + assertThat(actualError).isNotNull(); + assertThat(actualError.getMessage()).doesNotContain("batches failed to apply due"); + assertThat(actualError.getMessage()).contains("Batching finished with 1 partial failures."); + assertThat(actualError.getMessage()).contains("2 ArithmeticException"); + } + + @Test + public void testPartialFailureInResultProcessing() throws Exception { + SquarerBatchingDescriptorV2 descriptor = + new SquarerBatchingDescriptorV2() { + + @Override + public void splitResponse( + List batchResponse, List> batch) { + throw new NullPointerException("To verify exceptions from result processing"); + } + }; + + underTest = + new BatcherImpl<>( + descriptor, callLabeledIntSquarer, labeledIntList, batchingSettings, EXECUTOR); + underTest.add(10); + underTest.add(20); + + Exception actualError = null; + try { + underTest.close(); + } catch (Exception e) { + actualError = e; + } + assertThat(actualError).isInstanceOf(BatchingException.class); + assertThat(actualError).isNotNull(); + assertThat(actualError.getMessage()).contains("Batching finished with 1 partial failures."); + assertThat(actualError.getMessage()).contains("2 NullPointerException"); + } + /** * Validates the presence of warning in case {@link BatcherImpl} is garbage collected without * being closed first. @@ -531,37 +616,6 @@ public boolean isLoggable(LogRecord record) { } } - @Test - public void testExceptionWhileBatching() { - final Exception fakeError = new RuntimeException(); - UnaryCallable> unaryCallable = - new UnaryCallable>() { - @Override - public ApiFuture> futureCall( - LabeledIntList request, ApiCallContext context) { - return ApiFutures.immediateFailedFuture( - new UnimplementedException( - fakeError, FakeStatusCode.of(StatusCode.Code.FAILED_PRECONDITION), false)); - } - }; - underTest = - new BatcherImpl<>( - SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR); - underTest.add(2); - Exception actualError = null; - try { - underTest.close(); - } catch (Exception e) { - actualError = e; - } - assertThat(actualError).isInstanceOf(BatchingException.class); - BatchingException batchingEx = (BatchingException) actualError; - assertThat(batchingEx.getTotalFailureCount()).isEqualTo(1); - assertThat(batchingEx.getFailureTypesCount()).containsKey(ApiException.class); - assertThat(batchingEx.getFailureStatusCodeCount()) - .containsKey(FakeStatusCode.of(StatusCode.Code.FAILED_PRECONDITION)); - } - private void testElementTriggers(BatchingSettings settings) throws Exception { underTest = new BatcherImpl<>( diff --git a/gax/src/test/java/com/google/api/gax/batching/BatchingExceptionTest.java b/gax/src/test/java/com/google/api/gax/batching/BatchingExceptionTest.java deleted file mode 100644 index 0135f1d10..000000000 --- a/gax/src/test/java/com/google/api/gax/batching/BatchingExceptionTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2019 Google LLC - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google LLC nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package com.google.api.gax.batching; - -import static com.google.common.truth.Truth.assertThat; - -import com.google.api.gax.rpc.StatusCode; -import com.google.api.gax.rpc.testing.FakeStatusCode; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(JUnit4.class) -public class BatchingExceptionTest { - - @Test - public void testBatchingException() { - Map failureCounts = new ConcurrentHashMap<>(); - failureCounts.put(RuntimeException.class, new AtomicInteger(6)); - failureCounts.put(IOException.class, new AtomicInteger(3)); - - Map statusCounts = new ConcurrentHashMap<>(); - statusCounts.put(FakeStatusCode.of(StatusCode.Code.UNIMPLEMENTED), new AtomicInteger(34)); - statusCounts.put(FakeStatusCode.of(StatusCode.Code.INVALID_ARGUMENT), new AtomicInteger(324)); - - BatchingException underTest = new BatchingException(10, failureCounts, statusCounts); - assertThat(underTest).isInstanceOf(RuntimeException.class); - assertThat(underTest.getTotalFailureCount()).isEqualTo(10); - assertThat(underTest.getFailureTypesCount()).isEqualTo(failureCounts); - assertThat(underTest.getFailureStatusCodeCount()).isEqualTo(statusCounts); - } -} From ecfe7f0148b9f20caea84ec1664b706bc16f299f Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Tue, 22 Oct 2019 00:07:07 +0530 Subject: [PATCH 04/11] fixed license header for BatchStats.java --- gax/src/main/java/com/google/api/gax/batching/BatchStats.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/BatchStats.java b/gax/src/main/java/com/google/api/gax/batching/BatchStats.java index eae517087..4701109e9 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatchStats.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatchStats.java @@ -9,8 +9,7 @@ * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following disclaimer - * in the documentation - * /or other materials provided with the + * in the documentation and/or other materials provided with the * distribution. * * Neither the name of Google LLC nor the names of its * contributors may be used to endorse or promote products derived from From aa5ec0bed3787d4535aa75a09d7d3644ca2be0a2 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Tue, 22 Oct 2019 18:54:19 +0530 Subject: [PATCH 05/11] Updated partial failure counting logic to only come in effect when RPC is successful. --- .../google/api/gax/batching/BatcherImpl.java | 14 ++++++--- .../api/gax/batching/BatcherImplTest.java | 30 +++++++++++++++---- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index cb17ce620..326230441 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -108,7 +108,7 @@ public BatcherImpl( this.batchingSettings = Preconditions.checkNotNull(batchingSettings, "batching setting cannot be null"); Preconditions.checkNotNull(executor, "executor cannot be null"); - currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings); + currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batchStats); if (batchingSettings.getDelayThreshold() != null) { long delay = batchingSettings.getDelayThreshold().toMillis(); @@ -128,7 +128,6 @@ public ApiFuture add(ElementT element) { Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher"); SettableApiFuture result = SettableApiFuture.create(); - ApiFutures.addCallback(result, batchStats.getEntryCallback(), directExecutor()); synchronized (elementLock) { currentOpenBatch.add(element, result); } @@ -156,7 +155,7 @@ public void sendOutstanding() { return; } accumulatedBatch = currentOpenBatch; - currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings); + currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batchStats); } final ApiFuture batchResponse = @@ -231,6 +230,7 @@ private static class Batch { private final BatchingRequestBuilder builder; private final List> results; private final BatchingDescriptor descriptor; + private final BatchStats batchStats; private final long elementThreshold; private final long bytesThreshold; @@ -240,7 +240,8 @@ private static class Batch { private Batch( RequestT prototype, BatchingDescriptor descriptor, - BatchingSettings batchingSettings) { + BatchingSettings batchingSettings, + BatchStats batchStats) { this.descriptor = descriptor; this.builder = descriptor.newRequestBuilder(prototype); this.results = new ArrayList<>(); @@ -248,6 +249,7 @@ private Batch( this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold; Long requestByteThreshold = batchingSettings.getRequestByteThreshold(); this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold; + this.batchStats = batchStats; } void add(ElementT element, SettableApiFuture result) { @@ -259,6 +261,10 @@ void add(ElementT element, SettableApiFuture result) { void onBatchSuccess(ResponseT response) { try { + for (ApiFuture resultFutures : results) { + ApiFutures.addCallback( + resultFutures, batchStats.getEntryCallback(), directExecutor()); + } descriptor.splitResponse(response, results); } catch (Exception ex) { onBatchFailure(ex); diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index b6ca1a74c..9d9bfaa26 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -43,8 +43,10 @@ import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList; import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable; import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2; +import com.google.common.collect.Queues; import java.util.ArrayList; import java.util.List; +import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -222,7 +224,6 @@ public ApiFuture> futureCall( assertThat(actualError).isInstanceOf(BatchingException.class); assertThat(actualError.getMessage()) .contains("1 batches failed to apply due to: 1 RuntimeException"); - assertThat(actualError.getMessage()).contains("3 entries that failed with: 3 RuntimeException"); } /** Resolves future results when {@link BatchingDescriptor#splitResponse} throws exception. */ @@ -495,7 +496,6 @@ public void splitResponse( actualError = e; } assertThat(actualError).isInstanceOf(BatchingException.class); - assertThat(actualError).isNotNull(); assertThat(actualError.getMessage()).doesNotContain("batches failed to apply due"); assertThat(actualError.getMessage()).contains("Batching finished with 1 partial failures."); assertThat(actualError.getMessage()).contains("2 ArithmeticException"); @@ -503,21 +503,37 @@ public void splitResponse( @Test public void testPartialFailureInResultProcessing() throws Exception { + final Queue queue = Queues.newArrayBlockingQueue(3); + queue.add(new NullPointerException()); + queue.add(new RuntimeException()); + queue.add(new ArithmeticException()); + SquarerBatchingDescriptorV2 descriptor = new SquarerBatchingDescriptorV2() { @Override public void splitResponse( List batchResponse, List> batch) { - throw new NullPointerException("To verify exceptions from result processing"); + throw queue.poll(); } }; underTest = new BatcherImpl<>( descriptor, callLabeledIntSquarer, labeledIntList, batchingSettings, EXECUTOR); + // This batch should fail with NullPointerException underTest.add(10); + underTest.flush(); + + // This batch should fail with RuntimeException underTest.add(20); + underTest.add(30); + underTest.flush(); + + // This batch should fail with ArithmeticException + underTest.add(40); + underTest.add(50); + underTest.add(60); Exception actualError = null; try { @@ -526,9 +542,11 @@ public void splitResponse( actualError = e; } assertThat(actualError).isInstanceOf(BatchingException.class); - assertThat(actualError).isNotNull(); - assertThat(actualError.getMessage()).contains("Batching finished with 1 partial failures."); - assertThat(actualError.getMessage()).contains("2 NullPointerException"); + assertThat(actualError.getMessage()) + .contains("The 3 partial failures contained 6 entries that failed with:"); + assertThat(actualError.getMessage()).contains("1 NullPointerException"); + assertThat(actualError.getMessage()).contains("2 RuntimeException"); + assertThat(actualError.getMessage()).contains("3 ArithmeticException"); } /** From 1c9fe27744229a1ccc9f663a37d119c83874adca Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Wed, 23 Oct 2019 19:43:45 +0530 Subject: [PATCH 06/11] Updated BatcherStats to accept Throwable and List of ResultFutures Addressed feedback comments to simplify BatcherStats --- .../google/api/gax/batching/BatchStats.java | 189 ----------------- .../google/api/gax/batching/BatcherImpl.java | 23 +-- .../google/api/gax/batching/BatcherStats.java | 194 ++++++++++++++++++ .../api/gax/batching/BatcherImplTest.java | 11 +- ...chStatsTest.java => BatcherStatsTest.java} | 64 +++--- 5 files changed, 237 insertions(+), 244 deletions(-) delete mode 100644 gax/src/main/java/com/google/api/gax/batching/BatchStats.java create mode 100644 gax/src/main/java/com/google/api/gax/batching/BatcherStats.java rename gax/src/test/java/com/google/api/gax/batching/{BatchStatsTest.java => BatcherStatsTest.java} (61%) diff --git a/gax/src/main/java/com/google/api/gax/batching/BatchStats.java b/gax/src/main/java/com/google/api/gax/batching/BatchStats.java deleted file mode 100644 index 4701109e9..000000000 --- a/gax/src/main/java/com/google/api/gax/batching/BatchStats.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Copyright 2019 Google LLC - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google LLC nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package com.google.api.gax.batching; - -import com.google.api.core.ApiFutureCallback; -import com.google.api.gax.rpc.ApiException; -import com.google.api.gax.rpc.StatusCode; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import javax.annotation.Nullable; - -/** - * This class keeps the statistics about failed operations(both at RPC and ElementT level) in {@link - * Batcher}. This provides the count of individual exception failure and count of each failed {@link - * StatusCode.Code} occurred in the batching process. - */ -class BatchStats { - - private final Map requestExceptionCounts = new ConcurrentHashMap<>(); - private final Map requestStatusCounts = new ConcurrentHashMap<>(); - private final AtomicInteger partialBatchFailures = new AtomicInteger(0); - private final Map entryExceptionCounts = new ConcurrentHashMap<>(); - private final Map entryStatusCounts = new ConcurrentHashMap<>(); - - private final Object errorLock = new Object(); - private final Object statusLock = new Object(); - - ApiFutureCallback getRequestCallback() { - return new ApiFutureCallback() { - public void onFailure(Throwable t) { - recordRequestException(t); - } - - @Override - public void onSuccess(T result) {} - }; - } - - ApiFutureCallback getEntryCallback() { - return new ApiFutureCallback() { - public void onFailure(Throwable t) { - recordEntryException(t); - } - - @Override - public void onSuccess(T result) {} - }; - } - - private void recordRequestException(Throwable throwable) { - Class exceptionClass = throwable.getClass(); - - if (throwable instanceof ApiException) { - StatusCode.Code code = ((ApiException) throwable).getStatusCode().getCode(); - exceptionClass = ApiException.class; - - synchronized (statusLock) { - if (requestStatusCounts.containsKey(code)) { - requestStatusCounts.get(code).incrementAndGet(); - } else { - requestStatusCounts.put(code, new AtomicInteger(1)); - } - } - } - - synchronized (errorLock) { - if (requestExceptionCounts.containsKey(exceptionClass)) { - requestExceptionCounts.get(exceptionClass).incrementAndGet(); - } else { - synchronized (errorLock) { - requestExceptionCounts.put(exceptionClass, new AtomicInteger(1)); - } - } - } - } - - private void recordEntryException(Throwable throwable) { - Class exceptionClass = throwable.getClass(); - - if (throwable instanceof ApiException) { - StatusCode.Code code = ((ApiException) throwable).getStatusCode().getCode(); - exceptionClass = ApiException.class; - - synchronized (statusLock) { - if (entryStatusCounts.containsKey(code)) { - entryStatusCounts.get(code).incrementAndGet(); - } else { - entryStatusCounts.put(code, new AtomicInteger(1)); - } - } - } - - synchronized (errorLock) { - if (entryExceptionCounts.containsKey(exceptionClass)) { - entryExceptionCounts.get(exceptionClass).incrementAndGet(); - } else { - partialBatchFailures.incrementAndGet(); - entryExceptionCounts.put(exceptionClass, new AtomicInteger(1)); - } - } - } - - /** Calculates and formats the message with request and entry failure count. */ - @Nullable - BatchingException asException() { - if (requestExceptionCounts.isEmpty() && partialBatchFailures.get() == 0) { - return null; - } - - StringBuilder sb = new StringBuilder(); - int batchFailures = requestExceptionCounts.size(); - - if (requestExceptionCounts.isEmpty()) { - sb.append("Batching finished with "); - } else { - sb.append(String.format("%d batches failed to apply due to: ", batchFailures)); - - // compose the exception and return it - for (Class req : requestExceptionCounts.keySet()) { - sb.append( - String.format("%d %s ", requestExceptionCounts.get(req).get(), req.getSimpleName())); - if (req.equals(ApiException.class)) { - sb.append("("); - for (StatusCode.Code statusCode : requestStatusCounts.keySet()) { - sb.append( - String.format("%d %s ", requestStatusCounts.get(statusCode).get(), statusCode)); - } - sb.append(") "); - } - } - } - - if (partialBatchFailures.get() > 0) { - sb.append(String.format("%d partial failures.", partialBatchFailures.get())); - - int totalEntriesEx = 0; - for (AtomicInteger ai : entryExceptionCounts.values()) { - totalEntriesEx += ai.get(); - } - - sb.append( - String.format( - " The %d partial failures contained %d entries that failed with: ", - partialBatchFailures.get(), totalEntriesEx)); - - for (Class entry : entryExceptionCounts.keySet()) { - sb.append( - String.format("%d %s ", entryExceptionCounts.get(entry).get(), entry.getSimpleName())); - if (entry.equals(ApiException.class)) { - sb.append("("); - for (StatusCode.Code code : entryStatusCounts.keySet()) { - sb.append(String.format("%d %s ", entryStatusCounts.get(code).get(), code)); - } - sb.append(") "); - } - } - } - sb.append("."); - return new BatchingException(sb.toString()); - } -} diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 326230441..9eed37835 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -85,7 +85,7 @@ public class BatcherImpl private final Object elementLock = new Object(); private final Future scheduledFuture; private volatile boolean isClosed = false; - private final BatchStats batchStats = new BatchStats(); + private final BatcherStats batcherStats = new BatcherStats(); /** * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements @@ -108,7 +108,7 @@ public BatcherImpl( this.batchingSettings = Preconditions.checkNotNull(batchingSettings, "batching setting cannot be null"); Preconditions.checkNotNull(executor, "executor cannot be null"); - currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batchStats); + currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats); if (batchingSettings.getDelayThreshold() != null) { long delay = batchingSettings.getDelayThreshold().toMillis(); @@ -155,16 +155,13 @@ public void sendOutstanding() { return; } accumulatedBatch = currentOpenBatch; - currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batchStats); + currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats); } final ApiFuture batchResponse = unaryCallable.futureCall(accumulatedBatch.builder.build()); numOfOutstandingBatches.incrementAndGet(); - - ApiFutures.addCallback( - batchResponse, batchStats.getRequestCallback(), directExecutor()); ApiFutures.addCallback( batchResponse, new ApiFutureCallback() { @@ -180,6 +177,7 @@ public void onSuccess(ResponseT response) { @Override public void onFailure(Throwable throwable) { try { + batcherStats.recordBatchFailure(throwable); accumulatedBatch.onBatchFailure(throwable); } finally { onBatchCompletion(); @@ -216,7 +214,7 @@ public void close() throws InterruptedException { isClosed = true; currentBatcherReference.closed = true; currentBatcherReference.clear(); - BatchingException exception = batchStats.asException(); + BatchingException exception = batcherStats.asException(); if (exception != null) { throw exception; } @@ -230,7 +228,7 @@ private static class Batch { private final BatchingRequestBuilder builder; private final List> results; private final BatchingDescriptor descriptor; - private final BatchStats batchStats; + private final BatcherStats batcherStats; private final long elementThreshold; private final long bytesThreshold; @@ -241,7 +239,7 @@ private Batch( RequestT prototype, BatchingDescriptor descriptor, BatchingSettings batchingSettings, - BatchStats batchStats) { + BatcherStats batcherStats) { this.descriptor = descriptor; this.builder = descriptor.newRequestBuilder(prototype); this.results = new ArrayList<>(); @@ -249,7 +247,7 @@ private Batch( this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold; Long requestByteThreshold = batchingSettings.getRequestByteThreshold(); this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold; - this.batchStats = batchStats; + this.batcherStats = batcherStats; } void add(ElementT element, SettableApiFuture result) { @@ -261,10 +259,7 @@ void add(ElementT element, SettableApiFuture result) { void onBatchSuccess(ResponseT response) { try { - for (ApiFuture resultFutures : results) { - ApiFutures.addCallback( - resultFutures, batchStats.getEntryCallback(), directExecutor()); - } + batcherStats.recordBatchElementsCompletion(results); descriptor.splitResponse(response, results); } catch (Exception ex) { onBatchFailure(ex); diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java b/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java new file mode 100644 index 000000000..da1d9e71f --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java @@ -0,0 +1,194 @@ +/* + * Copyright 2019 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.StatusCode; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; + +/** + * This class keeps the statistics about failed operations(both at RPC and ElementT level) in {@link + * Batcher}. This provides the count of individual exception failure and count of each failed {@link + * StatusCode.Code} occurred in the batching process. + */ +class BatcherStats { + + private final Map requestExceptionCounts = new ConcurrentHashMap<>(); + private final Map requestStatusCounts = new ConcurrentHashMap<>(); + private final AtomicInteger partialBatchFailures = new AtomicInteger(0); + private final Map entryExceptionCounts = new ConcurrentHashMap<>(); + private final Map entryStatusCounts = new ConcurrentHashMap<>(); + + private final Object lock = new Object(); + + synchronized void recordBatchFailure(Throwable throwable) { + Class exceptionClass = throwable.getClass(); + + synchronized (lock) { + if (throwable instanceof ApiException) { + StatusCode.Code code = ((ApiException) throwable).getStatusCode().getCode(); + exceptionClass = ApiException.class; + + if (requestStatusCounts.containsKey(code)) { + Integer codeCount = requestStatusCounts.get(code); + requestStatusCounts.put(code, ++codeCount); + + } else { + requestStatusCounts.put(code, 1); + } + } + + if (requestExceptionCounts.containsKey(exceptionClass)) { + Integer exCount = requestExceptionCounts.get(exceptionClass); + requestExceptionCounts.put(exceptionClass, ++exCount); + } else { + requestExceptionCounts.put(exceptionClass, 1); + } + } + } + + void recordBatchElementsCompletion(List> batchElementResultFutures) { + final AtomicBoolean markBatchFailure = new AtomicBoolean(); + + for (ApiFuture elementResult : batchElementResultFutures) { + ApiFutures.addCallback( + elementResult, + new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + if (markBatchFailure.compareAndSet(false, true)) { + partialBatchFailures.incrementAndGet(); + } + Class exceptionClass = throwable.getClass(); + + synchronized (lock) { + if (throwable instanceof ApiException) { + StatusCode.Code code = ((ApiException) throwable).getStatusCode().getCode(); + exceptionClass = ApiException.class; + + if (entryStatusCounts.containsKey(code)) { + Integer statusCount = entryStatusCounts.get(code); + entryStatusCounts.put(code, ++statusCount); + } else { + entryStatusCounts.put(code, 1); + } + } + + if (entryExceptionCounts.containsKey(exceptionClass)) { + Integer exCount = entryExceptionCounts.get(exceptionClass); + entryExceptionCounts.put(exceptionClass, ++exCount); + } else { + entryExceptionCounts.put(exceptionClass, 1); + } + } + } + + @Override + public void onSuccess(T result) {} + }, + directExecutor()); + } + } + + /** Calculates and formats the message with request and entry failure count. */ + @Nullable + BatchingException asException() { + synchronized (lock) { + int partialFailures = partialBatchFailures.get(); + if (requestExceptionCounts.isEmpty() && partialFailures == 0) { + return null; + } + + StringBuilder sb = new StringBuilder(); + int batchFailures = requestExceptionCounts.size(); + + if (requestExceptionCounts.isEmpty()) { + sb.append("Batching finished with "); + } else { + sb.append(String.format("%d batches failed to apply due to: ", batchFailures)); + + for (Class request : requestExceptionCounts.keySet()) { + sb.append( + String.format( + "%d %s ", requestExceptionCounts.get(request), request.getSimpleName())); + if (request.equals(ApiException.class)) { + + sb.append("("); + for (StatusCode.Code statusCode : requestStatusCounts.keySet()) { + sb.append(String.format("%d %s ", requestStatusCounts.get(statusCode), statusCode)); + } + sb.append(") "); + } + } + if (partialFailures > 0) { + sb.append("and "); + } + } + + sb.append(String.format("%d partial failures.", partialFailures)); + if (partialFailures > 0) { + int totalEntriesFailureCount = 0; + for (Integer count : entryExceptionCounts.values()) { + totalEntriesFailureCount += count; + } + + sb.append( + String.format( + " The %d partial failures contained %d entries that failed with: ", + partialFailures, totalEntriesFailureCount)); + + for (Class entry : entryExceptionCounts.keySet()) { + sb.append( + String.format("%d %s ", entryExceptionCounts.get(entry), entry.getSimpleName())); + if (entry.equals(ApiException.class)) { + sb.append("("); + for (StatusCode.Code code : entryStatusCounts.keySet()) { + sb.append(String.format("%d %s ", entryStatusCounts.get(code), code)); + } + sb.append(") "); + } + } + sb.append("."); + } + return new BatchingException(sb.toString()); + } + } +} diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 9d9bfaa26..99e15f3f3 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -205,6 +205,7 @@ public ApiFuture> futureCall( underTest.add(6); underTest.add(7); underTest.flush(); + assertThat(failedResult.isDone()).isTrue(); Throwable actualError = null; try { @@ -487,8 +488,9 @@ public void splitResponse( underTest.add(50); underTest.flush(); - // This will cause partial failure + // These will cause partial failure underTest.add(500); + underTest.add(600); Exception actualError = null; try { underTest.close(); @@ -496,9 +498,10 @@ public void splitResponse( actualError = e; } assertThat(actualError).isInstanceOf(BatchingException.class); - assertThat(actualError.getMessage()).doesNotContain("batches failed to apply due"); - assertThat(actualError.getMessage()).contains("Batching finished with 1 partial failures."); - assertThat(actualError.getMessage()).contains("2 ArithmeticException"); + assertThat(actualError.getMessage()).contains("Batching finished with 2 partial failures."); + assertThat(actualError.getMessage()) + .contains( + "The 2 partial failures contained 3 entries that failed with: 3 ArithmeticException ."); } @Test diff --git a/gax/src/test/java/com/google/api/gax/batching/BatchStatsTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherStatsTest.java similarity index 61% rename from gax/src/test/java/com/google/api/gax/batching/BatchStatsTest.java rename to gax/src/test/java/com/google/api/gax/batching/BatcherStatsTest.java index c81d58798..78abe78d9 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatchStatsTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherStatsTest.java @@ -30,44 +30,35 @@ package com.google.api.gax.batching; import static com.google.common.truth.Truth.assertThat; -import static com.google.common.util.concurrent.MoreExecutors.directExecutor; -import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.ApiExceptionFactory; import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.testing.FakeStatusCode; +import com.google.common.collect.ImmutableList; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class BatchStatsTest { +public class BatcherStatsTest { @Test public void testWhenNoException() { - BatchStats batchStats = new BatchStats(); - assertThat(batchStats.asException()).isNull(); + BatcherStats batcherStats = new BatcherStats(); + assertThat(batcherStats.asException()).isNull(); } @Test public void testRequestFailuresOnly() { - BatchStats batchStats = new BatchStats(); + BatcherStats batcherStats = new BatcherStats(); - ApiFutures.addCallback( - ApiFutures.immediateFailedFuture( - ApiExceptionFactory.createException( - new RuntimeException(), - FakeStatusCode.of(StatusCode.Code.INVALID_ARGUMENT), - false)), - batchStats.getRequestCallback(), - directExecutor()); + batcherStats.recordBatchFailure( + ApiExceptionFactory.createException( + new RuntimeException(), FakeStatusCode.of(StatusCode.Code.INVALID_ARGUMENT), false)); + batcherStats.recordBatchFailure(new RuntimeException("Request failed")); - ApiFutures.addCallback( - ApiFutures.immediateFailedFuture(new RuntimeException("Request failed")), - batchStats.getRequestCallback(), - directExecutor()); - - BatchingException exception = batchStats.asException(); + BatchingException exception = batcherStats.asException(); assertThat(exception).isNotNull(); assertThat(exception.getMessage()).contains("2 batches failed to apply"); assertThat(exception.getMessage()).contains("1 RuntimeException"); @@ -76,28 +67,27 @@ public void testRequestFailuresOnly() { @Test public void testRequestAndEntryFailures() { - BatchStats batchStats = new BatchStats(); - ApiFutures.addCallback( - ApiFutures.immediateFailedFuture(new RuntimeException("Request failed")), - batchStats.getRequestCallback(), - directExecutor()); + BatcherStats batcherStats = new BatcherStats(); + batcherStats.recordBatchFailure(new RuntimeException("Request failed")); + + SettableApiFuture runTimeFail = SettableApiFuture.create(); + runTimeFail.setException(new IllegalStateException()); + batcherStats.recordBatchElementsCompletion(ImmutableList.of(runTimeFail)); - ApiFutures.addCallback( - ApiFutures.immediateFailedFuture(new NullPointerException()), - batchStats.getEntryCallback(), - directExecutor()); + SettableApiFuture apiExceptionFuture = SettableApiFuture.create(); + SettableApiFuture npeFuture = SettableApiFuture.create(); + npeFuture.setException(new NullPointerException()); + apiExceptionFuture.setException( + ApiExceptionFactory.createException( + new RuntimeException(), FakeStatusCode.of(StatusCode.Code.UNAVAILABLE), false)); - ApiFutures.addCallback( - ApiFutures.immediateFailedFuture( - ApiExceptionFactory.createException( - new RuntimeException(), FakeStatusCode.of(StatusCode.Code.UNAVAILABLE), false)), - batchStats.getEntryCallback(), - directExecutor()); + batcherStats.recordBatchElementsCompletion(ImmutableList.of(npeFuture, apiExceptionFuture)); - BatchingException ex = batchStats.asException(); + BatchingException ex = batcherStats.asException(); assertThat(ex).isNotNull(); assertThat(ex.getMessage()) - .contains("1 batches failed to apply due to: 1 RuntimeException 2 partial failures."); + .contains("1 batches failed to apply due to: 1 RuntimeException and 2 partial failures."); + assertThat(ex.getMessage()).contains("1 IllegalStateException"); assertThat(ex.getMessage()).contains("1 NullPointerException"); assertThat(ex.getMessage()).contains("1 ApiException (1 UNAVAILABLE )"); } From e23f6b76e9d999e61b3a0e79667a4588fc13da13 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Thu, 24 Oct 2019 01:51:38 +0530 Subject: [PATCH 07/11] Updated BatcherStats to address feedback comments --- .../google/api/gax/batching/BatcherImpl.java | 3 +- .../google/api/gax/batching/BatcherStats.java | 197 +++++++++--------- .../api/gax/batching/BatchingException.java | 4 +- .../api/gax/batching/BatcherImplTest.java | 25 ++- .../api/gax/batching/BatcherStatsTest.java | 63 ++++-- 5 files changed, 160 insertions(+), 132 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 9eed37835..16710bc14 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -177,7 +177,6 @@ public void onSuccess(ResponseT response) { @Override public void onFailure(Throwable throwable) { try { - batcherStats.recordBatchFailure(throwable); accumulatedBatch.onBatchFailure(throwable); } finally { onBatchCompletion(); @@ -257,6 +256,7 @@ void add(ElementT element, SettableApiFuture result) { byteCounter += descriptor.countBytes(element); } + // TODO: Update exception in splitResponse(), So that it does not marks complete batch failed. void onBatchSuccess(ResponseT response) { try { batcherStats.recordBatchElementsCompletion(results); @@ -268,6 +268,7 @@ void onBatchSuccess(ResponseT response) { void onBatchFailure(Throwable throwable) { try { + batcherStats.recordBatchFailure(throwable); descriptor.splitException(throwable, results); } catch (Exception ex) { for (SettableApiFuture result : results) { diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java b/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java index da1d9e71f..636d502ea 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java @@ -34,95 +34,94 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; -import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.ApiException; -import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.StatusCode.Code; +import com.google.common.base.MoreObjects; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; /** - * This class keeps the statistics about failed operations(both at RPC and ElementT level) in {@link - * Batcher}. This provides the count of individual exception failure and count of each failed {@link - * StatusCode.Code} occurred in the batching process. + * Keeps the statistics about failed operations(both at RPC and ElementT) in {@link Batcher}. This + * provides the count of individual exception failure and count of each failed {@link Code} occurred + * in the batching process. */ class BatcherStats { - private final Map requestExceptionCounts = new ConcurrentHashMap<>(); - private final Map requestStatusCounts = new ConcurrentHashMap<>(); - private final AtomicInteger partialBatchFailures = new AtomicInteger(0); - private final Map entryExceptionCounts = new ConcurrentHashMap<>(); - private final Map entryStatusCounts = new ConcurrentHashMap<>(); - + private final Map requestExceptionCounts = new HashMap<>(); + private final Map requestStatusCounts = new HashMap<>(); + private final Map entryExceptionCounts = new HashMap<>(); + private final Map entryStatusCounts = new HashMap<>(); private final Object lock = new Object(); - - synchronized void recordBatchFailure(Throwable throwable) { + private int partialBatchFailures; + + /** + * Records the count of the exception and it's type when complete batch is failed to apply. + * + *

Note: This method aggregates all the subclasses of {@link ApiException} under ApiException + * using the {@link Code status codes} and its number of occurrences. + */ + void recordBatchFailure(Throwable throwable) { Class exceptionClass = throwable.getClass(); synchronized (lock) { if (throwable instanceof ApiException) { - StatusCode.Code code = ((ApiException) throwable).getStatusCode().getCode(); + Code code = ((ApiException) throwable).getStatusCode().getCode(); exceptionClass = ApiException.class; - if (requestStatusCounts.containsKey(code)) { - Integer codeCount = requestStatusCounts.get(code); - requestStatusCounts.put(code, ++codeCount); - - } else { - requestStatusCounts.put(code, 1); - } + int oldCount = MoreObjects.firstNonNull(requestStatusCounts.get(code), 0); + requestStatusCounts.put(code, oldCount + 1); } - if (requestExceptionCounts.containsKey(exceptionClass)) { - Integer exCount = requestExceptionCounts.get(exceptionClass); - requestExceptionCounts.put(exceptionClass, ++exCount); - } else { - requestExceptionCounts.put(exceptionClass, 1); - } + int oldExCount = MoreObjects.firstNonNull(requestExceptionCounts.get(exceptionClass), 0); + requestExceptionCounts.put(exceptionClass, oldExCount + 1); } } - void recordBatchElementsCompletion(List> batchElementResultFutures) { - final AtomicBoolean markBatchFailure = new AtomicBoolean(); + /** + * Records partial failure occurred within per batch. For any exception within a batch, the {@link + * #partialBatchFailures} is incremented once. It also keeps the records of the count and type of + * each entry failure as well. + * + *

Note: This method aggregates all the subclasses of {@link ApiException} under ApiException + * using the {@link Code status codes} and its number of occurrences. + */ + void recordBatchElementsCompletion(List batchElementResultFutures) { + final AtomicBoolean elementResultFailed = new AtomicBoolean(); + for (final ApiFuture elementResult : batchElementResultFutures) { - for (ApiFuture elementResult : batchElementResultFutures) { ApiFutures.addCallback( elementResult, - new ApiFutureCallback() { + new ApiFutureCallback() { + @Override public void onFailure(Throwable throwable) { - if (markBatchFailure.compareAndSet(false, true)) { - partialBatchFailures.incrementAndGet(); - } - Class exceptionClass = throwable.getClass(); - synchronized (lock) { + if (elementResultFailed.compareAndSet(false, true)) { + partialBatchFailures++; + } + + Class exceptionClass = throwable.getClass(); + if (throwable instanceof ApiException) { - StatusCode.Code code = ((ApiException) throwable).getStatusCode().getCode(); + Code code = ((ApiException) throwable).getStatusCode().getCode(); exceptionClass = ApiException.class; - if (entryStatusCounts.containsKey(code)) { - Integer statusCount = entryStatusCounts.get(code); - entryStatusCounts.put(code, ++statusCount); - } else { - entryStatusCounts.put(code, 1); - } + int statusOldCount = MoreObjects.firstNonNull(entryStatusCounts.get(code), 0); + entryStatusCounts.put(code, statusOldCount + 1); } - if (entryExceptionCounts.containsKey(exceptionClass)) { - Integer exCount = entryExceptionCounts.get(exceptionClass); - entryExceptionCounts.put(exceptionClass, ++exCount); - } else { - entryExceptionCounts.put(exceptionClass, 1); - } + int oldCount = + MoreObjects.firstNonNull(entryExceptionCounts.get(exceptionClass), 0); + entryExceptionCounts.put(exceptionClass, oldCount + 1); } } @Override - public void onSuccess(T result) {} + public void onSuccess(Object result) {} }, directExecutor()); } @@ -132,63 +131,69 @@ public void onSuccess(T result) {} @Nullable BatchingException asException() { synchronized (lock) { - int partialFailures = partialBatchFailures.get(); - if (requestExceptionCounts.isEmpty() && partialFailures == 0) { + if (requestExceptionCounts.isEmpty() && partialBatchFailures == 0) { return null; } StringBuilder sb = new StringBuilder(); - int batchFailures = requestExceptionCounts.size(); - - if (requestExceptionCounts.isEmpty()) { - sb.append("Batching finished with "); - } else { - sb.append(String.format("%d batches failed to apply due to: ", batchFailures)); - - for (Class request : requestExceptionCounts.keySet()) { - sb.append( - String.format( - "%d %s ", requestExceptionCounts.get(request), request.getSimpleName())); - if (request.equals(ApiException.class)) { - - sb.append("("); - for (StatusCode.Code statusCode : requestStatusCounts.keySet()) { - sb.append(String.format("%d %s ", requestStatusCounts.get(statusCode), statusCode)); - } - sb.append(") "); - } - } - if (partialFailures > 0) { - sb.append("and "); - } + sb.append("Batching finished with "); + + if (!requestExceptionCounts.isEmpty()) { + sb.append( + String.format("%d batches failed to apply due to: ", requestExceptionCounts.size())) + .append(printKeyValue(requestExceptionCounts, requestStatusCounts)) + .append(" and "); } - sb.append(String.format("%d partial failures.", partialFailures)); - if (partialFailures > 0) { - int totalEntriesFailureCount = 0; + sb.append(String.format("%d partial failures.", partialBatchFailures)); + if (partialBatchFailures > 0) { + int totalEntriesCount = 0; for (Integer count : entryExceptionCounts.values()) { - totalEntriesFailureCount += count; + totalEntriesCount += count; } sb.append( - String.format( - " The %d partial failures contained %d entries that failed with: ", - partialFailures, totalEntriesFailureCount)); - - for (Class entry : entryExceptionCounts.keySet()) { - sb.append( - String.format("%d %s ", entryExceptionCounts.get(entry), entry.getSimpleName())); - if (entry.equals(ApiException.class)) { - sb.append("("); - for (StatusCode.Code code : entryStatusCounts.keySet()) { - sb.append(String.format("%d %s ", entryStatusCounts.get(code), code)); - } - sb.append(") "); + String.format( + " The %d partial failures contained %d entries that failed with: ", + partialBatchFailures, totalEntriesCount)) + .append(printKeyValue(entryExceptionCounts, entryStatusCounts)) + .append("."); + } + return new BatchingException(sb.toString()); + } + } + + /** + * Prints the class name and it's count along with {@link Code status code} and it's counts. + * + *

Example: "1 IllegalStateException, 1 ApiException(1 UNAVAILABLE, 1 ALREADY_EXISTS)". + */ + private String printKeyValue( + Map exceptionCounts, Map statusCounts) { + StringBuilder keyValue = new StringBuilder(); + Iterator> iterator = exceptionCounts.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry request = iterator.next(); + keyValue.append(String.format("%d %s", request.getValue(), request.getKey().getSimpleName())); + + if (ApiException.class.equals(request.getKey())) { + keyValue.append("("); + Iterator> statusItrator = statusCounts.entrySet().iterator(); + while (statusItrator.hasNext()) { + Map.Entry statusCode = statusItrator.next(); + keyValue.append(String.format("%d %s", statusCode.getValue(), statusCode.getKey())); + if (statusItrator.hasNext()) { + keyValue.append(", "); } } - sb.append("."); + keyValue.append(")"); + } + if (iterator.hasNext()) { + keyValue.append(", "); } - return new BatchingException(sb.toString()); } + + return keyValue.toString(); } } diff --git a/gax/src/main/java/com/google/api/gax/batching/BatchingException.java b/gax/src/main/java/com/google/api/gax/batching/BatchingException.java index d3a535add..62cd3b028 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatchingException.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatchingException.java @@ -30,12 +30,10 @@ package com.google.api.gax.batching; import com.google.api.core.BetaApi; -import com.google.api.core.InternalExtensionOnly; /** Represents exception occurred during batching. */ @BetaApi("The surface for batching is not stable yet and may change in the future.") -@InternalExtensionOnly("For google-cloud-java client use only.") -public class BatchingException extends RuntimeException { +public final class BatchingException extends RuntimeException { BatchingException(String message) { super(message); diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 99e15f3f3..f0bbe29ae 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -205,7 +205,6 @@ public ApiFuture> futureCall( underTest.add(6); underTest.add(7); underTest.flush(); - assertThat(failedResult.isDone()).isTrue(); Throwable actualError = null; try { @@ -223,7 +222,8 @@ public ApiFuture> futureCall( assertThat(actualError).isNotNull(); assertThat(actualError).isInstanceOf(BatchingException.class); - assertThat(actualError.getMessage()) + assertThat(actualError) + .hasMessageThat() .contains("1 batches failed to apply due to: 1 RuntimeException"); } @@ -259,7 +259,10 @@ public void splitResponse( actualError = batchingEx; } assertThat(actualError).isInstanceOf(BatchingException.class); - assertThat(actualError.getMessage()).contains("Batching finished with 1 partial failures."); + assertThat(actualError) + .hasMessageThat() + .contains( + "Batching finished with 1 batches failed to apply due to: 1 RuntimeException and 1 partial failures."); } /** Resolves future results when {@link BatchingDescriptor#splitException} throws exception */ @@ -498,10 +501,11 @@ public void splitResponse( actualError = e; } assertThat(actualError).isInstanceOf(BatchingException.class); - assertThat(actualError.getMessage()).contains("Batching finished with 2 partial failures."); - assertThat(actualError.getMessage()) + assertThat(actualError) + .hasMessageThat() .contains( - "The 2 partial failures contained 3 entries that failed with: 3 ArithmeticException ."); + "Batching finished with 2 partial failures. The 2 partial failures contained " + + "3 entries that failed with: 3 ArithmeticException."); } @Test @@ -545,11 +549,12 @@ public void splitResponse( actualError = e; } assertThat(actualError).isInstanceOf(BatchingException.class); - assertThat(actualError.getMessage()) + assertThat(actualError) + .hasMessageThat() .contains("The 3 partial failures contained 6 entries that failed with:"); - assertThat(actualError.getMessage()).contains("1 NullPointerException"); - assertThat(actualError.getMessage()).contains("2 RuntimeException"); - assertThat(actualError.getMessage()).contains("3 ArithmeticException"); + assertThat(actualError).hasMessageThat().contains("1 NullPointerException"); + assertThat(actualError).hasMessageThat().contains("2 RuntimeException"); + assertThat(actualError).hasMessageThat().contains("3 ArithmeticException"); } /** diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherStatsTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherStatsTest.java index 78abe78d9..9a4612d2f 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherStatsTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherStatsTest.java @@ -31,7 +31,7 @@ import static com.google.common.truth.Truth.assertThat; -import com.google.api.core.SettableApiFuture; +import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.ApiExceptionFactory; import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.testing.FakeStatusCode; @@ -56,39 +56,58 @@ public void testRequestFailuresOnly() { batcherStats.recordBatchFailure( ApiExceptionFactory.createException( new RuntimeException(), FakeStatusCode.of(StatusCode.Code.INVALID_ARGUMENT), false)); + batcherStats.recordBatchFailure(new RuntimeException("Request failed")); BatchingException exception = batcherStats.asException(); assertThat(exception).isNotNull(); - assertThat(exception.getMessage()).contains("2 batches failed to apply"); - assertThat(exception.getMessage()).contains("1 RuntimeException"); - assertThat(exception.getMessage()).contains("1 ApiException (1 INVALID_ARGUMENT ) "); + assertThat(exception).hasMessageThat().contains("2 batches failed to apply"); + assertThat(exception).hasMessageThat().contains("1 RuntimeException"); + assertThat(exception).hasMessageThat().contains("1 ApiException(1 INVALID_ARGUMENT)"); + assertThat(exception).hasMessageThat().contains("and 0 partial failures."); } @Test - public void testRequestAndEntryFailures() { + public void testEntryFailureOnly() { BatcherStats batcherStats = new BatcherStats(); - batcherStats.recordBatchFailure(new RuntimeException("Request failed")); + batcherStats.recordBatchElementsCompletion( + ImmutableList.of( + ApiFutures.immediateFailedFuture(new IllegalStateException("local element failure")))); - SettableApiFuture runTimeFail = SettableApiFuture.create(); - runTimeFail.setException(new IllegalStateException()); - batcherStats.recordBatchElementsCompletion(ImmutableList.of(runTimeFail)); + batcherStats.recordBatchElementsCompletion( + ImmutableList.of( + ApiFutures.immediateFailedFuture( + ApiExceptionFactory.createException( + new RuntimeException(), + FakeStatusCode.of(StatusCode.Code.UNAVAILABLE), + false)))); + BatchingException ex = batcherStats.asException(); + assertThat(ex) + .hasMessageThat() + .contains("The 2 partial failures contained 2 entries that failed with:"); + assertThat(ex).hasMessageThat().contains("1 ApiException(1 UNAVAILABLE)"); + assertThat(ex).hasMessageThat().contains("1 IllegalStateException"); + } - SettableApiFuture apiExceptionFuture = SettableApiFuture.create(); - SettableApiFuture npeFuture = SettableApiFuture.create(); - npeFuture.setException(new NullPointerException()); - apiExceptionFuture.setException( - ApiExceptionFactory.createException( - new RuntimeException(), FakeStatusCode.of(StatusCode.Code.UNAVAILABLE), false)); + @Test + public void testRequestAndEntryFailures() { + BatcherStats batcherStats = new BatcherStats(); - batcherStats.recordBatchElementsCompletion(ImmutableList.of(npeFuture, apiExceptionFuture)); + batcherStats.recordBatchFailure(new RuntimeException("Batch failure")); + batcherStats.recordBatchElementsCompletion( + ImmutableList.of( + ApiFutures.immediateFailedFuture( + ApiExceptionFactory.createException( + new RuntimeException(), + FakeStatusCode.of(StatusCode.Code.ALREADY_EXISTS), + false)))); BatchingException ex = batcherStats.asException(); - assertThat(ex).isNotNull(); - assertThat(ex.getMessage()) - .contains("1 batches failed to apply due to: 1 RuntimeException and 2 partial failures."); - assertThat(ex.getMessage()).contains("1 IllegalStateException"); - assertThat(ex.getMessage()).contains("1 NullPointerException"); - assertThat(ex.getMessage()).contains("1 ApiException (1 UNAVAILABLE )"); + assertThat(ex) + .hasMessageThat() + .contains( + "Batching finished with 1 batches failed to apply due to: 1 RuntimeException and 1 " + + "partial failures. The 1 partial failures contained 1 entries that failed with:" + + " 1 ApiException(1 ALREADY_EXISTS)."); } } From de6a96fe4f4e6cabbb6a96ce29ce9c52355b2860 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Thu, 24 Oct 2019 21:48:11 +0530 Subject: [PATCH 08/11] Address more feedback comments - Refactored record methods in BatcherImpl.Batch. - Removed callback and implemented try/catch for entry failures. - Made all three default method synchronized and removed lock as all the content of method needed to be inside lock. - Fixed test case and added a todo for followUp PR. --- .../google/api/gax/batching/BatcherImpl.java | 4 +- .../google/api/gax/batching/BatcherStats.java | 146 ++++++++---------- .../api/gax/batching/BatcherImplTest.java | 11 +- 3 files changed, 73 insertions(+), 88 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 16710bc14..64a9a1404 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -259,8 +259,8 @@ void add(ElementT element, SettableApiFuture result) { // TODO: Update exception in splitResponse(), So that it does not marks complete batch failed. void onBatchSuccess(ResponseT response) { try { - batcherStats.recordBatchElementsCompletion(results); descriptor.splitResponse(response, results); + batcherStats.recordBatchElementsCompletion(results); } catch (Exception ex) { onBatchFailure(ex); } @@ -268,13 +268,13 @@ void onBatchSuccess(ResponseT response) { void onBatchFailure(Throwable throwable) { try { - batcherStats.recordBatchFailure(throwable); descriptor.splitException(throwable, results); } catch (Exception ex) { for (SettableApiFuture result : results) { result.setException(ex); } } + batcherStats.recordBatchFailure(throwable); } boolean isEmpty() { diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java b/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java index 636d502ea..7ccc3818d 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java @@ -29,11 +29,7 @@ */ package com.google.api.gax.batching; -import static com.google.common.util.concurrent.MoreExecutors.directExecutor; - import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutureCallback; -import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode.Code; import com.google.common.base.MoreObjects; @@ -41,7 +37,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; /** @@ -53,114 +48,101 @@ class BatcherStats { private final Map requestExceptionCounts = new HashMap<>(); private final Map requestStatusCounts = new HashMap<>(); + private int requestPartialFailureCount; + private final Map entryExceptionCounts = new HashMap<>(); private final Map entryStatusCounts = new HashMap<>(); - private final Object lock = new Object(); - private int partialBatchFailures; /** - * Records the count of the exception and it's type when complete batch is failed to apply. + * Records the count of the exception and it's type when a complete batch failed to apply. * *

Note: This method aggregates all the subclasses of {@link ApiException} under ApiException * using the {@link Code status codes} and its number of occurrences. */ - void recordBatchFailure(Throwable throwable) { + synchronized void recordBatchFailure(Throwable throwable) { Class exceptionClass = throwable.getClass(); - synchronized (lock) { - if (throwable instanceof ApiException) { - Code code = ((ApiException) throwable).getStatusCode().getCode(); - exceptionClass = ApiException.class; + if (throwable instanceof ApiException) { + Code code = ((ApiException) throwable).getStatusCode().getCode(); + exceptionClass = ApiException.class; - int oldCount = MoreObjects.firstNonNull(requestStatusCounts.get(code), 0); - requestStatusCounts.put(code, oldCount + 1); - } - - int oldExCount = MoreObjects.firstNonNull(requestExceptionCounts.get(exceptionClass), 0); - requestExceptionCounts.put(exceptionClass, oldExCount + 1); + int oldStatusCount = MoreObjects.firstNonNull(requestStatusCounts.get(code), 0); + requestStatusCounts.put(code, oldStatusCount + 1); } + + int oldExceptionCount = MoreObjects.firstNonNull(requestExceptionCounts.get(exceptionClass), 0); + requestExceptionCounts.put(exceptionClass, oldExceptionCount + 1); } /** - * Records partial failure occurred within per batch. For any exception within a batch, the {@link - * #partialBatchFailures} is incremented once. It also keeps the records of the count and type of - * each entry failure as well. + * Records partial failures within each batch. partialBatchFailures counts the number of batches + * that have at least one failed entry while entryStatusCounts and entryExceptionCounts track the + * count of the entries themselves. * *

Note: This method aggregates all the subclasses of {@link ApiException} under ApiException * using the {@link Code status codes} and its number of occurrences. */ - void recordBatchElementsCompletion(List batchElementResultFutures) { - final AtomicBoolean elementResultFailed = new AtomicBoolean(); + synchronized void recordBatchElementsCompletion( + List batchElementResultFutures) { + boolean isRequestPartiallyFailed = false; for (final ApiFuture elementResult : batchElementResultFutures) { + try { + elementResult.get(); + } catch (Throwable throwable) { + + if (!isRequestPartiallyFailed) { + isRequestPartiallyFailed = true; + requestPartialFailureCount++; + } + Throwable actualCause = throwable.getCause(); + Class exceptionClass = actualCause.getClass(); + + if (actualCause instanceof ApiException) { + Code code = ((ApiException) actualCause).getStatusCode().getCode(); + exceptionClass = ApiException.class; + + int oldExceptionCount = MoreObjects.firstNonNull(entryStatusCounts.get(code), 0); + entryStatusCounts.put(code, oldExceptionCount + 1); + } - ApiFutures.addCallback( - elementResult, - new ApiFutureCallback() { - - @Override - public void onFailure(Throwable throwable) { - synchronized (lock) { - if (elementResultFailed.compareAndSet(false, true)) { - partialBatchFailures++; - } - - Class exceptionClass = throwable.getClass(); - - if (throwable instanceof ApiException) { - Code code = ((ApiException) throwable).getStatusCode().getCode(); - exceptionClass = ApiException.class; - - int statusOldCount = MoreObjects.firstNonNull(entryStatusCounts.get(code), 0); - entryStatusCounts.put(code, statusOldCount + 1); - } - - int oldCount = - MoreObjects.firstNonNull(entryExceptionCounts.get(exceptionClass), 0); - entryExceptionCounts.put(exceptionClass, oldCount + 1); - } - } - - @Override - public void onSuccess(Object result) {} - }, - directExecutor()); + int oldExceptionCount = + MoreObjects.firstNonNull(entryExceptionCounts.get(exceptionClass), 0); + entryExceptionCounts.put(exceptionClass, oldExceptionCount + 1); + } } } /** Calculates and formats the message with request and entry failure count. */ @Nullable - BatchingException asException() { - synchronized (lock) { - if (requestExceptionCounts.isEmpty() && partialBatchFailures == 0) { - return null; - } + synchronized BatchingException asException() { + if (requestExceptionCounts.isEmpty() && requestPartialFailureCount == 0) { + return null; + } - StringBuilder sb = new StringBuilder(); - sb.append("Batching finished with "); + StringBuilder sb = new StringBuilder(); + sb.append("Batching finished with "); - if (!requestExceptionCounts.isEmpty()) { - sb.append( - String.format("%d batches failed to apply due to: ", requestExceptionCounts.size())) - .append(printKeyValue(requestExceptionCounts, requestStatusCounts)) - .append(" and "); - } - - sb.append(String.format("%d partial failures.", partialBatchFailures)); - if (partialBatchFailures > 0) { - int totalEntriesCount = 0; - for (Integer count : entryExceptionCounts.values()) { - totalEntriesCount += count; - } + if (!requestExceptionCounts.isEmpty()) { + sb.append(String.format("%d batches failed to apply due to: ", requestExceptionCounts.size())) + .append(printKeyValue(requestExceptionCounts, requestStatusCounts)) + .append(" and "); + } - sb.append( - String.format( - " The %d partial failures contained %d entries that failed with: ", - partialBatchFailures, totalEntriesCount)) - .append(printKeyValue(entryExceptionCounts, entryStatusCounts)) - .append("."); + sb.append(String.format("%d partial failures.", requestPartialFailureCount)); + if (requestPartialFailureCount > 0) { + int totalEntriesCount = 0; + for (Integer count : entryExceptionCounts.values()) { + totalEntriesCount += count; } - return new BatchingException(sb.toString()); + + sb.append( + String.format( + " The %d partial failures contained %d entries that failed with: ", + requestPartialFailureCount, totalEntriesCount)) + .append(printKeyValue(entryExceptionCounts, entryStatusCounts)) + .append("."); } + return new BatchingException(sb.toString()); } /** diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index f0bbe29ae..7f5787119 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -262,7 +262,8 @@ public void splitResponse( assertThat(actualError) .hasMessageThat() .contains( - "Batching finished with 1 batches failed to apply due to: 1 RuntimeException and 1 partial failures."); + "Batching finished with 1 batches failed to apply due to: 1 RuntimeException and 0 " + + "partial failures."); } /** Resolves future results when {@link BatchingDescriptor#splitException} throws exception */ @@ -508,6 +509,7 @@ public void splitResponse( + "3 entries that failed with: 3 ArithmeticException."); } + // TODO(rahulkql): fix this test with follow up PR related to exception in splitResponse. @Test public void testPartialFailureInResultProcessing() throws Exception { final Queue queue = Queues.newArrayBlockingQueue(3); @@ -551,10 +553,11 @@ public void splitResponse( assertThat(actualError).isInstanceOf(BatchingException.class); assertThat(actualError) .hasMessageThat() - .contains("The 3 partial failures contained 6 entries that failed with:"); + .contains("Batching finished with 3 batches failed to apply due to:"); assertThat(actualError).hasMessageThat().contains("1 NullPointerException"); - assertThat(actualError).hasMessageThat().contains("2 RuntimeException"); - assertThat(actualError).hasMessageThat().contains("3 ArithmeticException"); + assertThat(actualError).hasMessageThat().contains("1 RuntimeException"); + assertThat(actualError).hasMessageThat().contains("1 ArithmeticException"); + assertThat(actualError).hasMessageThat().contains(" and 0 partial failures."); } /** From cdbf5c19330139ad7b01b2665b0f5c79bd877c3f Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Tue, 29 Oct 2019 10:15:10 +0530 Subject: [PATCH 09/11] Updated TODO for follow up PR --- gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 64a9a1404..44d8a98d7 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -256,7 +256,8 @@ void add(ElementT element, SettableApiFuture result) { byteCounter += descriptor.countBytes(element); } - // TODO: Update exception in splitResponse(), So that it does not marks complete batch failed. + // TODO: Ensure that all results are resolved in case the descriptor that causes it to + // process all results or throw an exception during processing void onBatchSuccess(ResponseT response) { try { descriptor.splitResponse(response, results); From cbc0acf9fb036cdb8eaa73b7168339a3aaa59c13 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Tue, 29 Oct 2019 20:55:30 +0530 Subject: [PATCH 10/11] fixed the name of oldStatusCount --- .../main/java/com/google/api/gax/batching/BatcherStats.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java b/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java index 7ccc3818d..7370d8e9d 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java @@ -101,8 +101,8 @@ synchronized void recordBatchElementsCompletion( Code code = ((ApiException) actualCause).getStatusCode().getCode(); exceptionClass = ApiException.class; - int oldExceptionCount = MoreObjects.firstNonNull(entryStatusCounts.get(code), 0); - entryStatusCounts.put(code, oldExceptionCount + 1); + int oldStatusCount = MoreObjects.firstNonNull(entryStatusCounts.get(code), 0); + entryStatusCounts.put(code, oldStatusCount + 1); } int oldExceptionCount = From 60705f7b742884436c1a02c4ef6937e345e61bb5 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Tue, 29 Oct 2019 23:39:22 +0530 Subject: [PATCH 11/11] Fixed variables names to be descriptive --- .../google/api/gax/batching/BatcherStats.java | 54 ++++++++++--------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java b/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java index 7370d8e9d..7765f9821 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java @@ -119,30 +119,33 @@ synchronized BatchingException asException() { return null; } - StringBuilder sb = new StringBuilder(); - sb.append("Batching finished with "); + StringBuilder messageBuilder = new StringBuilder(); + messageBuilder.append("Batching finished with "); if (!requestExceptionCounts.isEmpty()) { - sb.append(String.format("%d batches failed to apply due to: ", requestExceptionCounts.size())) - .append(printKeyValue(requestExceptionCounts, requestStatusCounts)) + messageBuilder + .append( + String.format("%d batches failed to apply due to: ", requestExceptionCounts.size())) + .append(buildExceptionList(requestExceptionCounts, requestStatusCounts)) .append(" and "); } - sb.append(String.format("%d partial failures.", requestPartialFailureCount)); + messageBuilder.append(String.format("%d partial failures.", requestPartialFailureCount)); if (requestPartialFailureCount > 0) { int totalEntriesCount = 0; for (Integer count : entryExceptionCounts.values()) { totalEntriesCount += count; } - sb.append( + messageBuilder + .append( String.format( " The %d partial failures contained %d entries that failed with: ", requestPartialFailureCount, totalEntriesCount)) - .append(printKeyValue(entryExceptionCounts, entryStatusCounts)) + .append(buildExceptionList(entryExceptionCounts, entryStatusCounts)) .append("."); } - return new BatchingException(sb.toString()); + return new BatchingException(messageBuilder.toString()); } /** @@ -150,32 +153,33 @@ synchronized BatchingException asException() { * *

Example: "1 IllegalStateException, 1 ApiException(1 UNAVAILABLE, 1 ALREADY_EXISTS)". */ - private String printKeyValue( + private String buildExceptionList( Map exceptionCounts, Map statusCounts) { - StringBuilder keyValue = new StringBuilder(); - Iterator> iterator = exceptionCounts.entrySet().iterator(); + StringBuilder messageBuilder = new StringBuilder(); + Iterator> exceptionIterator = exceptionCounts.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry request = iterator.next(); - keyValue.append(String.format("%d %s", request.getValue(), request.getKey().getSimpleName())); + while (exceptionIterator.hasNext()) { + Map.Entry request = exceptionIterator.next(); + messageBuilder.append( + String.format("%d %s", request.getValue(), request.getKey().getSimpleName())); if (ApiException.class.equals(request.getKey())) { - keyValue.append("("); - Iterator> statusItrator = statusCounts.entrySet().iterator(); - while (statusItrator.hasNext()) { - Map.Entry statusCode = statusItrator.next(); - keyValue.append(String.format("%d %s", statusCode.getValue(), statusCode.getKey())); - if (statusItrator.hasNext()) { - keyValue.append(", "); + messageBuilder.append("("); + Iterator> statusIterator = statusCounts.entrySet().iterator(); + while (statusIterator.hasNext()) { + Map.Entry statusCode = statusIterator.next(); + messageBuilder.append(String.format("%d %s", statusCode.getValue(), statusCode.getKey())); + if (statusIterator.hasNext()) { + messageBuilder.append(", "); } } - keyValue.append(")"); + messageBuilder.append(")"); } - if (iterator.hasNext()) { - keyValue.append(", "); + if (exceptionIterator.hasNext()) { + messageBuilder.append(", "); } } - return keyValue.toString(); + return messageBuilder.toString(); } }