Merge pull request #31151 from andyzhangdialpad/master
ElasticsearchIO: handle org.elasticsearch.client.ResponseException gracefully
egalpin committed May 15, 2024
2 parents 54db453 + 68790eb commit 3d897b2
Showing 4 changed files with 127 additions and 11 deletions.
@@ -310,4 +310,10 @@ public void testWriteWindowPreservation() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWindowPreservation();
}

@Test
public void testWriteWithClientResponseException() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithElasticClientResponseException();
}
}
ElasticsearchIOTestCommon.java
@@ -27,13 +27,15 @@
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.FAMOUS_SCIENTISTS;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.INVALID_DOCS_IDS;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.INVALID_LONG_ID;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.NUM_SCIENTISTS;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.SCRIPT_SOURCE;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.countByMatch;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.countByScientistName;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.flushAndRefreshAllIndices;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.insertTestDocuments;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.mapToInputId;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.mapToInputIdString;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestUtils.refreshIndexAndGetCurrentNumDocs;
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
import static org.apache.beam.sdk.values.TypeDescriptors.integers;
@@ -460,6 +462,46 @@ void testWriteWithErrorsReturnedAllowedErrors() throws Exception {
pipeline.run();
}

void testWriteWithElasticClientResponseException() throws Exception {
Write write =
ElasticsearchIO.write()
.withConnectionConfiguration(connectionConfiguration)
.withMaxBatchSize(numDocs + 1)
.withMaxBatchSizeBytes(
Long.MAX_VALUE) // Effectively unlimited, so all docs are flushed in one batch.
.withThrowWriteErrors(false)
.withIdFn(new ExtractValueFn("id"))
.withUseStatefulBatches(true);

List<String> data =
ElasticsearchIOTestUtils.createDocuments(
numDocs, ElasticsearchIOTestUtils.InjectionMode.INJECT_ONE_ID_TOO_LONG_DOC);

PCollectionTuple outputs = pipeline.apply(Create.of(data)).apply(write);

// The whole batch should fail and be routed to the FAILED_WRITES tag because of the one invalid doc.
PCollection<String> success =
outputs
.get(Write.SUCCESSFUL_WRITES)
.apply("Convert success to input ID", MapElements.via(mapToInputIdString));

PCollection<String> fail =
outputs
.get(Write.FAILED_WRITES)
.apply("Convert fails to input ID", MapElements.via(mapToInputIdString));

Set<String> failedIds =
IntStream.range(0, data.size() - 1).mapToObj(String::valueOf).collect(Collectors.toSet());
failedIds.add(INVALID_LONG_ID);
PAssert.that(success).empty();
PAssert.that(fail).containsInAnyOrder(failedIds);

// Verify response item contains the corresponding error message.
PAssert.that(outputs.get(Write.FAILED_WRITES))
.satisfies(responseItemJsonSubstringValidator("java.io.IOException"));
pipeline.run();
}

void testWriteWithAllowedErrors() throws Exception {
Write write =
ElasticsearchIO.write()
@@ -1349,4 +1391,14 @@ SerializableFunction<Iterable<Document>, Void> windowPreservationValidator(
return null;
};
}

SerializableFunction<Iterable<Document>, Void> responseItemJsonSubstringValidator(
String responseItemSubstring) {
return input -> {
for (Document d : input) {
assertTrue(d.getResponseItemJson().contains(responseItemSubstring));
}
return null;
};
}
}
ElasticsearchIOTestUtils.java
@@ -57,6 +57,7 @@ class ElasticsearchIOTestUtils {
static final String ELASTICSEARCH_PASSWORD = "superSecure";
static final String ELASTIC_UNAME = "elastic";
static final Set<Integer> INVALID_DOCS_IDS = new HashSet<>(Arrays.asList(6, 7));
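// 513 chars ('2' repeated): exceeds Elasticsearch's 512-byte document ID limit, so a bulk
// request containing this ID fails with a ResponseException.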
static final String INVALID_LONG_ID = new String(new char[513]).replace('\0', '2');
static final String ALIAS_SUFFIX = "-aliased";

static final String[] FAMOUS_SCIENTISTS = {
@@ -80,7 +81,8 @@ class ElasticsearchIOTestUtils {
/** Enumeration that specifies whether to insert malformed documents. */
public enum InjectionMode {
INJECT_SOME_INVALID_DOCS,
DO_NOT_INJECT_INVALID_DOCS,
INJECT_ONE_ID_TOO_LONG_DOC
}

/** Deletes the given index synchronously. */
@@ -349,6 +351,14 @@ static List<String> createDocuments(long numDocs, InjectionMode injectionMode) {
FAMOUS_SCIENTISTS[index], i, baseDateTime.plusSeconds(i).toString()));
}
}
// Insert one additional doc whose ID is too long; it should trigger an
// org.elasticsearch.client.ResponseException on bulk flush.
if (InjectionMode.INJECT_ONE_ID_TOO_LONG_DOC.equals(injectionMode)) {
data.add(
String.format(
"{\"scientist\":\"invalid_scientist\", \"id\":%s, \"@timestamp\" : \"%s\"}",
INVALID_LONG_ID, baseDateTime));
}
return data;
}

@@ -524,4 +534,18 @@ public Integer apply(Document document) {
}
}
};

static SimpleFunction<Document, String> mapToInputIdString =
new SimpleFunction<Document, String>() {
@Override
public String apply(Document document) {
try {
// Account for intentionally invalid input JSON docs, which use ';' in place of ':'
String fixedJson = document.getInputDoc().replaceAll(";", ":");
return MAPPER.readTree(fixedJson).path("id").asText();
} catch (JsonProcessingException e) {
return "-1";
}
}
};
}
ElasticsearchIO.java
@@ -110,6 +110,7 @@
import org.checkerframework.checker.nullness.qual.Nullable;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.joda.time.Duration;
@@ -2560,11 +2561,12 @@ public BulkIO withMaxParallelRequests(int maxParallelRequests) {
/**
* Whether to throw runtime exceptions when write (IO) errors occur. Especially useful in
* streaming pipelines where non-transient IO failures will cause infinite retries. If true, a
* runtime error will be thrown for any error found by {@link
* ElasticsearchIO#createWriteReport}, as well as for any java.io.IOException (the superclass
* of org.elasticsearch.client.ResponseException) encountered during batch flush. If false, a
* {@link PCollectionTuple} will be returned with tags {@link Write#SUCCESSFUL_WRITES} and
* {@link Write#FAILED_WRITES}, each being a {@link PCollection} of {@link Document}
* representing documents which were written to Elasticsearch without errors and those which
* failed to write due to errors, respectively.
*
* @param throwWriteErrors whether to surface write errors as runtime exceptions or return them
* in a {@link PCollection}
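
For context, a minimal usage sketch of the behavior this Javadoc describes when throwWriteErrors is false (not taken from this PR; jsonDocs, the host, index name, and the text sink are illustrative assumptions):

// Sketch only: host, index, input collection, and downstream handling are illustrative.
PCollectionTuple results =
    jsonDocs.apply(
        ElasticsearchIO.write()
            .withConnectionConfiguration(
                ElasticsearchIO.ConnectionConfiguration.create(
                    new String[] {"http://localhost:9200"}, "my-index"))
            .withThrowWriteErrors(false)); // failures go to FAILED_WRITES instead of throwing

// Failed documents carry the error detail in their response item JSON.
results
    .get(ElasticsearchIO.Write.FAILED_WRITES)
    .apply(MapElements.into(TypeDescriptors.strings()).via(doc -> doc.getResponseItemJson()))
    .apply(TextIO.write().to("failed-writes"));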
@@ -2788,6 +2790,13 @@ private boolean isRetryableClientException(Throwable t) {
// RestClient#performRequest only throws wrapped IOExceptions, so we must inspect the
// exception cause to determine whether the exception is likely transient, i.e. retryable,
// or not.

// Retry on 5xx response codes, except 501 (Not Implemented), which will not succeed on retry.
if (t.getCause() instanceof ResponseException) {
ResponseException ex = (ResponseException) t.getCause();
int statusCode = ex.getResponse().getStatusLine().getStatusCode();
return statusCode >= 500 && statusCode != 501;
}
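// Otherwise fall back to inspecting the wrapped cause for known transient connection and
// timeout exceptions.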
return t.getCause() instanceof ConnectTimeoutException
|| t.getCause() instanceof SocketTimeoutException
|| t.getCause() instanceof ConnectionClosedException
@@ -2830,6 +2839,9 @@ private List<Document> flushBatch() throws IOException, InterruptedException {

HttpEntity requestBody =
new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);

String elasticResponseExceptionMessage = null;

try {
Request request = new Request("POST", endPoint);
request.addParameters(Collections.emptyMap());
Expand All @@ -2838,12 +2850,18 @@ private List<Document> flushBatch() throws IOException, InterruptedException {
responseEntity = new BufferedHttpEntity(response.getEntity());
} catch (java.io.IOException ex) {
if (spec.getRetryConfiguration() == null || !isRetryableClientException(ex)) {
if (spec.getThrowWriteErrors()) {
throw ex;
} else {
elasticResponseExceptionMessage = ex.getMessage();
}
} else {
LOG.error("Caught ES timeout, retrying", ex);
}
LOG.error("Caught ES timeout, retrying", ex);
}

if (spec.getRetryConfiguration() != null
&& elasticResponseExceptionMessage == null
&& (response == null
|| responseEntity == null
|| spec.getRetryConfiguration().getRetryPredicate().test(responseEntity))) {
@@ -2854,9 +2872,25 @@ private List<Document> flushBatch() throws IOException, InterruptedException {
responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
}

List<Document> responses;
// If a java.io.IOException was thrown, return all input Documents marked with
// hasError(true) so that they are routed to the FAILED_WRITES tag.
if (elasticResponseExceptionMessage != null) {
String errorJsonMessage =
String.format(
"{\"message\":\"java.io.IOException was thrown in batch flush: %s\"}",
elasticResponseExceptionMessage);

responses =
inputEntries.stream()
.map(doc -> doc.withHasError(true).withResponseItemJson(errorJsonMessage))
.collect(Collectors.toList());
} else {
responses =
createWriteReport(
responseEntity, spec.getAllowedResponseErrors(), spec.getThrowWriteErrors());
}

return Streams.zip(
inputEntries.stream(),
