Decrease local parallelism of processors in JdbcSqlConnector [HZ-1637] #24534
Conversation
The call to `Map#clear` with `MapStore` enabled results in at least 1 call to `MapStore#deleteAll` for each partition. The ultimate fix would be to add a `MapStore#deleteAll` method, but the benefit of such an API in `MapStore` is unclear. However, it made the following issue visible:

When not explicitly set via `forceTotalParallelismOne` or `setLocalParallelism`, the default local parallelism is larger than 1. This results in 2 or more processors being created for each insert/update/delete/sink into. Each processor requests a connection from a pool. It is possible that 2 concurrently running queries borrow 1 connection each, exhausting the pool and resulting in a deadlock, which ends only after the connection pool's timeout (30 s by default).

It is arguably a user error that the connection pool size is too low, but with `localParallelism > 1` it is really easy to hit this issue (as seen with `Map#clear`). It is also theoretically possible to hit this issue across multiple members (a processor on one member successfully obtains a connection from the pool but waits on another member, and vice versa), but I didn't manage to reproduce this, so I didn't focus on fixing that issue.

In the future we can make this behaviour smarter for insert/sink into or update/delete, which have input and could make use of higher local parallelism.

Fixes hazelcast#22567
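The pool-exhaustion scenario above can be sketched with a toy model, assuming a simplified pool (a `Semaphore`) and illustrative sizes: a pool of 1 connection and 2 processor threads, each trying to borrow one. The thread counts, hold time, and short timeout are hypothetical stand-ins for the connector's actual values:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model: pool of size 1, localParallelism = 2, so one processor
// must wait for a connection and eventually times out.
public class PoolExhaustionDemo {
    public static void main(String[] args) throws InterruptedException {
        Semaphore pool = new Semaphore(1);           // connection pool of size 1
        int processors = 2;                          // default localParallelism > 1
        CountDownLatch done = new CountDownLatch(processors);
        AtomicInteger timeouts = new AtomicInteger();

        for (int i = 0; i < processors; i++) {
            new Thread(() -> {
                try {
                    // borrow with a short timeout (the real pool defaults to 30 s)
                    if (pool.tryAcquire(200, TimeUnit.MILLISECONDS)) {
                        Thread.sleep(500);           // hold the connection while "working"
                        pool.release();
                    } else {
                        timeouts.incrementAndGet();
                    }
                } catch (InterruptedException ignored) {
                } finally {
                    done.countDown();
                }
            }).start();
        }
        done.await();
        System.out.println("processors that timed out: " + timeouts.get());
    }
}
```

With `localParallelism(1)` there would be a single borrower per member and the second acquire never happens.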
The job failed:

```
--------------------------
-------TEST FAILURE-------
--------------------------
[INFO] Results:
[INFO]
[ERROR] Failures:
[ERROR]   StreamKafkaPTest.when_processingGuaranteeNoneWithConsumerGroup_then_continueFromLastReadMessageAfterJobRestart:265->testWithJobRestart:347->HazelcastTestSupport.assertTrueEventually:1304->HazelcastTestSupport.assertTrueEventually:1285->lambda$testWithJobRestart$4:347 expected:<200> but was:<198>
[INFO]
[ERROR] Tests run: 51, Failures: 1, Errors: 0, Skipped: 1
```
Theoretically it should be executed on 1 member, but I guess it's a different pool, so there is little risk of exhausting connections.
I'm not convinced why insert has `localParallelism=1` but not update, sink into, and delete. IMHO all of them should have the same value, am I wrong?
So after this PR we have the following:
The number of connections may be limited on the database side as well. In that case 2 concurrent jobs can deadlock if the DB does not accept more connections and the pool on the HZ side has a dynamic size.
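A back-of-envelope check for this: every processor that may hold a connection at the same time must fit under both the pool size and the DB-side connection limit. The formula and all numbers below are illustrative assumptions, not values from the connector:

```java
// Hypothetical sizing check: connections needed at peak is roughly
// members * localParallelism * concurrently running jobs.
public class PoolSizing {
    static int requiredConnections(int members, int localParallelism, int concurrentJobs) {
        return members * localParallelism * concurrentJobs;
    }

    public static void main(String[] args) {
        int needed = requiredConnections(3, 2, 2); // 3 members, localParallelism=2, 2 jobs
        int dbLimit = 10;                          // e.g. max_connections on the DB side
        System.out.println("needed=" + needed + " fitsDbLimit=" + (needed <= dbLimit));
    }
}
```

When the result exceeds the DB limit, jobs can block each other exactly as described above, regardless of how large the HZ-side pool is allowed to grow.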
```diff
@@ -354,7 +354,7 @@ public VertexWithInputConfig insertProcessor(@Nonnull DagBuildContext context) {
                 builder.query(),
                 table.getBatchLimit()
         )
-));
+).localParallelism(1));
```
This will likely increase run time for larger inserts.
```diff
@@ -501,7 +501,7 @@ public Vertex sinkProcessor(@Nonnull DagBuildContext context) {
                 upsertStatement,
                 jdbcTable.getBatchLimit()
         )
-);
+).localParallelism(1);
```
Same as above: the cap on parallelism increases runtime.
This change impacts performance of normal SQL queries, not only the generic MapStore.
IMO in the future we may implement some strategy to partition such selects, e.g. …
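One conceivable shape for such a strategy (purely a sketch, not the connector's design) is range-partitioning a select over a numeric key so that N processors each scan a disjoint slice. Table and column names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: split a full-table SELECT into N disjoint range scans over a
// numeric key column, one query per processor. Names are illustrative.
public class RangePartitionedSelect {
    static List<String> partitionQueries(String table, String keyColumn,
                                         long min, long max, int partitions) {
        List<String> queries = new ArrayList<>();
        long span = max - min + 1;
        for (int i = 0; i < partitions; i++) {
            long lo = min + span * i / partitions;
            long hi = min + span * (i + 1) / partitions - 1;
            queries.add("SELECT * FROM " + table
                    + " WHERE " + keyColumn + " BETWEEN " + lo + " AND " + hi);
        }
        return queries;
    }

    public static void main(String[] args) {
        // e.g. 4 processors sharing key range [0, 99]
        partitionQueries("person", "id", 0, 99, 4).forEach(System.out::println);
    }
}
```

Each processor would then hold its own connection for its own slice, which is exactly the case where local parallelism above 1 pays off.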