
Unable to query existing data in kafka cluster. AVRO deserialization error #10147

Open
Mr-Esweg opened this issue Dec 4, 2023 · 1 comment
Mr-Esweg commented Dec 4, 2023

Hi everyone.

I'm a newcomer to KSQL, but not to Kafka.

We want to use KSQL to run stream queries over many of our Kafka topics, but we're having some trouble, and we're hoping some of you kind folks can help us out.

Our current setup is that we have nested .avsc files which define our schema.

These are in the general format of

[
    {
        "namespace": "com.package.subclass",
        "type": "enum",
        "name": "Enum",
        "symbols": [
            "TYPE1",
            "TYPE2",
        ]
    },
    {
        "namespace": "com.package.subclass",
        "type": "record",
        "name": "SubField",
        "fields": [
            {
                "name": "field1",
                "type": [
                    "null",
                    "string"
                ]
            },
            {
                "name": "field2",
                "type": [
                    "null",
                    "string"
                ]
            }
        ]
    },
    {
        "namespace": "com.package",
        "type": "record",
        "name": "OurClass",
        "fields": [
            {
                "name": "enum",
                "type": "com.package.subclass.enum"
            },
            {
                "name": "subfield",
                "type": "com.package.subclass.SubField"
            }
        ]
    }
]

We use the avro-maven-plugin to convert the avsc file into POJOs.
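
For context, our plugin configuration is along these lines (the version and paths here are illustrative rather than our exact build):

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.11.3</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>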

Our application services (Kafka producers) then create POJOs and put them onto various Kafka topics.

We're using serdes which look like this...

public class OurClassSerde extends WrapperSerde<OurClass>
{
    public OurClassSerde()
    {
        // Pair the shared Avro serialiser with our class-specific deserialiser.
        super(new AvroSerializerBase<OurClass>(), new OurClassDeserializer());
    }
}

We use the built-in serialiser.

Our deserialiser is

public class OurClassDeserializer<T extends SpecificRecordBase> implements Deserializer<T>

The core of which is

// Build a reader from the POJO's embedded schema, then decode the raw bytes.
DatumReader<T> datumReader = new SpecificDatumReader<T>(clazz.newInstance().getSchema());
// binaryDecoder() expects plain Avro binary -- there is no wire-format header here.
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
returnObject = (T) datumReader.read(null, decoder);

clazz is the class of the POJO we're deserialising.

As far as I'm aware, this is not using the Confluent wire format. This may be the problem.
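
For reference, this is our understanding of the wire format (pieced together from the Schema Registry docs; the class and method names below are ours for illustration, not from our codebase):

import java.nio.ByteBuffer;

public final class WireFormatHeader
{
    private static final byte MAGIC_BYTE = 0x0;

    // A wire-format message is: [magic byte 0x0][4-byte schema ID, big-endian][Avro binary payload].
    // A registry-aware consumer reads this header first; on our plain-Avro messages
    // the first byte is arbitrary, which would explain an "Unknown magic byte!" error.
    public static int readSchemaId(byte[] payload)
    {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        if (buffer.get() != MAGIC_BYTE)
        {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return buffer.getInt();
    }
}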

This all works, and has been functioning for years in its own right.

We now want to use KSQL, as we anticipate a very large increase in the amount of data we need to process, and stream processing will scale better than running aggregations on static data.

We're running in Docker (Docker Desktop for dev purposes, backed by WSL2), and have used the recommended zookeeper / kafka / ksqlDB server / KSQL CLI / schema-registry compose file. Just in case that makes a difference.

We've created a stream using...

CREATE STREAM teststream  WITH (KAFKA_TOPIC='TopicOurData', VALUE_FORMAT='AVRO');

This doesn't fail.

When we describe the stream, it has correctly identified all of the sub fields, and sub-sub fields.

As soon as we create a query to read from the stream, all of the data is instantly consumed, along with the following error...

deserialization error

The error we're seeing makes me think that we're not using the wire format.

If we try to manually specify the shape of the data using a STRUCT, we get the same error.
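
For completeness, the STRUCT-based attempt looked along these lines (a reconstruction; the stream name and column types here are illustrative, matching the schema above):

CREATE STREAM teststream_struct (
    "enum" VARCHAR,
    subfield STRUCT<field1 VARCHAR, field2 VARCHAR>
) WITH (KAFKA_TOPIC='TopicOurData', VALUE_FORMAT='AVRO');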

We have a fundamental problem with using the schema-registry, as when we set it up, it doesn't seem to "support" nested schemas.

I can't find a way of manually creating a schema in a single JSON object which is accepted by schema-registry. We can create a file which the avro-maven-plugin will use, but schema-registry doesn't like it.
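
The closest we can get to a single JSON object is inlining each nested type at its first use, like this (reconstructed from the .avsc above; we're not certain this is the form schema-registry wants):

{
    "namespace": "com.package",
    "type": "record",
    "name": "OurClass",
    "fields": [
        {
            "name": "enum",
            "type": {
                "namespace": "com.package.subclass",
                "type": "enum",
                "name": "Enum",
                "symbols": ["TYPE1", "TYPE2"]
            }
        },
        {
            "name": "subfield",
            "type": {
                "namespace": "com.package.subclass",
                "type": "record",
                "name": "SubField",
                "fields": [
                    {"name": "field1", "type": ["null", "string"]},
                    {"name": "field2", "type": ["null", "string"]}
                ]
            }
        }
    ]
}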

If we un-nest the schema and register the sub-sub fields, sub-fields, and fields separately (as they appear in the .avsc example above), schema-registry accepts the file; but when we then create a stream, it adds all the sub-sub-fields first, then the sub-fields, and then the actual object last of all. That doesn't feel correct.

We have so many questions!

  • Is the problem we're seeing (Unknown Magic Byte!) definitely down to the fact that we're not using the wire format?
  • As the POJO data we are inserting into Kafka has a built-in schema, and seems to be recognised by KSQL, do we need schema-registry? We have the schemas in one place, in one file, which we use to create the POJOs. If we can get the topic to auto-register the schema from the POJOs, that's a relief for us, as we only need to maintain a single point of truth.
  • If we need schema-registry, how can we import our single AVSC file (which contains all shared sub-sub fields, sub-fields, and fields)? If we have to have one file per top-level object, how can we import the sub-field and sub-sub-field objects in a generic manner, so each one isn't associated specifically with a topic, but is just present?
  • Is there a Kafka-provided Java class to perform wire-format serialisation and deserialisation, or do we have to pack the magic byte and the 4-byte schema ID ourselves, manually (a sketch of what we mean follows this list)? If we need to, this is fine, but if there's built-in functionality there, I'd rather we used it.
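
By "pack it ourselves" we mean something along these lines (a sketch only; the method name is ours for illustration, and schemaId would have to be the real ID schema-registry assigned at registration time):

import java.nio.ByteBuffer;

// Prepend the wire-format header to an already Avro-encoded payload.
public static byte[] toWireFormat(int schemaId, byte[] avroBytes)
{
    ByteBuffer buffer = ByteBuffer.allocate(1 + 4 + avroBytes.length);
    buffer.put((byte) 0x0);   // magic byte
    buffer.putInt(schemaId);  // 4-byte schema ID, big-endian
    buffer.put(avroBytes);    // plain Avro binary, as our serialiser already produces
    return buffer.array();
}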

If anyone can help us out here, we'd be extremely grateful.

Thanks.

Mr-Esweg (Author) commented Dec 4, 2023

Tried to modify the serialiser and deserialiser to add 5 bytes of zeros, but no luck.
