Skip to content

Commit

Permalink
Add Codec.ScanBinary and an implementation for record types.
Browse files Browse the repository at this point in the history
  • Loading branch information
SpencerC committed Jun 7, 2023
1 parent 88c1690 commit 15af226
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 0 deletions.
51 changes: 51 additions & 0 deletions codec.go
Expand Up @@ -57,6 +57,8 @@ type Codec struct {
nativeFromBinary func([]byte) (interface{}, []byte, error)
textualFromNative func([]byte, interface{}) ([]byte, error)

scanBinary func([]byte, ...interface{}) ([]byte, error)

Rabin uint64
}

Expand Down Expand Up @@ -583,6 +585,55 @@ func (c *Codec) TextualFromNative(buf []byte, datum interface{}) ([]byte, error)
return newBuf, nil
}

// ScanBinary copies the values from the binary encoded byte slice into the
// values pointed to by dest in the order of the fields of the Avro schema
// supplied when creating the Codec. On success, it returns a byte slice
// containing the remaining undecoded bytes, and a nil error value. On error, it
// returns the original byte slice, and the error message.
//
// func ExampleCodec_ScanBinary_avro() {
// codec, err := NewCodec(`
// {
// "type": "record",
// "name": "r1",
// "fields" : [
// {"name": "f1", "type": "string"},
// {"name": "f2", "type": "int"}
// ]
// }
// `)
//
// if err != nil {
// log.Fatal(err)
// }
//
// binary := []byte{
// 0x10, // field1 size = 8
// 't', 'h', 'i', 'r', 't', 'e', 'e', 'n',
// 0x1a, // field2 == 13
// }
//
// var f1 string
// var f2 int
// if _, err = codec.ScanBinary(binary, &f1, &f2); err != nil {
// log.Fatal(err)
// }
//
// fmt.Printf("f1: %v, f2: %v", f1, f2)
// // Output: f1: thirteen, f2: 13
// }
func (c *Codec) ScanBinary(buf []byte, dest ...interface{}) ([]byte, error) {
// TODO: implement for every type and remove
if c.scanBinary == nil {
return buf, fmt.Errorf("ScanBinary not implemented for codec with schema: %s", c.schemaOriginal)
}
newBuf, err := c.scanBinary(buf, dest...)
if err != nil {
return buf, err // if error, return original byte slice
}
return newBuf, nil
}

// Schema returns the original schema used to create the Codec.
func (c *Codec) Schema() string {
return c.schemaOriginal
Expand Down
16 changes: 16 additions & 0 deletions record.go
Expand Up @@ -229,5 +229,21 @@ func makeRecordCodec(st map[string]*Codec, enclosingNamespace string, schemaMap
return genericMapTextEncoder(buf, datum, nil, codecFromFieldName)
}

c.scanBinary = func(buf []byte, dest ...interface{}) ([]byte, error) {
for i, fieldCodec := range codecFromIndex {
name := nameFromIndex[i]
var value interface{}
var err error
value, buf, err = fieldCodec.nativeFromBinary(buf)
if err != nil {
return nil, fmt.Errorf("cannot decode binary record %q field %q: %w", c.typeName, name, err)
}
if err := convertAssign(dest[i], value); err != nil {
return nil, fmt.Errorf("cannot convert binary record %q field %q: %w", c.typeName, name, err)
}
}
return buf, nil
}

return c, nil
}
32 changes: 32 additions & 0 deletions record_test.go
Expand Up @@ -12,6 +12,7 @@ package goavro
import (
"bytes"
"fmt"
"log"
"testing"
)

Expand Down Expand Up @@ -612,6 +613,37 @@ func ExampleCodec_TextualFromNative_avro() {
// Output: {"next":{"LongList":{"next":{"LongList":{"next":null}}}}}
}

func ExampleCodec_ScanBinary_avro() {
codec, err := NewCodec(`
{
"type": "record",
"name": "r1",
"fields" : [
{"name": "f1", "type": "string"},
{"name": "f2", "type": "int"}
]
}
`)
if err != nil {
log.Fatal(err)
}

binary := []byte{
0x10, // field1 size = 8
't', 'h', 'i', 'r', 't', 'e', 'e', 'n',
0x1a, // field2 == 13
}

var f1 string
var f2 int
if _, err = codec.ScanBinary(binary, &f1, &f2); err != nil {
log.Fatal(err)
}

fmt.Printf("f1: %v, f2: %v", f1, f2)
// Output: f1: thirteen, f2: 13
}

func TestRecordFieldFixedDefaultValue(t *testing.T) {
testSchemaValid(t, `{"type": "record", "name": "r1", "fields":[{"name": "f1", "type": {"type": "fixed", "name": "someFixed", "size": 1}, "default": "\u0000"}]}`)
}
Expand Down

0 comments on commit 15af226

Please sign in to comment.