HEADERS and HEADER columns can't be selected: queries return null or other columns' values #10123

Open
mrt181 opened this issue Nov 17, 2023 · 0 comments


mrt181 commented Nov 17, 2023

I am currently evaluating ksqlDB and Confluent Cloud as a possible solution for a new project.

I am using this setup:

CONTAINER ID  IMAGE                                                      COMMAND               CREATED     STATUS       PORTS                                           NAMES
d80264a9b068  docker.io/confluentinc/cp-kafka:7.5.2                      /etc/confluent/do...  2 days ago  Up 2 days    0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp  broker
c7a682693a9a  docker.io/confluentinc/cp-schema-registry:7.5.2            /etc/confluent/do...  2 days ago  Up 30 hours  0.0.0.0:8081->8081/tcp                          schema-registry
6745244f5009  docker.io/cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0  /etc/confluent/do...  2 days ago  Up 2 days    0.0.0.0:8083->8083/tcp                          connect
86f2b921d238  docker.io/confluentinc/cp-kafka-rest:7.5.2                 /etc/confluent/do...  2 days ago  Up 2 days    0.0.0.0:8082->8082/tcp                          rest-proxy
da8baf91b142  docker.io/confluentinc/cp-ksqldb-server:7.5.2              /etc/confluent/do...  2 days ago  Up 2 days    0.0.0.0:8088->8088/tcp                          ksqldb-server
fc61a849dd37  docker.io/confluentinc/cp-enterprise-control-center:7.5.2  /etc/confluent/do...  2 days ago  Up 2 days    0.0.0.0:9021->9021/tcp                          control-center
d252df63d02e  docker.io/confluentinc/cp-ksqldb-cli:7.5.2                                       2 days ago  Up 2 days                                                    ksqldb-cli

I have created this table:

CREATE SOURCE TABLE locations (
    id VARCHAR PRIMARY KEY,
    meta ARRAY<STRUCT<key STRING, value BYTES>> HEADERS
) WITH (
    KAFKA_TOPIC = 'locations',
    VALUE_FORMAT = 'JSON_SR',
    VALUE_SCHEMA_ID = 1
);

It uses this schema:

{
  "schemaType":"JSON",
  "schema":"{\"$id\":\"http://schema-registry:8081/schemas/ids/1\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"title\":\"Locationvalue\",\"type\":\"object\",\"properties\":{\"profileId\":{\"type\":\"string\",\"description\":\"The id of the location.\"},\"latitude\":{\"type\":\"number\",\"minimum\":-90,\"maximum\":90,\"description\":\"The location's latitude.\"},\"longitude\":{\"type\":\"number\",\"minimum\":-180,\"maximum\":180,\"description\":\"The location's longitude.\"}}}"
}
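The `schema` field in this registry response is itself a JSON document stored as an escaped string, so it has to be parsed twice before the properties become visible. A minimal Python sketch using only the standard library; the payload is copied from above and the variable names are illustrative:

```python
import json

# Registry response as shown above. The "schema" value is a JSON document
# embedded as an escaped string, hence the raw string literal.
payload = r'''{
  "schemaType":"JSON",
  "schema":"{\"$id\":\"http://schema-registry:8081/schemas/ids/1\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"title\":\"Locationvalue\",\"type\":\"object\",\"properties\":{\"profileId\":{\"type\":\"string\",\"description\":\"The id of the location.\"},\"latitude\":{\"type\":\"number\",\"minimum\":-90,\"maximum\":90,\"description\":\"The location's latitude.\"},\"longitude\":{\"type\":\"number\",\"minimum\":-180,\"maximum\":180,\"description\":\"The location's longitude.\"}}}"
}'''

outer = json.loads(payload)            # first parse: the registry envelope
schema = json.loads(outer["schema"])   # second parse: the embedded JSON Schema

for name, spec in schema["properties"].items():
    print(name, spec["type"])
# profileId string
# latitude number
# longitude number
```

These three properties are the value columns that DESCRIBE lists further down (string as VARCHAR, number as DOUBLE).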

Then I produce data with headers:

kafka-json-schema-console-producer \
--bootstrap-server broker:9092 \
--topic locations \
--property key.serializer=org.apache.kafka.common.serialization.StringSerializer \
--property parse.key=true \
--property parse.headers=true \
--property schema.registry.url=http://schema-registry:8081 \
--property value.schema.id=1

x-correlationid:36d2784c-8233-11ee-b2ce-05ab766c73d1	asd	{"profileId":"asd","latitude":0,"longitude":-1}
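With `parse.key=true` and `parse.headers=true`, the line above packs the headers, the key and the value into one input line. A rough Python sketch of how it splits, assuming the console producer's default separators (a tab between headers, key and value, `,` between headers, `:` between a header name and its value) and UTF-8 header values; `split_producer_line` is a hypothetical helper, not part of any Kafka tool:

```python
def split_producer_line(line: str):
    """Hypothetical helper: split a console-producer input line into
    (headers, key, value), assuming the default separators."""
    headers_part, key, value = line.split("\t", 2)
    headers = []
    for header in headers_part.split(","):
        # Split on the first ":" only; everything after it is the value.
        name, _, text = header.partition(":")
        # Kafka header values are raw bytes; assume the text is sent as UTF-8.
        headers.append((name, text.encode("utf-8")))
    return headers, key, value


line = ('x-correlationid:36d2784c-8233-11ee-b2ce-05ab766c73d1\tasd\t'
        '{"profileId":"asd","latitude":0,"longitude":-1}')
headers, key, value = split_producer_line(line)
print(headers)  # [('x-correlationid', b'36d2784c-8233-11ee-b2ce-05ab766c73d1')]
print(key)      # asd
print(value)    # {"profileId":"asd","latitude":0,"longitude":-1}
```

Under these assumptions, the `x-correlationid` header carries the 36 bytes of the UUID text, which the table exposes as BYTES.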

And I am able to see it in the table:

ksql> SELECT * FROM locations;
+------------------+------------------+------------------+------------------+------------------+
|ID                |META              |profileId         |latitude          |longitude         |
+------------------+------------------+------------------+------------------+------------------+
|asd               |[{KEY=x-correlatio|asd               |0.0               |-1.0              |
|                  |nid, VALUE=MzZkMjc|                  |                  |                  |
|                  |4NGMtODIzMy0xMWVlL|                  |                  |                  |
|                  |WIyY2UtMDVhYjc2NmM|                  |                  |                  |
|                  |3M2Qy}]           |                  |                  |                  |
Query terminated
ksql> describe locations;

Name                 : LOCATIONS
 Field     | Type
-----------------------------------------------------------------------
 META      | ARRAY<STRUCT<KEY VARCHAR(STRING), VALUE BYTES>> (headers)
 ID        | VARCHAR(STRING)  (primary key)
 profileId | VARCHAR(STRING)
 latitude  | DOUBLE
 longitude | DOUBLE
-----------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

My problems start when I want to access the META column:

ksql> SELECT ID, META FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID                                                |META                                              |
+--------------------------------------------------+--------------------------------------------------+
|asd                                               |-1.0                                              |

META returns the value of the longitude column?

Accessing the data inside the META column does not work either:

ksql> SELECT ID, META[0]->KEY FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID                                                |KEY                                               |
+--------------------------------------------------+--------------------------------------------------+
|asd                                               |null                                              |
Query terminated
ksql> SELECT ID, FROM_BYTES(META[0]->VALUE, 'ascii') FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID                                                |KSQL_COL_0                                        |
+--------------------------------------------------+--------------------------------------------------+
|asd                                               |null                                              |
Query terminated
ksql> SELECT ID, FROM_BYTES(META[0]->VALUE, 'base64') FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID                                                |KSQL_COL_0                                        |
+--------------------------------------------------+--------------------------------------------------+
|asd                                               |null                                              |
Query terminated
ksql> SELECT ID, FROM_BYTES(META[0]->VALUE, 'hex') FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID                                                |KSQL_COL_0                                        |
+--------------------------------------------------+--------------------------------------------------+
|asd                                               |null                                              |
Query terminated
ksql> SELECT ID, FROM_BYTES(META[0]->VALUE, 'utf8') FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID                                                |KSQL_COL_0                                        |
+--------------------------------------------------+--------------------------------------------------+
|asd                                               |null                                              |
Query terminated
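For comparison, this Python sketch computes what each encoding would be expected to produce for the header bytes. It assumes the header holds the UTF-8 bytes of the correlation ID from the producer input and treats the Python conversions below as stand-ins for FROM_BYTES with each encoding name; none of the results is null:

```python
import base64

# Assumed header value: the UTF-8 bytes of the correlation ID sent by the producer.
header_value = "36d2784c-8233-11ee-b2ce-05ab766c73d1".encode("utf-8")

# Python stand-ins for FROM_BYTES(value, <encoding>); an illustration, not ksqlDB code.
expected = {
    "ascii": header_value.decode("ascii"),
    "base64": base64.b64encode(header_value).decode("ascii"),
    "hex": header_value.hex(),
    "utf8": header_value.decode("utf-8"),
}
for encoding, text in expected.items():
    print(f"{encoding:>6}: {text}")
```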

How can I use the META column?

This looks similar to issue #8895.

When I recreate the table to access the header value directly:

ksql> CREATE SOURCE TABLE locations (
    id VARCHAR PRIMARY KEY,
    correlationid BYTES HEADER('x-correlationid')
) WITH (
    KAFKA_TOPIC = 'locations',
    VALUE_FORMAT = 'JSON_SR',
    VALUE_SCHEMA_ID = 1
);

 Message
---------------------------------------
 Created query with ID CST_LOCATIONS_5
---------------------------------------
ksql> describe locations;

Name                 : LOCATIONS
 Field         | Type
-----------------------------------------------------------------
 CORRELATIONID | BYTES            (header('x-correlationid'))
 ID            | VARCHAR(STRING)  (primary key)
 profileId     | VARCHAR(STRING)
 latitude      | DOUBLE
 longitude     | DOUBLE
-----------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
ksql> SELECT * from locations;
+------------------+------------------+------------------+------------------+------------------+
|ID                |CORRELATIONID     |profileId         |latitude          |longitude         |
+------------------+------------------+------------------+------------------+------------------+
|asd               |MzZkMjc4NGMtODIzMy|asd               |0.0               |-1.0              |
|                  |0xMWVlLWIyY2UtMDVh|                  |                  |                  |
|                  |Yjc2NmM3M2Qx      |                  |                  |                  |
Query terminated

I see my header value in the new column.

The value MzZkMjc4NGMtODIzMy0xMWVlLWIyY2UtMDVhYjc2NmM3M2Qx is the string 36d2784c-8233-11ee-b2ce-05ab766c73d1 in base64.
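The encoding claim can be checked outside ksqlDB. A short Python sketch, assuming the header was written as the UTF-8 bytes of the UUID text:

```python
import base64

correlation_id = "36d2784c-8233-11ee-b2ce-05ab766c73d1"
raw = correlation_id.encode("utf-8")  # assumed header bytes
shown = base64.b64encode(raw).decode("ascii")
print(shown)  # MzZkMjc4NGMtODIzMy0xMWVlLWIyY2UtMDVhYjc2NmM3M2Qx

# Decoding the displayed value gives back the original UUID text.
print(base64.b64decode(shown).decode("utf-8"))  # 36d2784c-8233-11ee-b2ce-05ab766c73d1
```

Under that assumption, the base64 string is consistent with being the display form of the header bytes, and interpreting those bytes as UTF-8 text is what recovers the UUID.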

But when I try to decode it, the CORRELATIONID column is null and the remaining values shift one column to the right:

ksql> SELECT ID, FROM_BYTES(CORRELATIONID, 'base64') as CORRELATIONID, `profileId`, `latitude`, `longitude` from locations;
+------------------+------------------+------------------+------------------+------------------+
|ID                |CORRELATIONID     |profileId         |latitude          |longitude         |
+------------------+------------------+------------------+------------------+------------------+
|asd               |null              |MzZkMjc4NGMtODIzMy|asd               |0.0               |
|                  |                  |0xMWVlLWIyY2UtMDVh|                  |                  |
|                  |                  |Yjc2NmM3M2Qx      |                  |                  |
Query terminated

Selecting only that column does not work either; it returns the longitude value:

ksql> SELECT CORRELATIONID from locations;
+----------------------------------------------------------------------------------------------+
|CORRELATIONID                                                                                 |
+----------------------------------------------------------------------------------------------+
|-1.0                                                                                          |
Query terminated
ksql> SELECT `CORRELATIONID` from locations;
+----------------------------------------------------------------------------------------------+
|CORRELATIONID                                                                                 |
+----------------------------------------------------------------------------------------------+
|-1.0                                                                                          |
Query terminated

When I recreate the table as a regular (non-SOURCE) table, I can select the CORRELATIONID column with a push query:

ksql> CREATE TABLE locations (
    id VARCHAR PRIMARY KEY,
    correlationid BYTES HEADER('x-correlationid')
) WITH (
    KAFKA_TOPIC = 'locations',
    VALUE_FORMAT = 'JSON_SR',
    VALUE_SCHEMA_ID = 1
);

 Message
---------------
 Table created
---------------
ksql> SELECT CORRELATIONID from locations EMIT CHANGES;
+----------------------------------------------------------------------------------------------+
|CORRELATIONID                                                                                 |
+----------------------------------------------------------------------------------------------+
|                                                                                              |
|MzZkMjc4NGMtODIzMy0xMWVlLWIyY2UtMDVhYjc2NmM3M2Qx                                              |
^CQuery terminated
ksql> SELECT CORRELATIONID, FROM_BYTES(CORRELATIONID, 'base64') from locations EMIT CHANGES;
+-------------------------------------------------+-------------------------------------------------+
|CORRELATIONID                                    |KSQL_COL_0                                       |
+-------------------------------------------------+-------------------------------------------------+
|                                                 |                                                 |
|MzZkMjc4NGMtODIzMy0xMWVlLWIyY2UtMDVhYjc2NmM3M2Qx |MzZkMjc4NGMtODIzMy0xMWVlLWIyY2UtMDVhYjc2NmM3M2Qx |
^CQuery terminated

But decoding that column still does not work: FROM_BYTES(CORRELATIONID, 'base64') returns the same base64 string.
Using a STREAM instead of a TABLE doesn't change this behavior.

I also tried creating a simple table:

ksql> CREATE table test (key varchar primary key, value BYTES)
  WITH (kafka_topic='test', value_format='json', partitions=1);

 Message
---------------
 Table created
---------------
ksql> INSERT INTO test (KEY, VALUE) VALUES ('A', TO_BYTES('36d2784c823311eeb2ce05ab766c73d1', 'base64'));
ksql> SELECT KEY, VALUE, FROM_BYTES(VALUE, 'base64') AS FOO FROM test EMIT CHANGES;
+--------------------------------+--------------------------------+--------------------------------+
|KEY                             |VALUE                           |FOO                             |
+--------------------------------+--------------------------------+--------------------------------+
|A                               |36d2784c823311eeb2ce05ab766c73d1|36d2784c823311eeb2ce05ab766c73d1|
^CQuery terminated

Is the value in the VALUE column not encoded as base64, or is it decoded implicitly for the output?
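A Python sketch of one possible reading, assuming that TO_BYTES(s, 'base64') base64-decodes `s`, that FROM_BYTES(b, 'base64') base64-encodes `b`, and that the CLI renders BYTES columns as base64 (all three are assumptions, not confirmed here). Under those assumptions, a string that is already valid base64 survives the round trip unchanged, so VALUE and FOO print the same text:

```python
import base64

text = "36d2784c823311eeb2ce05ab766c73d1"

# Modelled TO_BYTES(text, 'base64'): the 32 characters are read as base64 digits.
raw = base64.b64decode(text, validate=True)
print(len(raw))  # 24

# Modelled FROM_BYTES(raw, 'base64') and the CLI display: encode the bytes again.
roundtrip = base64.b64encode(raw).decode("ascii")
print(roundtrip == text)  # True

# For contrast, reading the same 32 characters as hex yields 16 bytes.
print(len(bytes.fromhex(text)))  # 16
```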

When I try to insert the hyphenated UUID string, I get an error:

ksql> INSERT INTO test (KEY, VALUE) VALUES ('B', TO_BYTES('36d2784c-8233-11ee-b2ce-05ab766c73d1', 'base64'));
Failed to insert values into 'TEST'.

The ksqldb-server logs this exception:

[2023-11-17 13:09:10,692] INFO Processed unsuccessfully: KsqlRequest{configOverrides={auto.offset.reset=earliest}, requestProperties={}, commandSequenceNumber=Optional[40]} (f102667e-3ffa-3bf1-ba78-f7daaef003ed): INSERT INTO stream1 (column1, column2) VALUES ('[string]' ,'[string]'); (io.confluent.ksql.logging.query.QueryLogger)
io.confluent.ksql.util.KsqlStatementException: Failed to insert values into 'TEST'.
        at io.confluent.ksql.rest.server.execution.InsertValuesExecutor.buildRecord(InsertValuesExecutor.java:293)
        at io.confluent.ksql.rest.server.execution.InsertValuesExecutor.execute(InsertValuesExecutor.java:166)
        at io.confluent.ksql.rest.server.validation.CustomValidators.validate(CustomValidators.java:145)
        at io.confluent.ksql.rest.server.validation.RequestValidator.validate(RequestValidator.java:166)
        at io.confluent.ksql.rest.server.validation.RequestValidator.validate(RequestValidator.java:129)
        at io.confluent.ksql.rest.server.resources.KsqlResource.handleKsqlStatements(KsqlResource.java:310)
        at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeKsqlRequest$2(KsqlServerEndpoints.java:183)
        at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOldApiEndpointOnWorker$24(KsqlServerEndpoints.java:348)
        at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOnWorker$23(KsqlServerEndpoints.java:334)
        at io.vertx.core.impl.ContextBase.lambda$null$0(ContextBase.java:137)
        at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
        at io.vertx.core.impl.ContextBase.lambda$executeBlocking$1(ContextBase.java:135)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.confluent.ksql.util.KsqlException:
        at io.confluent.ksql.engine.generic.GenericExpressionResolver$1.error(GenericExpressionResolver.java:62)
        at io.confluent.ksql.execution.codegen.CompiledExpression.evaluate(CompiledExpression.java:96)
        at io.confluent.ksql.engine.generic.GenericExpressionResolver$Visitor.visitExpression(GenericExpressionResolver.java:114)
        at io.confluent.ksql.engine.generic.GenericExpressionResolver$Visitor.visitExpression(GenericExpressionResolver.java:99)
        at io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor.visitFunctionCall(VisitParentExpressionVisitor.java:101)
        at io.confluent.ksql.execution.expression.tree.FunctionCall.accept(FunctionCall.java:65)
        at io.confluent.ksql.execution.expression.tree.ExpressionVisitor.process(ExpressionVisitor.java:21)
        at io.confluent.ksql.engine.generic.GenericExpressionResolver.resolve(GenericExpressionResolver.java:96)
        at io.confluent.ksql.engine.generic.GenericRecordFactory.resolveValues(GenericRecordFactory.java:161)
        at io.confluent.ksql.engine.generic.GenericRecordFactory.build(GenericRecordFactory.java:90)
        at io.confluent.ksql.rest.server.execution.InsertValuesExecutor.buildRecord(InsertValuesExecutor.java:271)
        ... 15 more

It looks like base64 byte encoding is broken.
Strings containing non-alphanumeric characters such as '-' cause an error when using base64.
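The failing input differs from the accepted one only by its `-` characters, which are not in the standard base64 alphabet (A to Z, a to z, 0 to 9, `+` and `/`). A Python sketch with a strict decoder rejects the same input; whether ksqlDB's TO_BYTES is equally strict is an assumption. The last call shows that `+` and `/` are accepted even though they are not alphanumeric:

```python
import base64
import binascii


def strict_b64decode(text: str):
    """Return the decoded bytes, or None if text is not valid standard base64."""
    try:
        return base64.b64decode(text, validate=True)
    except binascii.Error:
        return None


print(strict_b64decode("36d2784c823311eeb2ce05ab766c73d1") is not None)  # True
print(strict_b64decode("36d2784c-8233-11ee-b2ce-05ab766c73d1"))          # None
print(strict_b64decode("ab+/"))                                          # b'i\xbf\xbf'
```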

I can do this:

ksql> INSERT INTO test (KEY, VALUE) VALUES ('C', TO_BYTES('1234567890', 'base64'));
ksql> INSERT INTO test (KEY, VALUE) VALUES ('D', TO_BYTES('1234567890', 'utf8'));
ksql> SELECT KEY, VALUE, FROM_BYTES(VALUE, 'base64') AS FOO FROM test EMIT CHANGES;
+--------------------------------+--------------------------------+--------------------------------+
|KEY                             |VALUE                           |FOO                             |
+--------------------------------+--------------------------------+--------------------------------+
|C                               |123456789w==                    |123456789w==                    |
|D                               |MTIzNDU2Nzg5MA==                |MTIzNDU2Nzg5MA==                |
^CQuery terminated
ksql> SELECT KEY, VALUE, FROM_BYTES(VALUE, 'utf8') AS FOO FROM test EMIT CHANGES;
+--------------------------------+--------------------------------+--------------------------------+
|KEY                             |VALUE                           |FOO                             |
+--------------------------------+--------------------------------+--------------------------------+
|C                               |123456789w==                    |�m����                          |
|D                               |MTIzNDU2Nzg5MA==                |1234567890                      |
^CQuery terminated

Notice that in row C, column FOO is not decoded when using base64, but in row D decoding the utf8 value works.
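The `123456789w==` value in row C is consistent with the ten characters being base64-decoded into 7 bytes (the unused low bits of the final character are dropped) and then encoded again. Those 7 bytes are not valid UTF-8, which would explain the replacement characters. A Python sketch of both rows, modelling TO_BYTES with Python conversions (an assumption); the input is padded by hand because Python's decoder, unlike the one ksqlDB appears to use, rejects unpadded base64:

```python
import base64

# Row C: modelled TO_BYTES('1234567890', 'base64').
text = "1234567890"                     # 10 characters, not a multiple of 4
padded = text + "=" * (-len(text) % 4)  # "1234567890==" so Python accepts it
raw_c = base64.b64decode(padded)
print(len(raw_c))                               # 7
print(base64.b64encode(raw_c).decode("ascii"))  # 123456789w==
print(raw_c.decode("utf-8", errors="replace"))  # replacement characters: not UTF-8

# Row D: modelled TO_BYTES('1234567890', 'utf8').
raw_d = text.encode("utf-8")
print(base64.b64encode(raw_d).decode("ascii"))  # MTIzNDU2Nzg5MA==
print(raw_d.decode("utf-8"))                    # 1234567890
```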

UPDATE:
I have tried this with my initial SOURCE table:

ksql> SELECT KEY, FROM_BYTES(CORRELATIONID, 'utf8') FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|KEY                                               |KSQL_COL_0                                        |
+--------------------------------------------------+--------------------------------------------------+
|asd                                               |null                                              |
Query terminated
ksql> SELECT KEY, FROM_BYTES(CORRELATIONID, 'utf8') FROM locations EMIT CHANGES;
+--------------------------------------------------+--------------------------------------------------+
|KEY                                               |KSQL_COL_0                                        |
+--------------------------------------------------+--------------------------------------------------+
|asd                                               |36d2784c-8233-11ee-b2ce-05ab766c73d1              |
^CQuery terminated

This gives the correct result only with a push query, not with a pull query.
