Inconsistent results in pull queries with distributed KsqlDB setup #10241

Open
xneg opened this issue Feb 23, 2024 · 1 comment

xneg commented Feb 23, 2024

Setup

First of all, we have 6 machines, each running its own ksqlDB instance in Docker from the image confluentinc/ksqldb-server, version 0.29.0.

Second, we have this setup:

listeners=http://0.0.0.0:8088/
ksql.advertised.listener is set for each node
ksql.heartbeat.enable=true
ksql.streams.num.standby.replicas=1
ksql.query.pull.enable.standby.reads=true
ksql.heartbeat.enable=true

We have a scenario very similar to what is described here.

  1. We have an input topic with 60 partitions. Topic's name is events.
  2. We declared a stream:
CREATE STREAM EVENTS (EVENT_TYPE STRING, TS STRING) WITH (CLEANUP_POLICY='delete', FORMAT='json', KAFKA_TOPIC='events', TIMESTAMP='ts', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX');
  3. We defined a table with aggregations like this:
CREATE TABLE EVENTS_HOURLY_COUNTS AS
SELECT EVENTS.ROW_PARTITION AS PARTITION,
       COUNT(*) AS EVENT_COUNT
FROM EVENTS
WINDOW TUMBLING ( SIZE 1 HOURS )
GROUP BY EVENTS.ROW_PARTITION
EMIT CHANGES;

This created 3 topics for us: 1 visible and 2 hidden.

Kafka Topic                                                                                             | Partitions | Partition Replicas
-------------------------------------------------------------------------------------------------------------------------------------------
 EVENTS_HOURLY_COUNTS                                                                                      | 60         | 2
 _confluent-ksql-data_query_CTAS_EVENTS_HOURLY_COUNTS_209-Aggregate-Aggregate-Materialize-changelog        | 60         | 2
 _confluent-ksql-data_query_CTAS_EVENTS_HOURLY_COUNTS_209-Aggregate-GroupBy-repartition                    | 60         | 2

The problem

When we issue pull queries against this table, they sporadically return inconsistent results, and there are no errors in the logs.
Our queries look like this:

SELECT WINDOWSTART, partition, event_count FROM events_hourly_counts  WHERE WINDOWSTART >= 1708452000000 AND WINDOWEND  <= 1708509600000
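
For reference, the two bounds in the query above can be decoded with a small Python sketch. It assumes the 1-hour tumbling windows from the CTAS statement; the helper names `to_utc` and `hourly_window_starts` are made up for illustration and are not part of ksqlDB.

```python
from datetime import datetime, timezone

HOUR_MS = 3_600_000  # tumbling window size from the CTAS statement (1 hour)


def to_utc(epoch_ms: int) -> str:
    """Render an epoch-millisecond timestamp as an ISO-8601 UTC string."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc).isoformat()


def hourly_window_starts(start_ms: int, end_ms: int) -> list[int]:
    """Start timestamps of every hourly window that satisfies
    WINDOWSTART >= start_ms AND WINDOWEND <= end_ms."""
    first = -(-start_ms // HOUR_MS) * HOUR_MS  # round up to an hour boundary
    return list(range(first, end_ms - HOUR_MS + 1, HOUR_MS))


starts = hourly_window_starts(1708452000000, 1708509600000)
print(to_utc(1708452000000))  # lower bound of the query
print(to_utc(1708509600000))  # upper bound of the query
print(len(starts))            # number of hourly windows inside the bounds
```

The bounds are hour-aligned and span 16 windows, so if every partition saw at least one event in every hour (an assumption), a complete result would contain 16 × 60 = 960 rows.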

We run them against windows that have already closed, so we expect that newly arriving data will not affect the results.
We expect rows for all 60 partitions in each hour, but sometimes (roughly 1 run out of 10) a query returns fewer rows, anywhere from 44 to 54, and sometimes even 61.
My guess is that some nodes in our multi-node setup time out and do not return results, but without any errors in the logs it is hard to investigate further.
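
To narrow down which rows go missing or show up twice, the result of one pull query can be audited on the client side. This is only a sketch: it assumes the rows have already been fetched and turned into `(windowstart_ms, partition, event_count)` tuples, and `audit_rows` is a hypothetical helper, not a ksqlDB API.

```python
from collections import Counter


def audit_rows(rows, window_starts, partitions=range(60)):
    """Compare the (window, partition) pairs in one result set with the
    pairs we expect to see.

    rows: iterable of (windowstart_ms, partition, event_count) tuples.
    window_starts: the window start timestamps covered by the query.
    partitions: the partition keys expected in every window.
    """
    seen = Counter((ws, p) for ws, p, _ in rows)
    expected = {(ws, p) for ws in window_starts for p in partitions}
    return {
        "missing": sorted(expected - set(seen)),       # expected but absent
        "duplicated": sorted(k for k, n in seen.items() if n > 1),
        "unexpected": sorted(set(seen) - expected),    # outside the query range
    }


# Tiny made-up example: 2 windows, 3 partitions, one gap and one duplicate.
HOUR_MS = 3_600_000
sample = [(0, 0, 5), (0, 1, 7), (0, 1, 7),
          (HOUR_MS, 0, 3), (HOUR_MS, 1, 4), (HOUR_MS, 2, 9)]
report = audit_rows(sample, window_starts=[0, HOUR_MS], partitions=range(3))
print(report)
```

If the missing pairs always belong to the same partitions, that would point toward specific hosts or state stores; if they are spread randomly, a timeout or routing problem seems more likely. Either reading is a guess until it is checked against real results.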

If anyone could help or point us in the right direction for where to dig, that would be great. Thanks in advance!


xneg commented Feb 27, 2024

One addition.

I tried to create a table with only one partition:

CREATE TABLE EVENTS_HOURLY_COUNTS WITH (PARTITIONS=1)
AS SELECT EVENTS.ROW_PARTITION AS PARTITION,
       COUNT(*) AS EVENT_COUNT
FROM EVENTS
WINDOW TUMBLING ( SIZE 1 HOURS )
GROUP BY EVENTS.ROW_PARTITION
EMIT CHANGES;

So the table has only one partition, but it still collects keys 0 through 59, and the behavior is the same. When I run a pull query covering 20 hours, I expect 1200 rows in the result (20 hours × 60 keys). Most of the time I get 1200 rows, but from time to time I get 1199, 936, or even 1201 rows!
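
Since the same query gives different row counts from run to run, diffing a "good" run against a "bad" one shows exactly which rows differ. The sketch below assumes each run is a list of `(windowstart_ms, partition, event_count)` tuples already collected by the client; `diff_runs` is a hypothetical helper and the data is made up.

```python
from collections import Counter


def diff_runs(run_a, run_b):
    """Return the rows that appear more often in one run than in the other.

    Counter subtraction keeps only positive counts, so a row duplicated in
    run_b but present once in run_a shows up once under "only_in_b".
    """
    a, b = Counter(run_a), Counter(run_b)
    return {
        "only_in_a": sorted((a - b).elements()),
        "only_in_b": sorted((b - a).elements()),
    }


# Made-up data shaped like the report: 20 hourly windows x 60 keys.
HOUR_MS = 3_600_000
baseline = [(h * HOUR_MS, key, 1) for h in range(20) for key in range(60)]
short_run = baseline[:-1]               # one row missing (1199 rows)
long_run = baseline + [baseline[0]]     # one row duplicated (1201 rows)

print(len(baseline), len(short_run), len(long_run))
print(diff_runs(baseline, short_run))
print(diff_runs(baseline, long_run))
```

A diff like this would show whether the 1201st row is an exact duplicate of an existing row or a second row for the same window and key with a different count, which are two different failure modes.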
