Track the number of failures in batching when Batcher#close is called #800
Changes from 6 commits
37063ef
e30e82a
45960f1
ecfe7f0
aa5ec0b
1c9fe27
e23f6b7
de6a96f
cdbf5c1
cbc0acf
60705f7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
/* | ||
* Copyright 2019 Google LLC | ||
* | ||
* Redistribution and use in source and binary forms, with or without | ||
* modification, are permitted provided that the following conditions are | ||
* met: | ||
* | ||
* * Redistributions of source code must retain the above copyright | ||
* notice, this list of conditions and the following disclaimer. | ||
* * Redistributions in binary form must reproduce the above | ||
* copyright notice, this list of conditions and the following disclaimer | ||
* in the documentation and/or other materials provided with the | ||
* distribution. | ||
* * Neither the name of Google LLC nor the names of its | ||
* contributors may be used to endorse or promote products derived from | ||
* this software without specific prior written permission. | ||
* | ||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | ||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | ||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | ||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT | ||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | ||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT | ||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | ||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | ||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
*/ | ||
package com.google.api.gax.batching; | ||
|
||
import static com.google.common.util.concurrent.MoreExecutors.directExecutor; | ||
|
||
import com.google.api.core.ApiFuture; | ||
import com.google.api.core.ApiFutureCallback; | ||
import com.google.api.core.ApiFutures; | ||
import com.google.api.core.SettableApiFuture; | ||
import com.google.api.gax.rpc.ApiException; | ||
import com.google.api.gax.rpc.StatusCode; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import javax.annotation.Nullable; | ||
|
||
/** | ||
* This class keeps the statistics about failed operations(both at RPC and ElementT level) in {@link | ||
* Batcher}. This provides the count of individual exception failure and count of each failed {@link | ||
* StatusCode.Code} occurred in the batching process. | ||
*/ | ||
class BatcherStats { | ||
|
||
private final Map<Class, Integer> requestExceptionCounts = new ConcurrentHashMap<>(); | ||
private final Map<StatusCode.Code, Integer> requestStatusCounts = new ConcurrentHashMap<>(); | ||
private final AtomicInteger partialBatchFailures = new AtomicInteger(0); | ||
private final Map<Class, Integer> entryExceptionCounts = new ConcurrentHashMap<>(); | ||
private final Map<StatusCode.Code, Integer> entryStatusCounts = new ConcurrentHashMap<>(); | ||
rahulKQL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private final Object lock = new Object(); | ||
igorbernstein2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
synchronized void recordBatchFailure(Throwable throwable) { | ||
rahulKQL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Class exceptionClass = throwable.getClass(); | ||
|
||
synchronized (lock) { | ||
if (throwable instanceof ApiException) { | ||
StatusCode.Code code = ((ApiException) throwable).getStatusCode().getCode(); | ||
exceptionClass = ApiException.class; | ||
|
||
if (requestStatusCounts.containsKey(code)) { | ||
Integer codeCount = requestStatusCounts.get(code); | ||
requestStatusCounts.put(code, ++codeCount); | ||
|
||
} else { | ||
requestStatusCounts.put(code, 1); | ||
} | ||
rahulKQL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
if (requestExceptionCounts.containsKey(exceptionClass)) { | ||
Integer exCount = requestExceptionCounts.get(exceptionClass); | ||
requestExceptionCounts.put(exceptionClass, ++exCount); | ||
} else { | ||
requestExceptionCounts.put(exceptionClass, 1); | ||
} | ||
rahulKQL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
<T> void recordBatchElementsCompletion(List<SettableApiFuture<T>> batchElementResultFutures) { | ||
rahulKQL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
final AtomicBoolean markBatchFailure = new AtomicBoolean(); | ||
|
||
for (ApiFuture<T> elementResult : batchElementResultFutures) { | ||
ApiFutures.addCallback( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think try/catch would be easier to read: for (ApiFuture<T> elementFuture : batchElementResultFutures) {
try {
elementFuture.get();
} catch (Throwable t) {
....
}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also remove the atomic above There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I understood correctly, you meant to block and check if any entry is being failed after I hope after the last commit, this method looks better, Can you please take a look and let me know your thoughts. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All of the element results should be resolved at this point, so nothing should block. If an entry is not resolved at this point then it never will be. This would be due to a bug in the splitResponse. We can address this issue in a follow up PR that would iterate over all of the element results and sets an IllegalStateException for unresolved entries |
||
elementResult, | ||
new ApiFutureCallback<T>() { | ||
@Override | ||
public void onFailure(Throwable throwable) { | ||
if (markBatchFailure.compareAndSet(false, true)) { | ||
partialBatchFailures.incrementAndGet(); | ||
} | ||
rahulKQL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Class exceptionClass = throwable.getClass(); | ||
|
||
synchronized (lock) { | ||
if (throwable instanceof ApiException) { | ||
StatusCode.Code code = ((ApiException) throwable).getStatusCode().getCode(); | ||
exceptionClass = ApiException.class; | ||
rahulKQL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if (entryStatusCounts.containsKey(code)) { | ||
Integer statusCount = entryStatusCounts.get(code); | ||
entryStatusCounts.put(code, ++statusCount); | ||
} else { | ||
entryStatusCounts.put(code, 1); | ||
} | ||
rahulKQL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
if (entryExceptionCounts.containsKey(exceptionClass)) { | ||
Integer exCount = entryExceptionCounts.get(exceptionClass); | ||
entryExceptionCounts.put(exceptionClass, ++exCount); | ||
} else { | ||
entryExceptionCounts.put(exceptionClass, 1); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void onSuccess(T result) {} | ||
}, | ||
directExecutor()); | ||
} | ||
} | ||
|
||
/** Calculates and formats the message with request and entry failure count. */ | ||
@Nullable | ||
BatchingException asException() { | ||
synchronized (lock) { | ||
int partialFailures = partialBatchFailures.get(); | ||
if (requestExceptionCounts.isEmpty() && partialFailures == 0) { | ||
return null; | ||
} | ||
|
||
StringBuilder sb = new StringBuilder(); | ||
int batchFailures = requestExceptionCounts.size(); | ||
|
||
if (requestExceptionCounts.isEmpty()) { | ||
sb.append("Batching finished with "); | ||
rahulKQL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} else { | ||
sb.append(String.format("%d batches failed to apply due to: ", batchFailures)); | ||
|
||
for (Class request : requestExceptionCounts.keySet()) { | ||
sb.append( | ||
String.format( | ||
"%d %s ", requestExceptionCounts.get(request), request.getSimpleName())); | ||
if (request.equals(ApiException.class)) { | ||
|
||
sb.append("("); | ||
for (StatusCode.Code statusCode : requestStatusCounts.keySet()) { | ||
sb.append(String.format("%d %s ", requestStatusCounts.get(statusCode), statusCode)); | ||
} | ||
sb.append(") "); | ||
} | ||
} | ||
if (partialFailures > 0) { | ||
sb.append("and "); | ||
} | ||
} | ||
|
||
sb.append(String.format("%d partial failures.", partialFailures)); | ||
if (partialFailures > 0) { | ||
int totalEntriesFailureCount = 0; | ||
for (Integer count : entryExceptionCounts.values()) { | ||
totalEntriesFailureCount += count; | ||
} | ||
|
||
sb.append( | ||
String.format( | ||
" The %d partial failures contained %d entries that failed with: ", | ||
partialFailures, totalEntriesFailureCount)); | ||
|
||
for (Class entry : entryExceptionCounts.keySet()) { | ||
sb.append( | ||
String.format("%d %s ", entryExceptionCounts.get(entry), entry.getSimpleName())); | ||
if (entry.equals(ApiException.class)) { | ||
sb.append("("); | ||
for (StatusCode.Code code : entryStatusCounts.keySet()) { | ||
sb.append(String.format("%d %s ", entryStatusCounts.get(code), code)); | ||
} | ||
sb.append(") "); | ||
} | ||
} | ||
sb.append("."); | ||
} | ||
return new BatchingException(sb.toString()); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* Copyright 2019 Google LLC | ||
* | ||
* Redistribution and use in source and binary forms, with or without | ||
* modification, are permitted provided that the following conditions are | ||
* met: | ||
* | ||
* * Redistributions of source code must retain the above copyright | ||
* notice, this list of conditions and the following disclaimer. | ||
* * Redistributions in binary form must reproduce the above | ||
* copyright notice, this list of conditions and the following disclaimer | ||
* in the documentation and/or other materials provided with the | ||
* distribution. | ||
* * Neither the name of Google LLC nor the names of its | ||
* contributors may be used to endorse or promote products derived from | ||
* this software without specific prior written permission. | ||
* | ||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | ||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | ||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | ||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT | ||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | ||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT | ||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | ||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | ||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
*/ | ||
package com.google.api.gax.batching; | ||
|
||
import com.google.api.core.BetaApi; | ||
import com.google.api.core.InternalExtensionOnly; | ||
|
||
/** Represents an exception that occurred during batching. */ | ||
@BetaApi("The surface for batching is not stable yet and may change in the future.") | ||
@InternalExtensionOnly("For google-cloud-java client use only.") | ||
rahulKQL marked this conversation as resolved.
Show resolved
Hide resolved
|
||
public class BatchingException extends RuntimeException {

  /**
   * Creates a batching exception with the given description.
   *
   * @param message the detail message, later available through {@link #getMessage()}
   */
  BatchingException(String message) {
    super(message);
  }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we push this down into Batch to make it symmetrical with onBatchSuccess?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually tried the same approach, but if an exception occurs during post-processing in
`splitResponse()`,
we would end up with 1 batch failure and n partial failures, even though some elements in the batch could have succeeded.
gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Lines 251 to 267 in 39fb471
I thought that would be a misleading figure; that is why I kept the
`BatcherStats#recordBatchFailure`
call here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to keep the code symmetrical and then have a follow up PR that handles failures from splitResponse
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion. I updated this change to use symmetrical code and also added a
TODO
for `splitResponse()`.