ksqlDB unable to deserialize JSON topic data to MAP format #10161

Open
cpettigre opened this issue Dec 12, 2023 · 0 comments

Describe the bug
This issue appears when running a SELECT statement against a Stream that defines a MAP, where the MAP data has been stored in JSON_SR format. The issue only affects ksqlDB on Confluent Cloud.

To Reproduce
Create a topic MAP_TEST, then:

CREATE STREAM MAP_TEST (ID INT KEY, DATA STRUCT<VAL MAP<STRING, STRING>>)
WITH (kafka_topic='MAP_TEST', partitions=1, value_format='json_sr');

INSERT INTO MAP_TEST (ID, DATA) VALUES (1, STRUCT(VAL := MAP('a' := '1', 'b' := '2')));

SELECT * FROM MAP_TEST;

Expected behavior
A new row should be emitted showing the inserted data.
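
For reference, given the values inserted above, the query should return a single row roughly like this (CLI formatting approximate):

+----+--------------------+
|ID  |DATA                |
+----+--------------------+
|1   |{VAL={a=1, b=2}}    |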

Actual behavior
No data is emitted and the ksqlDB processing logs show:

"ERRORMESSAGE": "Failed to deserialize value from topic: MAP_TEST. Can't convert type. sourceType: ArrayNode, requiredType: MAP<VARCHAR, VARCHAR>, path: $.DATA.VAL"

Additional context
The INSERT INTO registers a schema that represents the map as an array of key/value objects (see below). Note that the schema carries "connect.type":"map". It appears the consumer used by the SELECT statement does not know how to deserialize that array back into the map format.

{
  "properties": {
    "DATA": {
      "connect.index": 0,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "properties": {
            "VAL": {
              "connect.index": 0h,
              "oneOf": [
                {
                  "type": "null"
                },
                {
                  "connect.type": "map",
                  "items": {
                    "properties": {
                      "key": {
                        "connect.index": 0,
                        "oneOf": [
                          {
                            "type": "null"
                          },
                          {
                            "type": "string"
                          }
                        ]
                      },
                      "value": {
                        "connect.index": 1,
                        "oneOf": [
                          {
                            "type": "null"
                          },
                          {
                            "type": "string"
                          }
                        ]
                      }
                    },
                    "type": "object"
                  },
                  "type": "array"
                }
              ]
            }
          },
          "type": "object"
        }
      ]
    }
  },
  "type": "object"
}
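
Given this schema, the serialized value on the topic presumably encodes the map as a JSON array of key/value objects rather than as a JSON object, which would explain the ArrayNode in the error above. For the row inserted in the reproduction steps, the payload would look roughly like this (inferred from the schema, not captured from the topic):

{"DATA": {"VAL": [{"key": "a", "value": "1"}, {"key": "b", "value": "2"}]}}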

Workaround
Create a topic ARR_TEST, then define the stream using an array of structs (matching what is actually stored under the hood) instead of a map, as in the example below:

CREATE STREAM ARR_TEST (ID INTEGER KEY, DATA STRUCT<VAL ARRAY<STRUCT<`value` STRING, `key` STRING>>>) WITH (KAFKA_TOPIC='ARR_TEST', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON_SR');

This handles inserts and queries on the struct fields as follows:

INSERT INTO ARR_TEST (ID, DATA) VALUES (1, STRUCT(VAL := ARRAY[STRUCT(`value` := '1', `key` := 'a'), STRUCT(`value` := '2', `key` := 'b')]));
SELECT ID, FILTER(DATA->VAL, (s) => s->`key` = 'a' AND s->`value` = '1') AS DATA FROM ARR_TEST;
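
If a proper MAP is still needed downstream, it should be possible to rebuild one from the array of structs using TRANSFORM and AS_MAP (a sketch, assuming both functions are available in your ksqlDB version; not verified against this exact schema):

SELECT ID,
       AS_MAP(TRANSFORM(DATA->VAL, (s) => s->`key`),
              TRANSFORM(DATA->VAL, (s) => s->`value`)) AS DATA_MAP
FROM ARR_TEST;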