diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 5ca1b0139..44d8a98d7 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -85,6 +85,7 @@ public class BatcherImpl private final Object elementLock = new Object(); private final Future scheduledFuture; private volatile boolean isClosed = false; + private final BatcherStats batcherStats = new BatcherStats(); /** * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements @@ -107,7 +108,7 @@ public BatcherImpl( this.batchingSettings = Preconditions.checkNotNull(batchingSettings, "batching setting cannot be null"); Preconditions.checkNotNull(executor, "executor cannot be null"); - currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings); + currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats); if (batchingSettings.getDelayThreshold() != null) { long delay = batchingSettings.getDelayThreshold().toMillis(); @@ -154,7 +155,7 @@ public void sendOutstanding() { return; } accumulatedBatch = currentOpenBatch; - currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings); + currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats); } final ApiFuture batchResponse = @@ -212,6 +213,10 @@ public void close() throws InterruptedException { isClosed = true; currentBatcherReference.closed = true; currentBatcherReference.clear(); + BatchingException exception = batcherStats.asException(); + if (exception != null) { + throw exception; + } } /** @@ -222,6 +227,7 @@ private static class Batch { private final BatchingRequestBuilder builder; private final List> results; private final BatchingDescriptor descriptor; + private final BatcherStats batcherStats; private final long elementThreshold; private final long bytesThreshold; @@ -231,7 +237,8 @@ private static class Batch { private Batch( RequestT prototype, BatchingDescriptor descriptor, - BatchingSettings batchingSettings) { + BatchingSettings batchingSettings, + BatcherStats batcherStats) { this.descriptor = descriptor; this.builder = descriptor.newRequestBuilder(prototype); this.results = new ArrayList<>(); @@ -239,6 +246,7 @@ private Batch( this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold; Long requestByteThreshold = batchingSettings.getRequestByteThreshold(); this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold; + this.batcherStats = batcherStats; } void add(ElementT element, SettableApiFuture result) { @@ -248,9 +256,12 @@ void add(ElementT element, SettableApiFuture result) { byteCounter += descriptor.countBytes(element); } + // TODO: Ensure that all results are resolved in case the descriptor that causes it to + // process all results or throw an exception during processing void onBatchSuccess(ResponseT response) { try { descriptor.splitResponse(response, results); + batcherStats.recordBatchElementsCompletion(results); } catch (Exception ex) { onBatchFailure(ex); } @@ -264,6 +275,7 @@ void onBatchFailure(Throwable throwable) { result.setException(ex); } } + batcherStats.recordBatchFailure(throwable); } boolean isEmpty() { diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java b/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java new file mode 100644 index 000000000..7765f9821 --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherStats.java @@ -0,0 +1,185 @@ +/* + * Copyright 2019 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.StatusCode.Code; +import com.google.common.base.MoreObjects; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +/** + * Keeps the statistics about failed operations(both at RPC and ElementT) in {@link Batcher}. This + * provides the count of individual exception failure and count of each failed {@link Code} occurred + * in the batching process. + */ +class BatcherStats { + + private final Map requestExceptionCounts = new HashMap<>(); + private final Map requestStatusCounts = new HashMap<>(); + private int requestPartialFailureCount; + + private final Map entryExceptionCounts = new HashMap<>(); + private final Map entryStatusCounts = new HashMap<>(); + + /** + * Records the count of the exception and it's type when a complete batch failed to apply. + * + *

Note: This method aggregates all the subclasses of {@link ApiException} under ApiException + * using the {@link Code status codes} and its number of occurrences. + */ + synchronized void recordBatchFailure(Throwable throwable) { + Class exceptionClass = throwable.getClass(); + + if (throwable instanceof ApiException) { + Code code = ((ApiException) throwable).getStatusCode().getCode(); + exceptionClass = ApiException.class; + + int oldStatusCount = MoreObjects.firstNonNull(requestStatusCounts.get(code), 0); + requestStatusCounts.put(code, oldStatusCount + 1); + } + + int oldExceptionCount = MoreObjects.firstNonNull(requestExceptionCounts.get(exceptionClass), 0); + requestExceptionCounts.put(exceptionClass, oldExceptionCount + 1); + } + + /** + * Records partial failures within each batch. partialBatchFailures counts the number of batches + * that have at least one failed entry while entryStatusCounts and entryExceptionCounts track the + * count of the entries themselves. + * + *

Note: This method aggregates all the subclasses of {@link ApiException} under ApiException + * using the {@link Code status codes} and its number of occurrences. + */ + synchronized void recordBatchElementsCompletion( + List batchElementResultFutures) { + boolean isRequestPartiallyFailed = false; + for (final ApiFuture elementResult : batchElementResultFutures) { + try { + elementResult.get(); + } catch (Throwable throwable) { + + if (!isRequestPartiallyFailed) { + isRequestPartiallyFailed = true; + requestPartialFailureCount++; + } + Throwable actualCause = throwable.getCause(); + Class exceptionClass = actualCause.getClass(); + + if (actualCause instanceof ApiException) { + Code code = ((ApiException) actualCause).getStatusCode().getCode(); + exceptionClass = ApiException.class; + + int oldStatusCount = MoreObjects.firstNonNull(entryStatusCounts.get(code), 0); + entryStatusCounts.put(code, oldStatusCount + 1); + } + + int oldExceptionCount = + MoreObjects.firstNonNull(entryExceptionCounts.get(exceptionClass), 0); + entryExceptionCounts.put(exceptionClass, oldExceptionCount + 1); + } + } + } + + /** Calculates and formats the message with request and entry failure count. */ + @Nullable + synchronized BatchingException asException() { + if (requestExceptionCounts.isEmpty() && requestPartialFailureCount == 0) { + return null; + } + + StringBuilder messageBuilder = new StringBuilder(); + messageBuilder.append("Batching finished with "); + + if (!requestExceptionCounts.isEmpty()) { + messageBuilder + .append( + String.format("%d batches failed to apply due to: ", requestExceptionCounts.size())) + .append(buildExceptionList(requestExceptionCounts, requestStatusCounts)) + .append(" and "); + } + + messageBuilder.append(String.format("%d partial failures.", requestPartialFailureCount)); + if (requestPartialFailureCount > 0) { + int totalEntriesCount = 0; + for (Integer count : entryExceptionCounts.values()) { + totalEntriesCount += count; + } + + messageBuilder + .append( + String.format( + " The %d partial failures contained %d entries that failed with: ", + requestPartialFailureCount, totalEntriesCount)) + .append(buildExceptionList(entryExceptionCounts, entryStatusCounts)) + .append("."); + } + return new BatchingException(messageBuilder.toString()); + } + + /** + * Prints the class name and it's count along with {@link Code status code} and it's counts. + * + *

Example: "1 IllegalStateException, 1 ApiException(1 UNAVAILABLE, 1 ALREADY_EXISTS)". + */ + private String buildExceptionList( + Map exceptionCounts, Map statusCounts) { + StringBuilder messageBuilder = new StringBuilder(); + Iterator> exceptionIterator = exceptionCounts.entrySet().iterator(); + + while (exceptionIterator.hasNext()) { + Map.Entry request = exceptionIterator.next(); + messageBuilder.append( + String.format("%d %s", request.getValue(), request.getKey().getSimpleName())); + + if (ApiException.class.equals(request.getKey())) { + messageBuilder.append("("); + Iterator> statusIterator = statusCounts.entrySet().iterator(); + while (statusIterator.hasNext()) { + Map.Entry statusCode = statusIterator.next(); + messageBuilder.append(String.format("%d %s", statusCode.getValue(), statusCode.getKey())); + if (statusIterator.hasNext()) { + messageBuilder.append(", "); + } + } + messageBuilder.append(")"); + } + if (exceptionIterator.hasNext()) { + messageBuilder.append(", "); + } + } + + return messageBuilder.toString(); + } +} diff --git a/gax/src/main/java/com/google/api/gax/batching/BatchingException.java b/gax/src/main/java/com/google/api/gax/batching/BatchingException.java new file mode 100644 index 000000000..62cd3b028 --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/batching/BatchingException.java @@ -0,0 +1,41 @@ +/* + * Copyright 2019 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import com.google.api.core.BetaApi; + +/** Represents exception occurred during batching. */ +@BetaApi("The surface for batching is not stable yet and may change in the future.") +public final class BatchingException extends RuntimeException { + + BatchingException(String message) { + super(message); + } +} diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 7a8ee2c1a..7f5787119 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -43,8 +43,10 @@ import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList; import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable; import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2; +import com.google.common.collect.Queues; import java.util.ArrayList; import java.util.List; +import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -200,6 +202,8 @@ public ApiFuture> futureCall( new BatcherImpl<>( SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR); Future failedResult = underTest.add(5); + underTest.add(6); + underTest.add(7); underTest.flush(); assertThat(failedResult.isDone()).isTrue(); Throwable actualError = null; @@ -208,8 +212,19 @@ public ApiFuture> futureCall( } catch (InterruptedException | ExecutionException ex) { actualError = ex; } - assertThat(actualError).hasCauseThat().isSameInstanceAs(fakeError); + + try { + underTest.close(); + } catch (RuntimeException e) { + actualError = e; + } + + assertThat(actualError).isNotNull(); + assertThat(actualError).isInstanceOf(BatchingException.class); + assertThat(actualError) + .hasMessageThat() + .contains("1 batches failed to apply due to: 1 RuntimeException"); } /** Resolves future results when {@link BatchingDescriptor#splitResponse} throws exception. */ @@ -238,6 +253,17 @@ public void splitResponse( } assertThat(actualError).hasCauseThat().isSameInstanceAs(fakeError); + try { + underTest.close(); + } catch (Exception batchingEx) { + actualError = batchingEx; + } + assertThat(actualError).isInstanceOf(BatchingException.class); + assertThat(actualError) + .hasMessageThat() + .contains( + "Batching finished with 1 batches failed to apply due to: 1 RuntimeException and 0 " + + "partial failures."); } /** Resolves future results when {@link BatchingDescriptor#splitException} throws exception */ @@ -271,6 +297,12 @@ public void splitException(Throwable throwable, List> } assertThat(actualError).hasCauseThat().isSameInstanceAs(fakeError); + try { + underTest.close(); + } catch (Exception ex) { + actualError = ex; + } + assertThat(actualError).isInstanceOf(BatchingException.class); } @Test @@ -430,6 +462,104 @@ public ApiFuture> futureCall( underTest.flush(); } + /** To confirm the partial failures in Batching does not mark whole batch failed */ + @Test + public void testPartialFailureWithSplitResponse() throws Exception { + SquarerBatchingDescriptorV2 descriptor = + new SquarerBatchingDescriptorV2() { + @Override + public void splitResponse( + List batchResponse, List> batch) { + for (int i = 0; i < batchResponse.size(); i++) { + if (batchResponse.get(i) > 10_000) { + batch.get(i).setException(new ArithmeticException()); + } else { + batch.get(i).set(batchResponse.get(i)); + } + } + } + }; + + underTest = + new BatcherImpl<>( + descriptor, callLabeledIntSquarer, labeledIntList, batchingSettings, EXECUTOR); + underTest.add(10); + // This will cause partial failure + underTest.add(200); + underTest.flush(); + + underTest.add(40); + underTest.add(50); + underTest.flush(); + + // These will cause partial failure + underTest.add(500); + underTest.add(600); + Exception actualError = null; + try { + underTest.close(); + } catch (Exception e) { + actualError = e; + } + assertThat(actualError).isInstanceOf(BatchingException.class); + assertThat(actualError) + .hasMessageThat() + .contains( + "Batching finished with 2 partial failures. The 2 partial failures contained " + + "3 entries that failed with: 3 ArithmeticException."); + } + + // TODO(rahulkql): fix this test with follow up PR related to exception in splitResponse. + @Test + public void testPartialFailureInResultProcessing() throws Exception { + final Queue queue = Queues.newArrayBlockingQueue(3); + queue.add(new NullPointerException()); + queue.add(new RuntimeException()); + queue.add(new ArithmeticException()); + + SquarerBatchingDescriptorV2 descriptor = + new SquarerBatchingDescriptorV2() { + + @Override + public void splitResponse( + List batchResponse, List> batch) { + throw queue.poll(); + } + }; + + underTest = + new BatcherImpl<>( + descriptor, callLabeledIntSquarer, labeledIntList, batchingSettings, EXECUTOR); + // This batch should fail with NullPointerException + underTest.add(10); + underTest.flush(); + + // This batch should fail with RuntimeException + underTest.add(20); + underTest.add(30); + underTest.flush(); + + // This batch should fail with ArithmeticException + underTest.add(40); + underTest.add(50); + underTest.add(60); + + Exception actualError = null; + try { + underTest.close(); + } catch (Exception e) { + actualError = e; + } + assertThat(actualError).isInstanceOf(BatchingException.class); + assertThat(actualError) + .hasMessageThat() + .contains("Batching finished with 3 batches failed to apply due to:"); + assertThat(actualError).hasMessageThat().contains("1 NullPointerException"); + assertThat(actualError).hasMessageThat().contains("1 RuntimeException"); + assertThat(actualError).hasMessageThat().contains("1 ArithmeticException"); + assertThat(actualError).hasMessageThat().contains(" and 0 partial failures."); + } + /** * Validates the presence of warning in case {@link BatcherImpl} is garbage collected without * being closed first. diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherStatsTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherStatsTest.java new file mode 100644 index 000000000..9a4612d2f --- /dev/null +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherStatsTest.java @@ -0,0 +1,113 @@ +/* + * Copyright 2019 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.ApiExceptionFactory; +import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.testing.FakeStatusCode; +import com.google.common.collect.ImmutableList; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class BatcherStatsTest { + + @Test + public void testWhenNoException() { + BatcherStats batcherStats = new BatcherStats(); + assertThat(batcherStats.asException()).isNull(); + } + + @Test + public void testRequestFailuresOnly() { + BatcherStats batcherStats = new BatcherStats(); + + batcherStats.recordBatchFailure( + ApiExceptionFactory.createException( + new RuntimeException(), FakeStatusCode.of(StatusCode.Code.INVALID_ARGUMENT), false)); + + batcherStats.recordBatchFailure(new RuntimeException("Request failed")); + + BatchingException exception = batcherStats.asException(); + assertThat(exception).isNotNull(); + assertThat(exception).hasMessageThat().contains("2 batches failed to apply"); + assertThat(exception).hasMessageThat().contains("1 RuntimeException"); + assertThat(exception).hasMessageThat().contains("1 ApiException(1 INVALID_ARGUMENT)"); + assertThat(exception).hasMessageThat().contains("and 0 partial failures."); + } + + @Test + public void testEntryFailureOnly() { + BatcherStats batcherStats = new BatcherStats(); + batcherStats.recordBatchElementsCompletion( + ImmutableList.of( + ApiFutures.immediateFailedFuture(new IllegalStateException("local element failure")))); + + batcherStats.recordBatchElementsCompletion( + ImmutableList.of( + ApiFutures.immediateFailedFuture( + ApiExceptionFactory.createException( + new RuntimeException(), + FakeStatusCode.of(StatusCode.Code.UNAVAILABLE), + false)))); + BatchingException ex = batcherStats.asException(); + assertThat(ex) + .hasMessageThat() + .contains("The 2 partial failures contained 2 entries that failed with:"); + assertThat(ex).hasMessageThat().contains("1 ApiException(1 UNAVAILABLE)"); + assertThat(ex).hasMessageThat().contains("1 IllegalStateException"); + } + + @Test + public void testRequestAndEntryFailures() { + BatcherStats batcherStats = new BatcherStats(); + + batcherStats.recordBatchFailure(new RuntimeException("Batch failure")); + batcherStats.recordBatchElementsCompletion( + ImmutableList.of( + ApiFutures.immediateFailedFuture( + ApiExceptionFactory.createException( + new RuntimeException(), + FakeStatusCode.of(StatusCode.Code.ALREADY_EXISTS), + false)))); + + BatchingException ex = batcherStats.asException(); + assertThat(ex) + .hasMessageThat() + .contains( + "Batching finished with 1 batches failed to apply due to: 1 RuntimeException and 1 " + + "partial failures. The 1 partial failures contained 1 entries that failed with:" + + " 1 ApiException(1 ALREADY_EXISTS)."); + } +}