Retry mechanism doesn't work in bulk operation #2196

Open

itayB (Contributor) commented Apr 9, 2023

Elasticsearch version: 8.6.1 (docker)

elasticsearch-py version: 8.7.0

Description of the problem including expected versus actual behavior:
I am using the async_bulk helper to fill my index with documents. If one of the cluster's nodes goes down (causing an unavailable_shards_exception), the retry mechanism doesn't seem to work and the bulk operation fails after the first failure.

Steps to reproduce:
Bring up a two-node cluster (es01, es02) with Docker Compose and map port 9200 of the first node (es01) to localhost.
Create an index with more than one shard (to ensure that both nodes hold shards of this index). In the middle of indexing, run:

docker kill es02
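
For reference, a minimal sketch of the index setup used to reproduce; the index name and shard counts below are illustrative placeholders, not values from the report:

from elasticsearch import AsyncElasticsearch

es_async = AsyncElasticsearch("http://localhost:9200")

# Create an index with several primary shards and no replicas so that
# primaries end up on both nodes; name and counts are placeholders.
await es_async.indices.create(
    index="my-index",
    settings={"number_of_shards": 12, "number_of_replicas": 0},
)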

The bulk operation stops after its first attempt, although 10 retries are configured:

2023-04-09 09:54:49,667 11388 WARNING Node <AiohttpHttpNode(http://localhost:9200)> has failed for 1 times in a row, putting on 1 second timeout [_node_pool.py:246]
2023-04-09 09:54:49,667 11388 WARNING Retrying request after failure (attempt 0 of 10) [_async_transport.py:332]
Traceback (most recent call last):
  File "/Users/itay/workspace/aud-elasticsearch-indexer/venv/lib/python3.11/site-packages/elastic_transport/_async_transport.py", line 259, in perform_request
    resp = await node.perform_request(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/itay/workspace/aud-elasticsearch-indexer/venv/lib/python3.11/site-packages/elastic_transport/_node/_http_aiohttp.py", line 218, in perform_request
    raise err from None
elastic_transport.ConnectionTimeout: Connection timeout caused by: TimeoutError()
2023-04-09 09:54:52,659 11390 ERROR   205 document(s) failed to index: [{'index': {'_index': '8776374_23-04-09_09-52-11', '_id': 'DrvKZIcBaGvN-i7R6kmA', 'status': 503, 'error': {'type': 'unavailable_shards_exception', 'reason': '[8776374_23-04-09_09-52-11][8] primary shard is not active Timeout: [1m], request: [BulkShardRequest [[8776374_23-04-09_09-52-11][8]] containing [42] requests]'}}}, {'index': {'_index': '8776374_23-04-09_09-52-11', '_id': 'D7vKZIcBaGvN-i7R6kmA', 'status': 503, 'error': {'type': 'unavailable_shards_exception', 'reason': '[8776374_23-04-09_09-52-11][0] primary shard is not active Timeout: [1m], request: [BulkShardRequest [[8776374_23-04-09_09-52-11][0]] containing [34] requests]'}}}, {'index': {'_index': '8776374_23-04-09_09-52-11', '_id': 'ELvKZIcBaGvN-i7R6kmA', 'status': 503, 'error': {'type': 'unavailable_shards_exception', 'reason': '[8776374_23-04-09_09-52-11][4] primary shard is not active Timeout: [1m], request: [BulkShardRequest [[8776374_23-04-09_09-52-11][4]] containing [29] requests]'}}}, {'index': {'_index': '8776374_23-04-09_09-52-11', '_id': 'ErvKZIcBaGvN-i7R6kmA', 'status': 503, 'error': {'type': 'unavailable_shards_exception', 'reason': '[8776374_23-04-09_09-52-11][10] primary shard is not active Timeout: [1m], request: [BulkShardRequest [[8776374_23-04-09_09-52-11][10]] containing [25] requests]'}}}, {'index': {'_index': '8776374_23-04-09_09-52-11', '_id': 'FbvKZIcBaGvN-i7R6kmA', 'status': 503, 'error': {'type': 'unavailable_shards_exception', 'reason': '[8776374_23-04-09_09-52-11][2] primary shard is not active Timeout: [1m], request: [BulkShardRequest [[8776374_23-04-09_09-52-11][2]] containing [40] requests]'}}}] [index.py:115]
2023-04-09 09:54:52,660 11390 ERROR   fail indexing extendedEs/2023-04-04/8776374/block30/part-r-04061.avro [index.py:194]
Traceback (most recent call last):
  File "/Users/itay/workspace/aud-elasticsearch-indexer/aud_elasticsearch_indexer/index.py", line 183, in async_wrapper_download_and_index
    await download_and_index(
  File "/Users/itay/workspace/aud-elasticsearch-indexer/aud_elasticsearch_indexer/index.py", line 157, in download_and_index
    await write_to_elasticsearch(
  File "/Users/itay/workspace/aud-elasticsearch-indexer/aud_elasticsearch_indexer/index.py", line 118, in write_to_elasticsearch
    raise Exception("failed to index to Elasticsearch")
Exception: failed to index to Elasticsearch

Code snippet:

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

es_async = AsyncElasticsearch(
    "http://localhost:9200",
    http_compress=True,
    request_timeout=60,
    max_retries=10,
    retry_on_timeout=True,
)

docs = [
  # generate docs here
]

await async_bulk(
    es_async,
    docs,
    request_timeout=ELASTICSEARCH_TIMEOUT_IN_SECONDS,
    raise_on_error=False,
)

Note that for other operations that don't go through helpers, such as create index, force_merge, etc., the retry mechanism does work when a node goes down.
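
A possible interim workaround is a sketch like the one below, which wraps async_bulk in an application-level retry loop. bulk_with_retries is a hypothetical helper written for illustration, not part of the library, and it assumes re-sending the whole batch is acceptable:

import asyncio

from elasticsearch.helpers import async_bulk

async def bulk_with_retries(client, docs, attempts=10, backoff=2.0):
    # Hypothetical application-level retry wrapper; this is not the
    # library's built-in retry mechanism.
    for attempt in range(1, attempts + 1):
        # With raise_on_error=False and raise_on_exception=False, async_bulk
        # reports per-document failures in the returned error list instead
        # of raising.
        success, errors = await async_bulk(
            client,
            docs,
            raise_on_error=False,
            raise_on_exception=False,
        )
        if not errors:
            return success
        # Back off linearly before retrying the whole batch.
        await asyncio.sleep(backoff * attempt)
    raise RuntimeError(f"bulk indexing still failing after {attempts} attempts")

This naive version re-sends every document on each attempt; a real implementation would collect only the failed actions and retry just those.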

itayB (Contributor, Author) commented May 14, 2023

@ezimuel I wonder if there is something I can help with. Is there additional information needed here? Is there any workaround I can use in the meantime?
