Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat ipc #62

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions .gitignore
@@ -1 +1,2 @@
*.test
.idea
56 changes: 56 additions & 0 deletions avro_flupe.avrp
@@ -0,0 +1,56 @@
{
"protocol":"AvroSourceProtocol",
"namespace":"org.apache.flume.source.avro",
"doc":"* Licensed to the Apache Software Foundation (ASF).",
"types":[
{
"type":"enum",
"name":"Status",
"symbols":[
"OK",
"FAILED",
"UNKNOWN"
]
},
{
"type":"record",
"name":"AvroFlumeEvent",
"fields":[
{
"name":"headers",
"type":{
"type":"map",
"values":"string"
}
},
{
"name":"body",
"type":"bytes"
}
]
}
],
"messages":{
"append":{
"request":[
{
"name":"event",
"type":"AvroFlumeEvent"
}
],
"response":"Status"
},
"appendBatch":{
"request":[
{
"name":"events",
"type":{
"type":"array",
"items":"AvroFlumeEvent"
}
}
],
"response":"Status"
}
}
}
41 changes: 41 additions & 0 deletions encoder_test.go
@@ -0,0 +1,41 @@
package goavro

import (
"testing"
"bytes"
)

const testSchema= `
{
"type": "record",
"name": "test",
"fields" : [
{"name": "a", "type": "long"},
{"name": "b", "type": "string"}
]
}
`
func TestEncode(t *testing.T) {
record,err := NewRecord(RecordSchema(testSchema))
if err != nil {
t.Fatal(err)
}
record.Set("a",int64(27))
record.Set("b","foo")

codec, err := NewCodec(testSchema)
if err != nil {
t.Fatal(err)
}

bb := new(bytes.Buffer)
if err = codec.Encode(bb, record); err !=nil {
t.Fatal(err)
}
actual := bb.Bytes()
expected := []byte("\x36\x06\x66\x6f\x6f")

if bytes.Compare(actual, expected) != 0 {
t.Errorf("Actual: %#v; Expected: %#v", actual, expected)
}
}
50 changes: 50 additions & 0 deletions examples/flume/client.go
@@ -0,0 +1,50 @@
package main
import (
"github.com/sebglon/goavro"
"log"
"github.com/sebglon/goavro/transceiver/netty"
"time"
)

func main() {
//t.SkipNow()
transceiver,err := netty.NewTransceiver(netty.Config{AsyncConnect:false, NettyHost:"192.168.11.152"})
if err != nil {
log.Fatal(err)
}
protocol, err := goavro.NewProtocol()
if err != nil {
log.Fatal(err)
}

flumeRecord, errFlume := protocol.NewRecord("AvroFlumeEvent")
if errFlume != nil {
log.Fatal(errFlume)
}
headers := make(map[string]interface{})
headers["host_header"] = "127.0.0.1"
flumeRecord.Set("headers", headers)

requestor := goavro.NewRequestor(protocol, transceiver)

flumeRecord.Set("body", []byte("test 1"))
err = requestor.Request("append", flumeRecord)

if err != nil {
log.Fatal("Request 1: ", err)
}

log.Printf("Test 1 OK")


time.Sleep(5 * time.Second)
flumeRecord.Set("body", []byte("test 2"))
err = requestor.Request("append", flumeRecord)

if err != nil {
log.Fatal("Request 2: ", err)
}
log.Printf("Test 2 OK")

}

155 changes: 155 additions & 0 deletions protocol.go
@@ -0,0 +1,155 @@
package goavro

import (
"crypto/md5"
"encoding/json"
"fmt"
)

var TYPES_CACHE map[string]ProtocolType

type Protocol struct {
Name string `json:"protocol"`
Namespace string `json:"namespace"`
Fullname string `json:"-"`
Doc string `json:"doc"`
Types []AbsType `json:"types"`
Messages map[string]ProtocolMessage `json:"messages"`
MD5 []byte `json:"-"`
}

type ProtocolType struct {
TypeX string `json:"type"`
Name string `json:"name,omitempty"`
Symbols []string `json:"symbols,omitempty"`
Fields []Field `json:"fields,omitempty"`
Values string `json:"values,omitempty"`
Items string `json:"items,omitempty"`
}

type Field struct{
Name string `json:"name"`
TypeX AbsType `json:"type"`
}

type AbsType struct {
*ProtocolType
ref string `json:"-"`
}

type ProtocolMessage struct {
Name string `json:"-"`
Doc string `json:"doc,omitempty"`
Request []Field `json:"request"`
Response string `json:"response"`
Errors []string `json:"errors,omitempty"`
One_way bool `json:"one-way,omitempty"`
}

const proto = `
{"protocol":"AvroSourceProtocol","namespace":"org.apache.flume.source.avro","doc":"* Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing,\n * software distributed under the License is distributed on an\n * \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n * KIND, either express or implied. See the License for the\n * specific language governing permissions and limitations\n * under the License.","types":[{"type":"enum","name":"Status","symbols":["OK","FAILED","UNKNOWN"]},{"type":"record","name":"AvroFlumeEvent","fields":[{"name":"headers","type":{"type":"map","values":"string"}},{"name":"body","type":"bytes"}]}],"messages":{"append":{"request":[{"name":"event","type":"AvroFlumeEvent"}],"response":"Status"},"appendBatch":{"request":[{"name":"events","type":{"type":"array","items":"AvroFlumeEvent"}}],"response":"Status"}}}
`

func init() {
TYPES_CACHE = make(map[string]ProtocolType)
TYPES_CACHE["bytes"] = ProtocolType{Name:"bytes"}
TYPES_CACHE["enum"] = ProtocolType{Name:"enum"}
TYPES_CACHE["record"] = ProtocolType{Name:"record"}
}
func (t *AbsType) UnmarshalJSON(data []byte) error {
var nameType string
var protocolType ProtocolType
if err := json.Unmarshal(data, &nameType); err==nil {
protoType, ok := TYPES_CACHE[nameType]
if ok {
t.ref = nameType
protocolType = protoType
} else {
return fmt.Errorf("Type %s not found on protocol type cache %#v", data, TYPES_CACHE)
}
} else if err := json.Unmarshal(data, &protocolType); err!=nil {
return fmt.Errorf("Fail to Parse AbsType, %s %s", data,err )
}
t.ProtocolType = &protocolType
TYPES_CACHE[protocolType.Name] = protocolType
return nil
}

func (t *AbsType) MarshalJSON()([]byte, error) {
if len(t.ref)>0 {
return json.Marshal(t.ref)
} else {
return json.Marshal(t.ProtocolType)
}
}

func NewProtocol() (Protocol, error) {
var result Protocol
err := json.Unmarshal([]byte(proto), &result)

if err!=nil {
return result, err
}

if len(result.Name)==0 {
err = fmt.Errorf("Protocol must have a non-empty name.")
} else if len(result.Namespace) == 0 {
err = fmt.Errorf("The namespace property must be a string.")
}
result.Fullname = result.Namespace +"." + result.Name

bb, err := json.Marshal(result)

if err!=nil {
return result, err
}
hash := md5.Sum(bb)
result.MD5 = hash[:]
return result, err
}

func (p *Protocol) Json() (string, error) {
var result string
bb, err := json.Marshal(p)
if err != nil {
return result, err

}
return string(bb), nil
}


func (p *Protocol) MessageResponseCodec(messageName string) (Codec, error) {
json, err := p.MessageResponseJson(messageName)
if err!= nil {
return nil, err
}
return NewCodec(json)
}
func (p *Protocol) MessageResponseJson(messageName string) (string, error) {
field := p.Messages[messageName].Response
avroType := TYPES_CACHE[field]
json, err := json.Marshal(avroType)
return string(json), err
}
func (p *Protocol) MessageRequestCodec(messageName string) (Codec, error) {
json, err := p.MessageRequestJson(messageName)
if err!= nil {
return nil, err
}
return NewCodec(json)
}
func (p *Protocol) MessageRequestJson(messageName string) (string, error) {
field := p.Messages[messageName].Request[0]
avroType := TYPES_CACHE[field.TypeX.ref]
json, err := json.Marshal(avroType)
return string(json), err
}
func (p *Protocol) NewRecord(typeName string) (*Record, error) {
avroType := TYPES_CACHE[typeName]
json, err := json.Marshal(avroType)
if err!= nil {
return nil, err
}
return NewRecord(RecordSchema(string(json)))
}