Commit method respond slow (was: Make poll interval adaptive) #128

Open
blindspotbounty opened this issue Aug 31, 2023 · 3 comments

@blindspotbounty
Collaborator

Currently, even if there are more than 100 events (the limit is hardcoded), we don't continue polling but call Task.sleep() instead, which seems pretty strange.
I would like to suggest an adaptive poll interval that depends on the events received, e.g.:

func eventPoll(events: inout [KafkaEvent], maxEvents: inout Int) -> Bool /* or enum { case shouldSleep, case shouldYield } */{
        events.removeAll(keepingCapacity: true)
        events.reserveCapacity(maxEvents)
        
        var shouldSleep = true

for ...
            switch eventType {
            case .deliveryReport:
                let forwardEvent = self.handleDeliveryReportEvent(event)
                events.append(forwardEvent)
                shouldSleep = false
            case .fetch:
                if let forwardEvent = self.handleFetchEvent(event) {
                    events.append(forwardEvent)
                    shouldSleep = false
                }
            case .log:
                self.handleLogEvent(event)
            case .offsetCommit:
                self.handleOffsetCommitEvent(event)
                shouldSleep = false
            case .statistics:
                events.append(self.handleStatistics(event))
            case .rebalance:
                events.append(self.handleRebalance(event))
                shouldSleep = false
            case .error:
                break
            case .none:
                // Finished reading events, return early
                return shouldSleep
            default:
                break // Ignored Event
            }
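
The enum alternative hinted at in the signature comment could look roughly like the sketch below. `RawEvent`, `PollOutcome` and `classify` are hypothetical names used only for illustration and are not part of swift-kafka-client. The sketch is also simplified: it does not forward events, and it treats every fetch as activity, whereas the snippet above only does so when `handleFetchEvent` returns a message.

```swift
// Hypothetical stand-in for the event types handled in eventPoll.
enum RawEvent {
    case deliveryReport, fetch, log, offsetCommit, statistics, rebalance, error
}

// Hypothetical result type replacing the Bool return value.
enum PollOutcome: Equatable {
    case shouldSleep   // nothing the caller is waiting on arrived
    case shouldYield   // activity was seen, poll again soon
}

// Returns .shouldYield as soon as one "active" event is found in the batch.
func classify(_ batch: [RawEvent]) -> PollOutcome {
    for event in batch {
        switch event {
        case .deliveryReport, .fetch, .offsetCommit, .rebalance:
            return .shouldYield
        case .log, .statistics, .error:
            continue // these do not influence the poll cadence
        }
    }
    return .shouldSleep
}
```

An enum makes the call site read as `if outcome == .shouldSleep { ... }` and leaves room for further cases later without changing the signature again.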

The idea is then to adjust the poll interval as follows:

            case .pollForAndYieldMessage(let client, let source, let eventSource):
                let shouldSleep = client.eventPoll(events: &events, maxEvents: &maxEvents)
                for event in events { ... }
                if shouldSleep {
                    pollInterval = min(self.configuration.pollInterval, pollInterval * 2)
                    try await Task.sleep(for: pollInterval)
                } else {
                    pollInterval = max(pollInterval / 3, .milliseconds(1))
                    await Task.yield()
                }
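
The interval arithmetic from the snippet above can be isolated into a small value type, which makes it easier to reason about and test on its own. This is only a sketch: `AdaptivePollInterval`, `upperBound`, `lowerBound`, `idle()` and `busy()` are hypothetical names, the factors 2 and 3 and the 1 ms floor are taken from the snippet, and `Duration` requires Swift 5.7 or later.

```swift
// Hypothetical helper, not part of swift-kafka-client.
struct AdaptivePollInterval {
    let upperBound: Duration          // plays the role of configuration.pollInterval
    let lowerBound: Duration          // floor used while events keep arriving
    private(set) var current: Duration

    init(upperBound: Duration, lowerBound: Duration = .milliseconds(1)) {
        self.upperBound = upperBound
        self.lowerBound = lowerBound
        self.current = upperBound     // start at the configured interval
    }

    // No relevant events: back off by doubling, capped at upperBound.
    // Returns the duration the caller should sleep for.
    mutating func idle() -> Duration {
        current = min(upperBound, current * 2)
        return current
    }

    // Events were seen: shrink to a third, but never below lowerBound.
    mutating func busy() {
        current = max(current / 3, lowerBound)
    }
}

var interval = AdaptivePollInterval(upperBound: .milliseconds(100))
interval.busy()                        // a burst of events shortens the interval
print("after one busy poll:", interval.current)
print("next sleep when idle:", interval.idle())
```

Shrinking faster than growing means a burst is picked up within a few polls, while a quiet consumer drifts back to the configured interval more gradually.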

It might also be useful to adapt the event batch size, but I'm not sure whether that is feasible.

@FranzBusch
Contributor

Could you elaborate why we might want to do this? Are you running into any problems with the current behaviour?

@blindspotbounty
Collaborator Author

Sure, this is primarily related to performance with commitSync.
Let's take a simple example:

for try await message in consumer.messages {
    print("Current ts: \(Date.now.timeIntervalSince1970)")
    try await consumer.commitSync(message)
}

Consecutive timestamps differ by about 100 ms (the default poll interval):

Current ts: 1694001547.020994
Current ts: 1694001547.125323
Current ts: 1694001547.227994
Current ts: 1694001547.3286428
Current ts: 1694001547.432111
Current ts: 1694001547.53237
Current ts: 1694001547.635818
Current ts: 1694001547.739641
Current ts: 1694001547.840351
Current ts: 1694001547.94223

That is because the offsetCommit event is handled only after the 100 ms poll sleep has elapsed.
With a shorter poll interval (e.g. 20 ms):

Current ts: 1694001918.638655
Current ts: 1694001918.6592941
Current ts: 1694001918.682397
Current ts: 1694001918.707478
Current ts: 1694001918.730077
Current ts: 1694001918.752017
Current ts: 1694001918.777021
Current ts: 1694001918.8020592
Current ts: 1694001918.827096
Current ts: 1694001918.852134

and with a zero poll interval:

Current ts: 1694001998.81574
Current ts: 1694001998.816685
Current ts: 1694001998.817298
Current ts: 1694001998.81777
Current ts: 1694001998.8181949
Current ts: 1694001998.818651
Current ts: 1694001998.819083
Current ts: 1694001998.8194818
Current ts: 1694001998.819936
Current ts: 1694001998.82037

Zero gives the best latency but leads to continuous polling load, while 100 ms is too slow when there is a lot of data.
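
To put a number on the three runs above, the mean gap between consecutive timestamps can be computed directly from the printed values. The function name `meanGapMilliseconds` is made up for this sketch; the arrays are the timestamps copied from the outputs above.

```swift
// Mean gap in milliseconds between consecutive timestamps given in seconds.
func meanGapMilliseconds(_ timestamps: [Double]) -> Double {
    guard timestamps.count > 1,
          let first = timestamps.first,
          let last = timestamps.last else { return 0 }
    // The sum of consecutive gaps telescopes to (last - first).
    return (last - first) / Double(timestamps.count - 1) * 1000
}

// Timestamps copied from the three runs above.
let interval100ms = [
    1694001547.020994, 1694001547.125323, 1694001547.227994, 1694001547.3286428,
    1694001547.432111, 1694001547.53237, 1694001547.635818, 1694001547.739641,
    1694001547.840351, 1694001547.94223,
]
let interval20ms = [
    1694001918.638655, 1694001918.6592941, 1694001918.682397, 1694001918.707478,
    1694001918.730077, 1694001918.752017, 1694001918.777021, 1694001918.8020592,
    1694001918.827096, 1694001918.852134,
]
let intervalZero = [
    1694001998.81574, 1694001998.816685, 1694001998.817298, 1694001998.81777,
    1694001998.8181949, 1694001998.818651, 1694001998.819083, 1694001998.8194818,
    1694001998.819936, 1694001998.82037,
]

print("100 ms:", meanGapMilliseconds(interval100ms))
print("20 ms: ", meanGapMilliseconds(interval20ms))
print("0 ms:  ", meanGapMilliseconds(intervalZero))
```

On these samples the means come out near 102 ms, 24 ms and 0.5 ms, i.e. each commitSync round trip costs roughly one poll interval plus a small fixed overhead.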

Therefore I suggest adapting to the number of events: e.g. keep processing events without sleeping (or with just Task.yield()) while KafkaClient provides them, and adjust the sleep time within some min/max interval.
That way, with the default configuration (pollInterval = 100 ms), the outcome is:

Current ts: 1694002548.8543139
Current ts: 1694002548.891454
Current ts: 1694002548.914485
Current ts: 1694002548.93012
Current ts: 1694002548.942556
Current ts: 1694002548.949646
Current ts: 1694002548.955224
Current ts: 1694002548.959221
Current ts: 1694002548.961931
Current ts: 1694002548.964264

At the same time, if there are no events, or they arrive only rarely, the interval adjusts to the corresponding message/offset commit rate.

If we compare the time for just 100 messages, starting with the default pollInterval, the test durations are as follows (I modified the test 'testProduceAndConsumeWithMessageHeaders' to use 100 events and added time measurement before/after the loop).
For fixed poll interval (100ms):

Test Suite 'KafkaTests' passed at 2023-09-06 15:22:48.479.
	 Executed 1 test, with 0 failures (0 unexpected) in 11.715 (11.715) seconds
Test Suite 'swift-kafka-clientPackageTests.xctest' passed at 2023-09-06 15:22:48.479.
	 Executed 1 test, with 0 failures (0 unexpected) in 11.715 (11.715) seconds
Test Suite 'Selected tests' passed at 2023-09-06 15:22:48.479.
	 Executed 1 test, with 0 failures (0 unexpected) in 11.715 (11.716) seconds
Time passed 11.374598979949951 for count 100

For fixed poll interval (20ms):

Test Suite 'KafkaTests' passed at 2023-09-06 15:25:42.750.
	 Executed 1 test, with 0 failures (0 unexpected) in 3.720 (3.720) seconds
Test Suite 'swift-kafka-clientPackageTests.xctest' passed at 2023-09-06 15:25:42.750.
	 Executed 1 test, with 0 failures (0 unexpected) in 3.720 (3.720) seconds
Test Suite 'Selected tests' passed at 2023-09-06 15:25:42.750.
	 Executed 1 test, with 0 failures (0 unexpected) in 3.720 (3.721) seconds
Time passed 3.4709099531173706 for count 100

For fixed poll interval (1ms):

Test Suite 'KafkaTests' passed at 2023-09-06 15:42:03.707.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.453 (1.453) seconds
Test Suite 'swift-kafka-clientPackageTests.xctest' passed at 2023-09-06 15:42:03.707.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.453 (1.453) seconds
Test Suite 'Selected tests' passed at 2023-09-06 15:42:03.707.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.453 (1.454) seconds
Time passed 1.2268959283828735 for count 100

For zero:

Test Suite 'KafkaTests' passed at 2023-09-06 15:26:39.346.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.365 (1.365) seconds
Test Suite 'swift-kafka-clientPackageTests.xctest' passed at 2023-09-06 15:26:39.346.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.365 (1.365) seconds
Test Suite 'Selected tests' passed at 2023-09-06 15:26:39.346.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.365 (1.366) seconds
Time passed 1.0599110126495361 for count 100

For adaptive (with max 100ms):

Test Suite 'KafkaTests' passed at 2023-09-06 15:22:07.496.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.488 (1.488) seconds
Test Suite 'swift-kafka-clientPackageTests.xctest' passed at 2023-09-06 15:22:07.496.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.488 (1.488) seconds
Test Suite 'Selected tests' passed at 2023-09-06 15:22:07.496.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.488 (1.488) seconds
Time passed 1.2500579357147217 for count 100

For adaptive (with max 20ms):

Test Suite 'KafkaTests' passed at 2023-09-06 15:39:42.329.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.381 (1.381) seconds
Test Suite 'swift-kafka-clientPackageTests.xctest' passed at 2023-09-06 15:39:42.329.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.381 (1.381) seconds
Test Suite 'Selected tests' passed at 2023-09-06 15:39:42.329.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.381 (1.382) seconds
Time passed 1.1491138935089111 for count 100

For adaptive (with max 1ms):

Test Suite 'KafkaTests' passed at 2023-09-06 15:40:17.853.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.373 (1.373) seconds
Test Suite 'swift-kafka-clientPackageTests.xctest' passed at 2023-09-06 15:40:17.853.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.373 (1.373) seconds
Test Suite 'Selected tests' passed at 2023-09-06 15:40:17.853.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.373 (1.374) seconds
Time passed 1.1237449645996094 for count 100

For adaptive (with max 0ms), just as a sanity check:

Test Suite 'KafkaTests' passed at 2023-09-06 15:43:13.706.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.362 (1.362) seconds
Test Suite 'swift-kafka-clientPackageTests.xctest' passed at 2023-09-06 15:43:13.706.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.362 (1.362) seconds
Test Suite 'Selected tests' passed at 2023-09-06 15:43:13.706.
	 Executed 1 test, with 0 failures (0 unexpected) in 1.362 (1.362) seconds
Time passed 1.0795170068740845 for count 100

So, this should keep CPU usage low when there are few events (polls are rare) while reacting quickly to spikes of data.

@blindspotbounty changed the title from "Make poll interval adaptive" to "Commit method respond slow (was: Make poll interval adaptive)" on Nov 14, 2023
@blindspotbounty
Collaborator Author

I've renamed the issue to reflect the problem that we see.

While the benchmark infrastructure is on its way, I wrote a couple of tests on top of the not-yet-merged infrastructure.

Here are some results with the current swift-kafka-client implementation.
Sources can be found at: https://github.com/ordo-one/swift-kafka-client/tree/perf-consumer-with-backpressure
They can be run with swift package --disable-sandbox benchmark --filter "SwiftKafkaConsumer - manual commits.*".

Results:

Host 'MacBook-Pro.local' with 12 'arm64' processors with 96 GB memory, running:
Darwin Kernel Version 23.0.0: Fri Sep 15 14:43:05 PDT 2023; root:xnu-10002.1.13~1/RELEASE_ARM64_T6020

============================
SwiftKafkaConsumerBenchmarks
============================

SwiftKafkaConsumer - manual commits - pollInterval 0ms
╒════════════════════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╕
│ Metric                 │      p0 │     p25 │     p50 │     p75 │     p90 │     p99 │    p100 │ Samples │
╞════════════════════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ Memory (allocated) (M) │      51 │      51 │      51 │      51 │      51 │      51 │      51 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (total CPU) (s)   │      12 │      12 │      12 │      12 │      12 │      12 │      12 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (wall clock) (s)  │      98 │      98 │      98 │      98 │      98 │      98 │      98 │       1 │
╘════════════════════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╛

SwiftKafkaConsumer - manual commits - pollInterval 100ms (default)
╒════════════════════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╕
│ Metric                 │      p0 │     p25 │     p50 │     p75 │     p90 │     p99 │    p100 │ Samples │
╞════════════════════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ Memory (allocated) (M) │      43 │      43 │      43 │      43 │      43 │      43 │      43 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (total CPU) (s)   │      11 │      11 │      11 │      11 │      11 │      11 │      11 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (wall clock) (s)  │      97 │      97 │      97 │      97 │      97 │      97 │      97 │       1 │
╘════════════════════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╛

SwiftKafkaConsumer - manual commits - pollInterval 1ms
╒════════════════════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╕
│ Metric                 │      p0 │     p25 │     p50 │     p75 │     p90 │     p99 │    p100 │ Samples │
╞════════════════════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ Memory (allocated) (M) │      41 │      41 │      41 │      41 │      41 │      41 │      41 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (total CPU) (s)   │      12 │      12 │      12 │      12 │      12 │      12 │      12 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (wall clock) (s)  │      97 │      97 │      97 │      97 │      97 │      97 │      97 │       1 │
╘════════════════════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╛

With adaptive polling. Sources can be found at: https://github.com/ordo-one/swift-kafka-client/tree/perf-consumer-with-backpressure--adaptive
They can be run with the same command: swift package --disable-sandbox benchmark --filter "SwiftKafkaConsumer - manual commits.*".

Results:

Host 'MacBook-Pro.local' with 12 'arm64' processors with 96 GB memory, running:
Darwin Kernel Version 23.0.0: Fri Sep 15 14:43:05 PDT 2023; root:xnu-10002.1.13~1/RELEASE_ARM64_T6020

============================
SwiftKafkaConsumerBenchmarks
============================

SwiftKafkaConsumer - manual commits - pollInterval 0ms
╒════════════════════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╕
│ Metric                 │      p0 │     p25 │     p50 │     p75 │     p90 │     p99 │    p100 │ Samples │
╞════════════════════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ Memory (allocated) (M) │      43 │      43 │      43 │      43 │      43 │      43 │      43 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (total CPU) (ms)  │    8797 │    8797 │    8797 │    8797 │    8797 │    8797 │    8797 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (wall clock) (s)  │      51 │      51 │      51 │      51 │      51 │      51 │      51 │       1 │
╘════════════════════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╛

SwiftKafkaConsumer - manual commits - pollInterval 100ms (default)
╒════════════════════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╕
│ Metric                 │      p0 │     p25 │     p50 │     p75 │     p90 │     p99 │    p100 │ Samples │
╞════════════════════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ Memory (allocated) (M) │      45 │      45 │      45 │      45 │      45 │      45 │      45 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (total CPU) (ms)  │    8478 │    8478 │    8478 │    8478 │    8478 │    8478 │    8478 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (wall clock) (s)  │      51 │      51 │      51 │      51 │      51 │      51 │      51 │       1 │
╘════════════════════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╛

SwiftKafkaConsumer - manual commits - pollInterval 1ms
╒════════════════════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╕
│ Metric                 │      p0 │     p25 │     p50 │     p75 │     p90 │     p99 │    p100 │ Samples │
╞════════════════════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ Memory (allocated) (M) │      43 │      43 │      43 │      43 │      43 │      43 │      43 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (total CPU) (ms)  │    8634 │    8634 │    8634 │    8634 │    8634 │    8634 │    8634 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (wall clock) (s)  │      51 │      51 │      51 │      51 │      51 │      51 │      51 │       1 │
╘════════════════════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╛

The difference is about 2x by wall clock (97 to 98 s down to 51 s) and roughly 1.3x to 1.4x by CPU time (11 to 12 s down to about 8.5 to 8.8 s).
