Schema Registry Issue | Serialization/Deserialization Decimal Logical Type #1040
Under the hood benthos is using github.com/linkedin/goavro/v2, specifically https://pkg.go.dev/github.com/linkedin/goavro/v2#Codec.NativeFromBinary and https://pkg.go.dev/github.com/linkedin/goavro/v2#Codec.BinaryFromNative to do conversions. For the purpose of decoding avro into a generic format, we could probably solve this to a satisfactory level by walking the resulting structure and manually picking out these values (a rough sketch of that idea follows below). However, I'm not sure what we can do for serialization without either forking or committing upstream the ability to work with these values, so I've flagged this as needing investigation. At the same time, we ought to look into any other custom types that we might need to convert to/from.
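Roughly, something like this is what I have in mind for the decode side (a minimal standalone sketch, not actual benthos code):

package main

import (
	"fmt"
	"math/big"
)

// normalizeDecimals walks a goavro "native" value (maps, slices and
// scalars) and replaces any *big.Rat produced by the decimal logical
// type with a float64, so the result is JSON-friendly.
func normalizeDecimals(v interface{}) interface{} {
	switch t := v.(type) {
	case *big.Rat:
		f, _ := t.Float64() // precision may be lost for very large values
		return f
	case map[string]interface{}:
		for k, inner := range t {
			t[k] = normalizeDecimals(inner)
		}
		return t
	case []interface{}:
		for i, inner := range t {
			t[i] = normalizeDecimals(inner)
		}
		return t
	default:
		return v
	}
}

func main() {
	native := map[string]interface{}{
		"departmentId": "123HumanResources",
		"paymentFee":   big.NewRat(239, 10),
	}
	fmt.Println(normalizeDecimals(native)) // map[departmentId:123HumanResources paymentFee:23.9]
}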
Perfect, thank you so much! Also, at first we tried to implement a couple of plugins in order to do the conversions; I can share them with you, maybe they can be useful:

package conversions

import (
	"errors"
	"math"
	"math/big"
	"strings"
)

// ConvertFloatToBigRat builds a *big.Rat from a float64.
// Note: when the value is too big we lose precision, since the float64
// only approximates the decimal value.
func ConvertFloatToBigRat(floatVal float64) (interface{}, error) {
	bigRatValue := new(big.Rat).SetFloat64(floatVal)
	return bigRatValue, nil
}

// ConvertBigRatToFloat parses a (possibly quoted) big.Rat string and
// returns a float rounded to three fractional digits when the
// conversion is inexact.
func ConvertBigRatToFloat(bigRatValue string) (interface{}, error) {
	resultBigRat, ok := new(big.Rat).SetString(strings.ReplaceAll(bigRatValue, "\"", ""))
	if !ok {
		return nil, errors.New("argument bigRatValue cannot be parsed as a big.Rat")
	}
	floatVal, exact := resultBigRat.Float64()
	if !exact {
		return math.Round(floatVal*1000) / 1000, nil
	}
	return floatVal, nil
}

Maybe they can be useful :)
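To illustrate the precision problem mentioned above, here's a quick standalone comparison (just a sketch using the standard library) of building a big.Rat from a float64 versus from the decimal string:

package main

import (
	"fmt"
	"math/big"
)

func main() {
	// Via float64: we get the exact rational of the binary approximation,
	// not the intended decimal value.
	fromFloat := new(big.Rat).SetFloat64(23.900)
	fmt.Println(fromFloat) // a large fraction close to 23.9, not 239/10

	// Via the decimal string: the exact intended value.
	fromString, ok := new(big.Rat).SetString("23.900")
	fmt.Println(fromString, ok) // 239/10 true
}

This is why feeding the decimal through a string preserves the value exactly.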
First of all, thanks for reporting this bug and providing reproduction steps!

TL;DR: note that I already made a fix for the decode side, which I come back to below. I did some digging and testing and here is what I found. Brace yourselves, this is going to be a doozy...

In order to test stuff I ran the following prerequisites:

> docker run --rm -p 8081:8081 -p 8082:8082 -p 9092:9092 --name redpanda docker.vectorized.io/vectorized/redpanda redpanda start --smp 1 --overprovisioned --kafka-addr 0.0.0.0:9092 --advertise-kafka-addr 0.0.0.0:9092 --pandaproxy-addr 0.0.0.0:8082 --advertise-pandaproxy-addr 0.0.0.0:8082
> curl --location --request POST 'http://localhost:8081/subjects/payment/versions' --header 'Content-Type: application/json' --data-raw '{"schema": "{ \"type\": \"record\", \"name\": \"payment\", \"fields\": [ { \"name\" : \"departmentId\" , \"type\" : \"string\" }, { \"name\" : \"paymentFee\" , \"type\" : { \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 18, \"scale\": 3 }} ]}"}'

For the serialisation issue, as you already noticed, it looks like the underlying goavro library expects the number to be stored as a string in the JSON for this particular schema. For example, this works:
input:
  generate:
    mapping: |
      root = {
        "departmentId": "123HumanResources",
        "paymentFee": 23.900
      }
    interval: 0s
    count: 1
pipeline:
  processors:
    - bloblang: |
        root = this
        root.paymentFee = this.paymentFee.string()
    - schema_registry_encode:
        url: http://localhost:8081
        subject: payment
        refresh_period: 10m
        avro_raw_json: true
    - log:
        message: "Encode error: ${! error() }"
output:
  kafka:
    addresses:
      - localhost:9092
    topic: payment
    client_id: benthos

Note that I'm setting avro_raw_json: true and casting paymentFee to a string first. I decided to check what Java does in this case, and it barfs with an error as well:

// > export JAVA_HOME=/usr/local/opt/openjdk
// > export PATH="${JAVA_HOME}/bin:$PATH"
// > java --version
// openjdk 18.0.1 2022-04-19
// OpenJDK Runtime Environment Homebrew (build 18.0.1+0)
// OpenJDK 64-Bit Server VM Homebrew (build 18.0.1+0, mixed mode, sharing)
// > java -cp avro_1.11/avro-1.11.0.jar:avro_1.11/jackson-core-2.12.5.jar:avro_1.11/jackson-annotations-2.12.5.jar:avro_1.11/jackson-databind-2.12.5.jar:avro_1.11/slf4j-api-1.7.32.jar Main.java
import java.math.BigInteger;
import java.util.Arrays;
import java.io.*;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.*;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.Conversions;
import org.apache.avro.file.DataFileReader;
import java.util.HexFormat;
public class Main {
static byte[] fromJsonToAvro(String json, Schema schema) throws Exception {
InputStream input = new ByteArrayInputStream(json.getBytes());
DataInputStream din = new DataInputStream(input);
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
Object datum = reader.read(null, decoder);
GenericDatumWriter<Object> w = new GenericDatumWriter<Object>(schema);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Encoder e = EncoderFactory.get().binaryEncoder(outputStream, null);
w.write(datum, e);
e.flush();
return outputStream.toByteArray();
}
public static void main(String[] args) {
try {
String json = "{\"departmentId\": \"123HumanResources\",\"paymentFee\": 23.900}";
String schemaJSON ="{ \"type\": \"record\", \"name\": \"payment\", \"fields\": [ { \"name\" : \"departmentId\" , \"type\" : \"string\" }, { \"name\" : \"paymentFee\" , \"type\" : { \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 18, \"scale\": 3 }} ]}";
Schema schema = new Schema.Parser().parse(schemaJSON);
byte[] avroByteArray = fromJsonToAvro(json, schema);
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(avroByteArray, null);
GenericRecord record = reader.read(null, decoder);
System.out.println(record);
}
catch (Exception e) {
System.out.println(e);
}
}
}

You can try replacing the paymentFee value in the JSON above to see how Java handles the various encodings.

For the deserialisation issue, since the fix I mentioned at the beginning went in, we get the following output for this config:

input:
  kafka:
    addresses:
      - localhost:9092
    topics:
      - payment
    consumer_group: foo
    client_id: benthos
pipeline:
  processors:
    - schema_registry_decode:
        url: http://localhost:8081
    - log:
        message: ${! json("paymentFee").number() - 3.9}
output:
  stdout: {}

Note that I had to cast the paymentFee field to a number via .number(), since it gets decoded as a string.

For your particular schema, things seem OK so far, but let's look at the following example:

> curl --location --request POST 'http://localhost:8081/subjects/payment/versions' --header 'Content-Type: application/json' --data-raw '{"schema": "{ \"type\": \"record\", \"name\": \"bytesdecimal\", \"fields\": [ { \"default\": null, \"name\": \"pos_0_33333333\", \"type\": [ \"null\", { \"logicalType\": \"decimal\", \"precision\": 16, \"scale\": 2, \"type\": \"bytes\" } ] } ]}"}'

With this producer config:

input:
  generate:
    mapping: |
      root = {"pos_0_33333333": "!"}
    interval: 0s
    count: 1
pipeline:
  processors:
    - schema_registry_encode:
        url: http://localhost:8081
        subject: payment
        refresh_period: 10m
        avro_raw_json: true
    - log:
        message: "Encode error: ${! error() }"
output:
  kafka:
    addresses:
      - localhost:9092
    topic: bytesdecimal
    client_id: benthos

And this consumer config:

input:
  kafka:
    addresses:
      - localhost:9092
    topics:
      - bytesdecimal
    consumer_group: foo
    client_id: benthos
pipeline:
  processors:
    - schema_registry_decode:
        url: http://localhost:8081
    - log:
        message: "Decode error: ${! error() }"
output:
  stdout: {}

On the consumer side we get the following output after my fix:

{"pos_0_33333333":{"bytes.decimal":"!"}}

However, that's not what most people would expect. First, we lose the precision and scale from the schema (though maybe that should be expected, given how this type is declared in the schema?), and we also get that extra nesting with the type as a key, which might not be all that useful.

Also, plugging this schema and JSON into the Java code above, we get this output:

// ...
String json = "{\"pos_0_33333333\":{\"bytes\":\"!\"}}";
String schemaJSON ="{ \"type\": \"record\", \"name\": \"bytesdecimal\", \"fields\": [ { \"default\": null, \"name\": \"pos_0_33333333\", \"type\": [ \"null\", { \"logicalType\": \"decimal\", \"precision\": 16, \"scale\": 2, \"type\": \"bytes\" } ] } ]}";
// ... {"pos_0_33333333": "!"} But that's not what goavro's Note: I used The good news is that I just tested this code that the current goavro maintainer is planning to merge soon and it seems to produce the same output as Java, namely But, this isn't the end of the story, unfortunately. For example, the Snowflake Kafka Connector uses // > export JAVA_HOME=/usr/local/opt/openjdk
// > export PATH="${JAVA_HOME}/bin:$PATH"
// > java --version
// openjdk 18.0.1 2022-04-19
// OpenJDK Runtime Environment Homebrew (build 18.0.1+0)
// OpenJDK 64-Bit Server VM Homebrew (build 18.0.1+0, mixed mode, sharing)
// > java -cp avro_1.11/avro-1.11.0.jar:avro_1.11/jackson-core-2.12.5.jar:avro_1.11/jackson-annotations-2.12.5.jar:avro_1.11/jackson-databind-2.12.5.jar:avro_1.11/slf4j-api-1.7.32.jar Main.java
import java.math.BigInteger;
import java.util.Arrays;
import java.io.*;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.*;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.Conversions;
import org.apache.avro.file.DataFileReader;
import java.util.HexFormat;
public class Main {
static byte[] fromJsonToAvro(String json, Schema schema) throws Exception {
InputStream input = new ByteArrayInputStream(json.getBytes());
DataInputStream din = new DataInputStream(input);
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
Object datum = reader.read(null, decoder);
GenericDatumWriter<Object> w = new GenericDatumWriter<Object>(schema);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Encoder e = EncoderFactory.get().binaryEncoder(outputStream, null);
w.write(datum, e);
e.flush();
return outputStream.toByteArray();
}
public static void main(String[] args) {
try {
String json = "{\"pos_0_33333333\":{\"bytes\":\"!\"}}";
String schemaJSON ="{ \"type\": \"record\", \"name\": \"bytesdecimal\", \"fields\": [ { \"default\": null, \"name\": \"pos_0_33333333\", \"type\": [ \"null\", { \"logicalType\": \"decimal\", \"precision\": 16, \"scale\": 2, \"type\": \"bytes\" } ] } ]}";
Schema schema = new Schema.Parser().parse(schemaJSON);
byte[] avroByteArray = fromJsonToAvro(json, schema);
final GenericData genericData = new GenericData();
genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema, schema, genericData);
Decoder decoder = DecoderFactory.get().binaryDecoder(avroByteArray, null);
GenericRecord record = reader.read(null, decoder);
System.out.println(record);
}
catch (Exception e) {
System.out.println(e);
}
}
}

then I get a different output: the decimal is materialised as a number (via BigDecimal) rather than raw bytes. Anyhow, I'll open a PR here to address this.
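As an aside, here's a standalone sketch (standard library only, my own illustration rather than anything from the connector) of how that "!" payload maps to a number under the schema's scale of 2:

package main

import (
	"fmt"
	"math/big"
)

func main() {
	// The Avro JSON encoding renders the decimal's raw bytes as a string:
	// "!" is the single byte 0x21 == 33, the big-endian two's-complement
	// unscaled value. (Negative values would need sign handling.)
	raw := []byte("!")
	unscaled := new(big.Int).SetBytes(raw)

	// Apply the schema's scale of 2: value = unscaled / 10^2.
	scale := int64(2)
	denom := new(big.Int).Exp(big.NewInt(10), big.NewInt(scale), nil)
	value := new(big.Rat).SetFrac(unscaled, denom)

	fmt.Println(value.FloatString(int(scale))) // 0.33
}

So a logical-type-aware consumer sees 0.33, while a naive one sees the opaque "!" string.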
Hello everyone, I am having some issues with the serialization/deserialization of a message that contains a Decimal Logical Type.

Serialization Issue

Scenario: We have an Avro schema with two fields: departmentId (string) and paymentFee (logical type: decimal). When we try to serialize the following message:

{ "departmentId": "123HumanResources", "paymentFee": 23.900 }

we get the following error:

Error: cannot encode binary record "payment" field "paymentFee": value does not match its schema: cannot transform to bytes, expected *big.Rat, received json.Number
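For context, the error happens because goavro requires a *big.Rat as the native Go value for the decimal logical type. Here's a minimal sketch (my own illustration, using the schema from this issue) of encoding and decoding with goavro directly; it also shows where the "239/10" rendering mentioned below comes from:

package main

import (
	"fmt"
	"math/big"

	"github.com/linkedin/goavro/v2"
)

func main() {
	codec, err := goavro.NewCodec(`{
		"type": "record", "name": "payment",
		"fields": [
			{"name": "departmentId", "type": "string"},
			{"name": "paymentFee", "type": {
				"type": "bytes", "logicalType": "decimal",
				"precision": 18, "scale": 3}}
		]
	}`)
	if err != nil {
		panic(err)
	}

	// goavro wants a *big.Rat here; a json.Number is rejected with the
	// "expected *big.Rat" error quoted above.
	native := map[string]interface{}{
		"departmentId": "123HumanResources",
		"paymentFee":   big.NewRat(239, 10), // 23.900
	}
	bin, err := codec.BinaryFromNative(nil, native)
	if err != nil {
		panic(err)
	}

	// Decoding hands the *big.Rat back; its default String() rendering
	// is "239/10", which is what .string() surfaces on the consumer side.
	decoded, _, err := codec.NativeFromBinary(bin)
	if err != nil {
		panic(err)
	}
	fee := decoded.(map[string]interface{})["paymentFee"].(*big.Rat)
	fmt.Println(fee.String(), "=", fee.FloatString(3)) // 239/10 = 23.900
}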
Deserialization Issue

Scenario: We have a .NET producer (which sends the exact same message). When Benthos deserializes the message, the value comes in as null, and if we add .string(), the value of paymentFee is "239/10".

Questions
Notes
Here is a little poc with the issue: https://github.com/AplHelp05/SchemaRegistryPoC