
Decrease local parallelism of processors in JdbcSqlConnector [HZ-1637] #24534

Conversation

frant-hartm
Contributor

The call to Map#clear with MapStore enabled results in at least 1 call to MapStore#deleteAll for each partition. The ultimate fix would be to add a MapStore#deleteAll method, but the benefit of such an API in MapStore is unclear.

However, it made the following issue visible:

When not explicitly set via forceTotalParallelismOne or setLocalParallelism, the default local parallelism is larger than 1. This results in 2 or more processors being created for each insert/update/delete/sink into. Each processor requests a connection from a pool. It is possible that 2 concurrently running queries borrow 1 connection each, exhausting the pool and resulting in a deadlock, which ends only after the connection pool's timeout (30 s by default).

It is arguably a user error that the connection pool size is too low, but with localParallelism>1 it is really easy to hit this issue (as seen in the Map#clear case).
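For illustration only (not Hazelcast code), a minimal sketch of this pool-exhaustion scenario using a plain HikariCP pool; the JDBC URL and sizes are illustrative. Two concurrent jobs each grab one of the two pooled connections, then each blocks waiting for a second one until the pool's 30 s connection timeout fires:

    import com.zaxxer.hikari.HikariConfig;
    import com.zaxxer.hikari.HikariDataSource;

    import java.sql.Connection;

    public class PoolDeadlockSketch {
        public static void main(String[] args) throws Exception {
            HikariConfig config = new HikariConfig();
            config.setJdbcUrl("jdbc:h2:mem:test");  // illustrative in-memory DB
            config.setMaximumPoolSize(2);           // pool smaller than the total demand
            config.setConnectionTimeout(30_000);    // 30 s, like the default mentioned above

            try (HikariDataSource pool = new HikariDataSource(config)) {
                Runnable job = () -> {
                    try (Connection first = pool.getConnection()) {
                        Thread.sleep(100);          // both jobs now likely hold 1 connection each
                        // Each job needs a second connection (one per processor); the pool
                        // is empty, so both block here until the connection timeout fires.
                        try (Connection second = pool.getConnection()) {
                            // real work would happen here
                        }
                    } catch (Exception e) {
                        System.out.println("gave up: " + e.getMessage());
                    }
                };
                Thread t1 = new Thread(job);
                Thread t2 = new Thread(job);
                t1.start();
                t2.start();
                t1.join();
                t2.join();
            }
        }
    }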

Also, it is theoretically possible to hit this issue across multiple members (one job's processor successfully obtains a connection from the pool on one member but waits for a connection on another member, while a second job does the opposite), but I didn't manage to reproduce this, so I didn't focus on fixing that issue.

In the future, we can make this behaviour smarter for insert/sink into or update/delete which have input and could make use of higher local parallelism.

Fixes #22567

Checklist:

  • Labels (Team:, Type:, Source:, Module:) and Milestone set
  • Label Add to Release Notes or Not Release Notes content set
  • Request reviewers if possible

@hz-devops-test

The job Hazelcast-pr-builder of your PR failed. (Hazelcast internal details: build log, artifacts).
Through arcane magic we have determined that the following fragments from the build log may contain information about the problem.

--------------------------
-------TEST FAILURE-------
--------------------------
[INFO] Results:
[INFO] 
[ERROR] Failures: 
[ERROR]   StreamKafkaPTest.when_processingGuaranteeNoneWithConsumerGroup_then_continueFromLastReadMessageAfterJobRestart:265->testWithJobRestart:347->HazelcastTestSupport.assertTrueEventually:1304->HazelcastTestSupport.assertTrueEventually:1285->lambda$testWithJobRestart$4:347 expected:<200> but was:<198>
[INFO] 
[ERROR] Tests run: 51, Failures: 1, Errors: 0, Skipped: 1
[INFO] 

[ERROR] There are test failures.

Contributor

@TomaszGaweda TomaszGaweda left a comment


Theoretically it should be executed on 1 member, but I guess it's a different pool, so there is little risk of exhausting connections.

I'm not quite sure why insert has localParallelism=1 and not update, sink into and delete. IMHO all of them should have the same value, am I wrong?

@frant-hartm
Contributor Author

So after this PR we have the following:

  • SELECT - always uses forceTotalParallelismOne, no risk of deadlock
  • UPDATE/DELETE - depending on whether they have input, they either use forceTotalParallelismOne:

        if (!hasInput) {
            // There is no input, we push the whole update query to the database, but we need single dummy item
            // to execute the update in WriteJdbcP
            // We can consider refactoring the WriteJdbcP, so it doesn't need the dummy item in the future.
            // Use local member address to run the dummy source and processor on the same member
            Address localAddress = context.getNodeEngine().getThisAddress();
            Vertex v = dummySourceVertex(context, "DummySourceFor" + statement, localAddress);
            Vertex updateVertex = context.getDag().newUniqueVertex(
                    statement + "(" + table.getExternalNameList() + ")",
                    forceTotalParallelismOne(processorSupplier, localAddress)
            );
            context.getDag().edge(Edge.between(v, updateVertex));
            return updateVertex;

    or already run with LP=1:

        } else {
            Vertex updateVertex = context.getDag().newUniqueVertex(
                    statement + "(" + table.getExternalNameList() + ")",
                    processorSupplier
            ).localParallelism(1);

  • INSERT/SINK INTO - runs on all members with LP=1 (e.g. streaming from Kafka into a table); this PR changes both

@frant-hartm frant-hartm merged commit 1c2f539 into hazelcast:master May 12, 2023
8 checks passed
@frant-hartm frant-hartm deleted the fix/5.3/JdbcSqlConnector-decrease-local-parallelism branch May 12, 2023 07:50
@k-jamroz
Contributor

it is theoretically possible to hit this issue across multiple members (the processor on one member successfully obtains a connection from the pool, but waits on another member and vice versa), but I didn't manage to reproduce this, so I didn't focus on fixing that issue.

The number of connections may also be limited on the database side. In that case, 2 concurrent jobs can deadlock if the DB does not accept more connections and the pool on the HZ side has a dynamic size.
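As a minimal sketch of one way to guard against that (assuming a HikariCP-style pool; the JDBC URL and sizes are illustrative, not from this PR), the client-side pool can be kept at a fixed size below the database-side connection limit, so it cannot grow dynamically past what the DB accepts:

    import com.zaxxer.hikari.HikariConfig;
    import com.zaxxer.hikari.HikariDataSource;

    public class FixedSizePoolSketch {
        // Fixed-size pool kept below the database's own connection limit,
        // so a dynamically growing pool cannot run the database out of connections.
        static HikariDataSource create() {
            HikariConfig config = new HikariConfig();
            config.setJdbcUrl("jdbc:postgresql://db:5432/app"); // hypothetical URL
            config.setMaximumPoolSize(10); // hard upper bound, below the DB-side limit
            config.setMinimumIdle(10);     // equal to maximumPoolSize => pool never grows or shrinks
            return new HikariDataSource(config);
        }
    }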

@@ -354,7 +354,7 @@ public VertexWithInputConfig insertProcessor(@Nonnull DagBuildContext context) {
builder.query(),
table.getBatchLimit()
)
));
).localParallelism(1));
Contributor


this will likely increase run time for larger inserts

@@ -501,7 +501,7 @@ public Vertex sinkProcessor(@Nonnull DagBuildContext context) {
upsertStatement,
jdbcTable.getBatchLimit()
)
);
).localParallelism(1);
Contributor


same as above - capping the parallelism increases runtime

@k-jamroz
Contributor

This change impacts the performance of normal SQL queries, not only the generic MapStore.

@k-jamroz
Contributor

SELECT - always uses forceTotalParallelismOne, no risk of deadlock

IMO in the future we may implement some strategy to partition such selects, e.g. WHERE id % processorsCount = processorIndex
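For illustration, a hypothetical sketch of that idea (the table and column names are placeholders, not from this PR): each of N parallel processors would run the query restricted to its own slice of the id space.

    public class PartitionedSelectSketch {
        public static void main(String[] args) {
            int processorCount = 4;
            for (int processorIndex = 0; processorIndex < processorCount; processorIndex++) {
                // "person" and "id" are placeholder table/column names
                String query = "SELECT * FROM person"
                        + " WHERE MOD(id, " + processorCount + ") = " + processorIndex;
                System.out.println(query);
            }
        }
    }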

Successfully merging this pull request may close these issues.

Map.clear w/ GenericMapStore and default settings fails w/ small amount of data [HZ-1637]