Bug: KafkaSource Lack of Schema Registry Support for Messages #1529

Open
tobeberger opened this issue Jan 31, 2024 · 0 comments · May be fixed by #1530

Issue Description

When using the official Confluent Kafka client with Schema Registry enabled, each produced message starts with a magic byte of 0, followed by 4 bytes holding the schema ID as a big-endian integer. Consequently, the actual message content begins at byte 6 (index 5).
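A minimal Go sketch of reading that header, assuming the layout shown in the Confluent `WriteBytes` excerpt below (magic byte 0, then a big-endian uint32 schema ID). `parseWireFormat` is a hypothetical helper name, not part of TriggerMesh or the Confluent client.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// parseWireFormat is a hypothetical helper that splits a Schema Registry
// framed record into its schema ID and the remaining payload.
func parseWireFormat(msg []byte) (uint32, []byte, error) {
	// Byte 0 is the magic byte, bytes 1-4 are the schema ID.
	if len(msg) < 5 {
		return 0, nil, errors.New("message shorter than the 5-byte header")
	}
	if msg[0] != 0x00 {
		return 0, nil, fmt.Errorf("unexpected magic byte 0x%02x", msg[0])
	}
	schemaID := binary.BigEndian.Uint32(msg[1:5])
	// The actual message content starts at index 5 (byte 6).
	return schemaID, msg[5:], nil
}

func main() {
	// 0x2a is schema ID 42; the payload is an arbitrary JSON example.
	msg := append([]byte{0x00, 0x00, 0x00, 0x00, 0x2a}, []byte(`{"a":1}`)...)
	id, payload, err := parseWireFormat(msg)
	if err != nil {
		panic(err)
	}
	fmt.Printf("schema ID %d, payload %s\n", id, payload)
	// prints: schema ID 42, payload {"a":1}
}
```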

The TriggerMesh KafkaSource currently ignores this header, so the generated events contain invalid JSON. As a consequence, these events are NULL and cannot be processed.

Steps to Reproduce

  • Enable Schema Registry for the Kafka Client.
  • Produce messages using the Kafka Client.
  • Observe the leading 0 and SchemaId in the message structure.
  • Point a KafkaSource at the topic so that it consumes these messages.

Expected Behavior

The KafkaSource should handle messages produced with Schema Registry, recognizing the leading 0 and SchemaId, and correctly extracting the actual message content from byte 6 onwards.
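One way the KafkaSource could meet this expectation, sketched under the assumption that record values are JSON Schema payloads: strip the header only when the value looks framed, so topics carrying plain JSON keep working. `unwrapRecordValue` is hypothetical and is not TriggerMesh's actual code. As far as we understand, Confluent's Protobuf serializer writes extra message-index bytes after the schema ID, and Avro payloads are binary rather than JSON, so neither case is covered here.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// confluentHeaderLen is the magic byte plus the 4-byte schema ID.
const confluentHeaderLen = 5

// unwrapRecordValue is a hypothetical helper (not part of TriggerMesh) that
// strips the Schema Registry header when present and otherwise returns the
// value unchanged. It assumes JSON Schema framing with no extra bytes.
func unwrapRecordValue(value []byte) (payload []byte, schemaID uint32, framed bool) {
	if len(value) < confluentHeaderLen || value[0] != 0x00 {
		// Plain JSON never starts with 0x00, so pass it through untouched.
		return value, 0, false
	}
	id := binary.BigEndian.Uint32(value[1:confluentHeaderLen])
	return value[confluentHeaderLen:], id, true
}

func main() {
	for _, v := range [][]byte{
		[]byte(`{"plain":true}`),
		append([]byte{0x00, 0x00, 0x00, 0x00, 0x03}, []byte(`{"framed":true}`)...),
	} {
		payload, id, framed := unwrapRecordValue(v)
		fmt.Printf("framed=%v schemaID=%d payload=%s\n", framed, id, payload)
	}
}
```

Checking only the first byte is a heuristic: RFC 8259 allows only whitespace before a JSON value, so a valid JSON record cannot begin with 0x00, but any non-JSON value that happens to start with 0x00 would also be stripped.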

Actual Behavior

The KafkaSource does not account for the Schema Registry format, resulting in invalid JSON events that cannot be processed.

Code from the Confluent Kafka Go Client

JsonSerializer

func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) {
	if msg == nil {
		return nil, nil
	}
	jschema := jsonschema.Reflect(msg)
	raw, err := json.Marshal(jschema)
	if err != nil {
		return nil, err
	}
	info := schemaregistry.SchemaInfo{
		Schema:     string(raw),
		SchemaType: "JSON",
	}
	id, err := s.GetID(topic, msg, &info)
	if err != nil {
		return nil, err
	}
	raw, err = json.Marshal(msg)
	if err != nil {
		return nil, err
	}
	if s.validate {
		// Need to unmarshal to pure interface
		var obj interface{}
		err = json.Unmarshal(raw, &obj)
		if err != nil {
			return nil, err
		}
		jschema, err := toJSONSchema(s.Client, info)
		if err != nil {
			return nil, err
		}
		err = jschema.Validate(obj)
		if err != nil {
			return nil, err
		}
	}
	payload, err := s.WriteBytes(id, raw)
	if err != nil {
		return nil, err
	}
	return payload, nil
}

Serde

func (s *BaseSerializer) WriteBytes(id int, msgBytes []byte) ([]byte, error) {
	var buf bytes.Buffer
	err := buf.WriteByte(magicByte)
	if err != nil {
		return nil, err
	}
	idBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(idBytes, uint32(id))
	_, err = buf.Write(idBytes)
	if err != nil {
		return nil, err
	}
	_, err = buf.Write(msgBytes)
	if err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}