Skip to content

Commit

Permalink
end of dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Sébastien GLON committed Aug 26, 2016
1 parent d02902a commit 07e1f81
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 288 deletions.
6 changes: 0 additions & 6 deletions Transceiver.go
Expand Up @@ -5,7 +5,6 @@ import (
"net"
"encoding/binary"
"fmt"
"os"
"io"
)

Expand Down Expand Up @@ -75,18 +74,13 @@ func (t *NettyTransceiver) Pack(frame *bytes.Buffer, requests []bytes.Buffer) {
}

func (t *NettyTransceiver) Unpack(frame []byte) ([]io.Reader, error) {

nettyNumberFame := binary.BigEndian.Uint32(frame[4:8])
result := make([]io.Reader, nettyNumberFame)
startFrame := uint32(8)
i:=uint32(0)
for i < nettyNumberFame {


messageSize := uint32(binary.BigEndian.Uint32(frame[startFrame:startFrame+4]))
fmt.Fprintf(os.Stdout, "\nnettyNumberFrame %v %v ", startFrame, frame[startFrame:startFrame+4])
message := frame[startFrame+4:startFrame+4+messageSize]
fmt.Fprintf(os.Stdout, "\nmessage: %v", message)
startFrame = startFrame+4+messageSize
br := bytes.NewReader(message)
result[i] = br
Expand Down
24 changes: 12 additions & 12 deletions ipc_test.go → examples/flume/client.go
@@ -1,45 +1,45 @@
package goavro

package main
import (
"testing"
"github.com/sebglon/goavro"
"net"
"log"

)

func TestRequestor(t *testing.T) {
func main() {
//t.SkipNow()
rAddr, err := net.ResolveTCPAddr("tcp", "10.98.80.113:63001")
conn, err := net.DialTCP("tcp", nil, rAddr)
if err != nil {
t.Fatal(err)
log.Fatal(err)
}
defer conn.Close()

transceiver := NewNettyTransceiver(conn)
protocol, err := NewProtocol()
transceiver := goavro.NewNettyTransceiver(conn)
protocol, err := goavro.NewProtocol()
if err != nil {
t.Fatal(err)
log.Fatal(err)
}

flumeRecord, errFlume := protocol.NewRecord("AvroFlumeEvent")
if errFlume != nil {
t.Fatal(errFlume)
log.Fatal(errFlume)
}
headers := make(map[string]interface{})
headers["host_header"] = "127.0.0.1"
flumeRecord.Set("headers", headers)
flumeRecord.Set("body", []byte("2016-08-02 14:45:38|flume.composantTechnique_IS_UNDEFINED|flume.application_IS_UNDEFINED|flume.client_IS_UNDEFINED|flume.plateforme_IS_UNDEFINED|instance_IS_UNDEFINED|logname_IS_UNDEFINED|WARN |test.LogGenerator|test !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"))
requestor := NewRequestor(protocol, transceiver)
requestor := goavro.NewRequestor(protocol, transceiver)
err = requestor.Request("append", flumeRecord)

if err != nil {
t.Fatal("Request: ", err)
log.Fatal("Request: ", err)
}

err = requestor.Request("append", flumeRecord)

if err != nil {
t.Fatal("Request: ", err)
log.Fatal("Request: ", err)
}
}

264 changes: 0 additions & 264 deletions ipc.go

This file was deleted.

14 changes: 14 additions & 0 deletions protocol.go
Expand Up @@ -118,6 +118,20 @@ func (p *Protocol) Json() (string, error) {
return string(bb), nil
}


func (p *Protocol) MessageResponseCodec(messageName string) (Codec, error) {
json, err := p.MessageResponseJson(messageName)
if err!= nil {
return nil, err
}
return NewCodec(json)
}
func (p *Protocol) MessageResponseJson(messageName string) (string, error) {
field := p.Messages[messageName].Response
avroType := TYPES_CACHE[field]
json, err := json.Marshal(avroType)
return string(json), err
}
func (p *Protocol) MessageRequestCodec(messageName string) (Codec, error) {
json, err := p.MessageRequestJson(messageName)
if err!= nil {
Expand Down
2 changes: 1 addition & 1 deletion protocol_test.go
Expand Up @@ -82,7 +82,7 @@ func TestToJson(t *testing.T) {
t.Fatal("%#v", err)
}
if result!= jsonCompact(proto) {
t.Errorf("Proto to Json not equals; Expected %#v, actual %#v",jsonCompact(proto), result)
t.Errorf("Proto to Json not equals; Expected \n%#v\nactual \n%#v",jsonCompact(proto), result)
}
}

Expand Down

0 comments on commit 07e1f81

Please sign in to comment.