
COLLECT_LIST in KSQL returning duplicate results OR retaining previous results #10168

Open
AleksandarTokarev opened this issue Dec 30, 2023 · 13 comments
Labels
bug needs-triage streaming-engine Tickets owned by the ksqlDB Streaming Team

Comments

@AleksandarTokarev

AleksandarTokarev commented Dec 30, 2023

Describe the bug
We have a use case with one KSQL table and a push query that continuously queries the table and creates/feeds another table from it:

CREATE or replace table ITEMS_PER_USER_TABLE
    WITH (PARTITIONS = 12, VALUE_FORMAT = 'JSON') AS
SELECT userId, COLLECT_LIST(
        STRUCT("name":=name, "price":=price, "id":=id, "updatedTime":=updatedTime)) 
        AS items
FROM ITEMS_TABLE
GROUP BY userId
EMIT CHANGES;

In the table ITEMS_TABLE, the items use the id as the primary key, while in the new push query we want to create a table where the items are grouped by userId, so the result is a list.
What we have noticed is: after setup, when everything works well, there are no issues. But when ksqlDB has glitches (goes down and comes back up, or has other issues), we get duplicate (stale) items in the ITEMS_PER_USER_TABLE (we are not exactly sure why or how this happens).

To Reproduce
Steps to reproduce the behavior, include:

  1. The version of KSQL.
    0.29
  2. Sample source data.
    In the table ITEMS_PER_USER_TABLE - we have the following items
id,name,price,userId,updatedTime
1,name1,10,1,1703927564000
2,name2,20,2,1703927564001
3,name3,11,1,1703927564002
4,name4,22,2,1703927564003

In the ITEMS_PER_USER_TABLE - when all is good - we have the following

userId, items
1, [{id=1,name=name1,price=10,updatedTime=1703927564000},{id=3,name=name3,price=11,updatedTime=1703927564002}]
2, [{id=2,name=name2,price=20,updatedTime=1703927564001},{id=4,name=name4,price=22,updatedTime=1703927564003}]

But when things go wrong (e.g. ksqlDB being down or having other issues), a bug occurs and duplicate items start piling up (and they don't get cleared when ITEMS_TABLE gets more records).

userId, items
1, [{id=1,name=name1,price=10,updatedTime=1703927564000},{id=3,name=name3,price=11,updatedTime=1703927564002},{id=1,name=name1,price=10,updatedTime=1703927565000}]
2, [{id=2,name=name2,price=20,updatedTime=1703927564001},{id=4,name=name4,price=22,updatedTime=1703927564003},{id=2,name=name2,price=20,updatedTime=1703927566001},{id=4,name=name4,price=22,updatedTime=1703927574003}]
  3. Any SQL statements you ran
CREATE or replace table ITEMS_PER_USER_TABLE
    WITH (PARTITIONS = 12, VALUE_FORMAT = 'JSON') AS
SELECT userId, COLLECT_LIST(
        STRUCT("name":=name, "price":=price, "id":=id)) 
        AS items
FROM ITEMS_TABLE
GROUP BY userId
EMIT CHANGES;

Expected behavior
The function COLLECT_LIST should not return duplicate items OR retain previous results, because in the source table the id is the primary key.

Actual behavior
Duplicate/previously retained items are returned as part of the COLLECT_LIST result. This should not be the case.
It is interesting to note that not all items are duplicated/retained - sometimes some of them are, sometimes all of them.
This leads me to believe that COLLECT_LIST works, but it retains some items that were previously in the list instead of replacing them.

@cadonna
Member

cadonna commented Dec 30, 2023

@AleksandarTokarev This looks like duplicates that happen due to the at-least-once semantics (ALOS) KSQL guarantees by default. Under ALOS, when a failure happens, records might be written multiple times to an aggregate or to an output topic [1]. To solve your issue, you could try to run the application under exactly-once semantics (EOS). I am wondering whether in your specific case it would suffice to use COLLECT_SET [2] instead of COLLECT_LIST.

[1] https://docs.ksqldb.io/en/latest/operate-and-deploy/exactly-once-semantics/#at-least-once-semantics
[2] https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/aggregate-functions/#collect_set
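
[Editor's note] For reference, a minimal sketch of enabling EOS on the ksqlDB server. The underlying property is `ksql.streams.processing.guarantee`, which is passed through to the embedded Kafka Streams runtime; `exactly_once_v2` assumes brokers on 2.5+, so the exact value to use may depend on your versions:

```
# ksql-server.properties — enable exactly-once semantics (EOS)
ksql.streams.processing.guarantee=exactly_once_v2
```

On the Confluent Docker images this maps to the environment variable `KSQL_KSQL_STREAMS_PROCESSING_GUARANTEE: "exactly_once_v2"`.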

@AleksandarTokarev AleksandarTokarev changed the title COLLECT_LIST in KSQL returning duplicate results COLLECT_LIST in KSQL returning duplicate results OR retaining previous results Dec 30, 2023
@AleksandarTokarev
Author

AleksandarTokarev commented Dec 30, 2023

@cadonna Is it possible that the items are being retained with COLLECT_LIST ? Because those stale/old items are not in the ITEMS_TABLE table?

Also these ALOS semantics would be on the application level that writes to the ITEMS_TABLE I assume?
And also, how would COLLECT_SET know which comparator to use and how to pick the latest item (in our case, the one with the newest updatedTime)?

@cadonna
Member

cadonna commented Dec 30, 2023

Now, I realized that updatedTime is different for the "duplicates". I am sorry for that!
Could you please post a minimal example that reproduces what you experience?
More specifically:

  • the CREATE-statement for ITEMS_TABLE (BTW, in step 2 above it should be ITEMS_TABLE, right?)
  • when to restart ksql to reproduce the situation

@AleksandarTokarev
Author

Yes, the updatedTime is different for the records - I guess you missed that.
The actual CREATE statement above is for the ITEMS_PER_USER_TABLE (pasting it below) - I will need to find the one for ITEMS_TABLE.

CREATE or replace table ITEMS_PER_USER_TABLE
    WITH (PARTITIONS = 12, VALUE_FORMAT = 'JSON') AS
SELECT userId, COLLECT_LIST(
        STRUCT("name":=name, "price":=price, "id":=id, "updatedTime":=updatedTime)) 
        AS items
FROM ITEMS_TABLE
GROUP BY userId
EMIT CHANGES;

These duplicates/retained items occur when we have ksqlDB issues. What is stranger to me: even if the issues last only a few moments, the stale records are gone from ITEMS_TABLE, and the actual new records are constantly being updated (hence the changing updatedTime). My assumption was that the duplicates would go away as well - but they are staying, while the current records keep being updated.

@cadonna
Member

cadonna commented Dec 30, 2023

At the moment, I do not understand why the updatedTime of the duplicates has values that do not exist in ITEMS_TABLE.

@AleksandarTokarev
Author

AleksandarTokarev commented Dec 30, 2023

They do not exist because I have not put them above (and they do not exist in the beginning); over time the items are updated in ITEMS_TABLE.
After some time they exist, but the old ones are gone from ITEMS_TABLE. The actual main issue we have is with ITEMS_PER_USER_TABLE: the new items are there and updating, but we are also seeing some stale ones (which in my opinion should not even appear, because COLLECT_LIST is collecting the items from ITEMS_TABLE).

@cadonna
Member

cadonna commented Dec 30, 2023

I see. We need a minimal example to reproduce the situation, otherwise it is just guessing. I tried to reproduce it but was not able to.

@AleksandarTokarev
Author

What else can I provide in order to be more helpful?

@AleksandarTokarev
Author

AleksandarTokarev commented Dec 30, 2023

Regarding the logs in our Datadog system for ksql when these issues occur: they are not really helpful, but let me know if anything makes sense to you. I can try to look for more logs/data.
[screenshot: Datadog logs for the ksqlDB server]

@AleksandarTokarev
Author

AleksandarTokarev commented Jan 2, 2024

ok @cadonna some more followup on the issues.
Here are the CREATE table statements for the other tables
ITEMS_TABLE

CREATE OR REPLACE TABLE ITEMS_TABLE WITH (CLEANUP_POLICY='compact', KAFKA_TOPIC='ITEMS_TABLE', 
PARTITIONS=12, REPLICAS=3, RETENTION_MS=600000) AS SELECT * FROM ITEMS_TABLE_INITIAL EMIT CHANGES;

It seems there is one more table backing this ITEMS_TABLE - here is the DDL for the initial one.
ITEMS_TABLE_INITIAL

CREATE TABLE ITEMS_TABLE_INITIAL (ID STRING PRIMARY KEY, USERID BIGINT, NAME STRING, PRICE DOUBLE, UPDATEDTIME BIGINT) WITH (KAFKA_TOPIC='items.active', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

Also, it seems these issues occurred when there was an OOM in ksqlDB - here is one of the logs from when the OOM occurred:

Process _confluent-ksql (pid: 1886306) triggered an OOM kill on itself. The process had reached 2560000 pages in size.
This OOM kill was invoked by a cgroup, containerID: xxxxx.

A few things that come to mind:

  1. The OOM itself is concerning - this should not happen in the first place. We might want to increase the RAM.
  2. Do you think the issue can come from the fact that we are not using the INITIAL table, but instead the one created from it?

Thanks a lot
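
[Editor's note] On point 1 above (the OOM): on the Confluent ksqlDB images the JVM heap is commonly raised via the `KSQL_HEAP_OPTS` environment variable. A sketch in the docker-compose style used elsewhere in this thread, with illustrative sizes only:

```
  kafka-ksqldb:
    image: confluentinc/ksqldb-server:0.29.0
    environment:
      # Fixed heap so the container's memory use is predictable;
      # keep the container/cgroup memory limit comfortably above this.
      KSQL_HEAP_OPTS: "-Xms3g -Xmx3g"
```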

@suhas-satish suhas-satish added the streaming-engine Tickets owned by the ksqlDB Streaming Team label Jan 5, 2024
@cadonna
Member

cadonna commented Jan 5, 2024

@AleksandarTokarev The best thing would be if you could provide a reproduction with a docker-compose setup. You can find a docker-compose setup that you can re-use under the following link: https://ksqldb.io/quickstart.html#quickstart-content
Once we can reproduce the issue, it gets easier to find the root cause. Otherwise, it is a lot of crystal balling.

@AleksandarTokarev
Author

AleksandarTokarev commented Jan 19, 2024

@cadonna we have added exactly-once as a server parameter and it was good for about 2 weeks. Today we upgraded to ARM instances, and during the upgrade there was an issue with the volumes/resources and the bug occurred again.
We are thinking of writing a custom UDAF where we would be deduplicating the LIST in the aggregate part.
Is there a proper example of how to do that? Or maybe some further hints on how to do it?

Also, I am not sure I understand - how are the merge and undo functions used in the COLLECT_LIST example?
https://github.com/confluentinc/ksql/blob/master/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/array/CollectListUdaf.java#L104-L116
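
[Editor's note] A minimal sketch of the core logic such a deduplicating UDAF could use, assuming "newest updatedTime wins per id". This is not the ksqlDB UDAF boilerplate (no `@UdafDescription`/`@UdafFactory` annotations, and `Item` stands in for the STRUCT type); class and method names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the aggregate/merge logic for a dedup-by-id COLLECT_LIST variant:
// the aggregate keeps at most one item per id, preferring the newest
// updatedTime, so records replayed under at-least-once cannot pile up.
public class DedupAggregate {

    // One collected item; in a real ksqlDB UDAF this would be a STRUCT.
    public record Item(String id, String name, double price, long updatedTime) {}

    // aggregate(): fold one input item into the running aggregate.
    public static List<Item> aggregate(Item current, List<Item> agg) {
        Map<String, Item> byId = new LinkedHashMap<>();
        for (Item i : agg) {
            byId.put(i.id(), i);
        }
        Item existing = byId.get(current.id());
        // Keep whichever version of the item has the newest updatedTime.
        if (existing == null || current.updatedTime() >= existing.updatedTime()) {
            byId.put(current.id(), current);
        }
        return new ArrayList<>(byId.values());
    }

    // merge(): combine two partial aggregates (ksqlDB calls merge when
    // combining partial results); same rule, newest wins per id.
    public static List<Item> merge(List<Item> left, List<Item> right) {
        List<Item> out = new ArrayList<>(left);
        for (Item i : right) {
            out = aggregate(i, out);
        }
        return out;
    }
}
```

Because the newest-wins rule is idempotent, feeding the same record twice leaves the aggregate unchanged, which is exactly what makes the UDAF safe under at-least-once replays.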

@AleksandarTokarev
Author

AleksandarTokarev commented Jan 22, 2024

@cadonna I think I have finally found a way to reproduce this.
It does not always happen at the same point, but it happens fairly often.

docker compose file

version: "3"

networks:
  kafka-net:
    name: kafka-net
    driver: bridge

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    container_name: zookeeper
    restart: unless-stopped
    networks:
      - kafka-net
    ports:
      - "2181:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    networks:
      - kafka-net
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
    depends_on:
      - kafka

  kafka:
    image: docker.io/bitnami/kafka:3
    container_name: kafka
    restart: unless-stopped
    networks:
      - kafka-net
    ports:
      - "9092:9092"
    environment:
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_INTERNAL:PLAINTEXT,DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_CFG_LISTENERS: DOCKER_INTERNAL://:29092,DOCKER_EXTERNAL://:9092
      KAFKA_CFG_ADVERTISED_LISTENERS: DOCKER_INTERNAL://kafka:29092,DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: DOCKER_INTERNAL
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_BROKER_ID: 1
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
    depends_on:
      - zookeeper

  kafka-ksqldb:
    image: confluentinc/ksqldb-server:0.29.0
    container_name: kafka-ksqldb
    restart: unless-stopped
    networks:
      - kafka-net
    ports:
      - "8088:8088"
    environment:
      KSQL_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:29092
      KSQL_LISTENERS: http://0.0.0.0:8088/
      KSQL_KSQL_SERVICE_ID: kafka-ksqldb
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_EXTENSION_DIR: "/data/udfs"
    volumes:
      - ./data/kafka-ksqldb-data/scripts:/data/scripts/
      - ./data/kafka-ksqldb-data/udfs:/data/udfs/
    depends_on:
      - kafka

Steps:
a) Create topic users.items
b) Create table on top of the topic

CREATE TABLE USERS_ITEMS_INITIAL (F_KEY STRING PRIMARY KEY, USERID BIGINT, NAME STRING, DESCRIPTION STRING, AMOUNT DOUBLE, UPDATED BIGINT) WITH (KAFKA_TOPIC='users.items', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

c) Use a COLLECT_LIST

CREATE TABLE IF NOT EXISTS ITEMS_PER_USER_TABLE WITH (KAFKA_TOPIC='ITEMS_PER_USER_TABLE', PARTITIONS=3, VALUE_FORMAT='JSON') AS SELECT USERS_ITEMS_INITIAL.USERID USERID,
  COLLECT_LIST(STRUCT(`NAME`:=USERS_ITEMS_INITIAL.NAME, `DESCRIPTION`:=USERS_ITEMS_INITIAL.DESCRIPTION, `AMOUNT`:=USERS_ITEMS_INITIAL.AMOUNT, `UPDATED`:=USERS_ITEMS_INITIAL.UPDATED)) ACTIVE_ITEMS
FROM USERS_ITEMS_INITIAL
GROUP BY USERS_ITEMS_INITIAL.USERID
EMIT CHANGES;

d) Do some insertions in the users.items topic

Key
10-T-TOM
Value
{
	"name": "TOMATOES",
	"description": "They are tasty",
	"amount": 1,
	"updated": 1905906822314,
	"userId": 10
}

Key
10-T-TOM
Value
{
	"name": "TOMATOES",
	"description": "They are tasty",
	"amount": 2,
	"updated": 1925906822314,
	"userId": 10
}

Key
10-T-TOM
Value
{
	"name": "TOMATOES",
	"description": "They are tasty",
	"amount": 3,
	"updated": 1935906822314,
	"userId": 10
}

Key
10-T-POT
Value
{
	"name": "Potatoes",
	"description": "They are tasty too",
	"amount": 0,
	"updated": 1905906822314,
	"userId": 10
}

Key
10-T-POT
Value
{
	"name": "Potatoes",
	"description": "They are tasty too",
	"amount": 1,
	"updated": 1915906822314,
	"userId": 10
}

After the last one, I seem to have ended up with 2 Potatoes records in the resulting table, which is not what I want:

{
	"ACTIVE_ITEMS": [
		{
			"NAME": "TOMATOES",
			"DESCRIPTION": "They are tasty",
			"AMOUNT": 3,
			"UPDATED": 1935906822314
		},
		{
			"NAME": "Potatoes",
			"DESCRIPTION": "They are tasty too",
			"AMOUNT": 0,
			"UPDATED": 1905906822314
		},
		{
			"NAME": "Potatoes",
			"DESCRIPTION": "They are tasty too",
			"AMOUNT": 1,
			"UPDATED": 1915906822314
		}
	]
}

Here is a screenshot
[screenshot: the ITEMS_PER_USER_TABLE output with the duplicate Potatoes records]

DISCLAIMER: This does not always happen, and not for the same records - it is kind of intermittent.
I would gladly appreciate the help.

UPDATE 1: It seems this happens when the same message key goes into different partitions. I wonder why, and whether this could happen in the failure scenarios (like OOM, volume issues, etc.).
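
[Editor's note] For context on that last observation: Kafka's default partitioner maps a keyed record to a partition deterministically, via murmur2 over the serialized key bytes, masked to non-negative, modulo the partition count (this mirrors `Utils.murmur2` in Apache Kafka; the class and method names below are mine). With a fixed partition count, the same key bytes therefore always land on the same partition, so seeing one key in two partitions suggests a producer with a non-default partitioner, a changed partition count, or a repartitioning step in between:

```java
// Sketch of Kafka's default key-to-partition mapping.
public class DefaultPartitionSketch {

    public static int partitionFor(byte[] keyBytes, int numPartitions) {
        // Mask to non-negative, then take the partition modulo.
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // murmur2 hash as used by Kafka's default partitioner.
    static int murmur2(final byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Handle the last few bytes of the input array (intentional fallthrough).
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }
}
```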
