Skip to content

Commit

Permalink
Give codecs a name
Browse files Browse the repository at this point in the history
And pass that name to the server to be used if codec is not set.

This makes the simple cases more robust and smaller.
  • Loading branch information
bep committed Aug 14, 2022
1 parent 6d259c8 commit e69c95e
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 29 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ And the server side of the above:
func main() {
server, _ := execrpc.NewServer(
execrpc.ServerOptions[model.ExampleRequest, model.ExampleResponse]{
Codec: codecs.JSONCodec[model.ExampleResponse, model.ExampleRequest]{},
Call: func(d execrpc.Dispatcher, req model.ExampleRequest) model.ExampleResponse {
return model.ExampleResponse{
Hello: "Hello " + req.Text + "!",
Expand Down
18 changes: 18 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,20 @@ import (
// is about to be shut down.
var ErrShutdown = errors.New("connection is shut down")

const (
// Signal to server about what codec to use.
envClientCodec = "EXECRPC_CLIENT_CODEC"
)

// StartClient starts a client for the given options.
func StartClient[Q, R any](opts ClientOptions[Q, R]) (*Client[Q, R], error) {
if opts.Codec == nil {
return nil, errors.New("opts: Codec is required")
}

// Pass default settings to the server.
envhelpers.SetEnvVars(&opts.Env, envClientCodec, opts.Codec.Name())

rawClient, err := StartClientRaw(opts.ClientRawOptions)
if err != nil {
return nil, err
Expand All @@ -33,6 +43,7 @@ func StartClient[Q, R any](opts ClientOptions[Q, R]) (*Client[Q, R], error) {
}, nil
}

// Client is a strongly typed RPC client.
type Client[Q, R any] struct {
rawClient *ClientRaw
codec codecs.Codec[Q, R]
Expand Down Expand Up @@ -63,10 +74,12 @@ func (c *Client[Q, R]) Execute(r Q) (R, error) {
return resp, nil
}

// Close closes the client.
func (c *Client[Q, R]) Close() error {
return c.rawClient.Close()
}

// StartClientRaw starts a untyped client client for the given options.
func StartClientRaw(opts ClientRawOptions) (*ClientRaw, error) {
if opts.Timeout == 0 {
opts.Timeout = time.Second * 10
Expand Down Expand Up @@ -115,6 +128,8 @@ func StartClientRaw(opts ClientRawOptions) (*ClientRaw, error) {
return client, nil
}

// ClientRaw is a raw RPC client.
// Raw means that the client doesn't do any type conversion, a byte slice is what you get.
type ClientRaw struct {
version uint8

Expand All @@ -135,6 +150,7 @@ type ClientRaw struct {
pending map[uint32]*call
}

// Close closes the server connection and waits for the server process to quit.
func (c *ClientRaw) Close() error {
if err := c.conn.Close(); err != nil {
return c.addErrContext("close", err)
Expand Down Expand Up @@ -261,11 +277,13 @@ func (c *ClientRaw) send(call *call) error {
return call.Request.Write(c.conn)
}

// ClientOptions are options for the client.
type ClientOptions[Q, R any] struct {
ClientRawOptions
Codec codecs.Codec[Q, R]
}

// ClientRawOptions are options for the raw part of the client.
type ClientRawOptions struct {
// Version number passed to the server.
Version uint8
Expand Down
18 changes: 9 additions & 9 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,17 @@ func TestExecTyped(t *testing.T) {
}

c.Run("JSON", func(c *qt.C) {
client := newClient(c, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=json")
client := newClient(c, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{})
runBasicTestForClient(c, client)
})

c.Run("TOML", func(c *qt.C) {
client := newClient(c, codecs.TOMLCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=toml")
client := newClient(c, codecs.TOMLCodec[model.ExampleRequest, model.ExampleResponse]{})
runBasicTestForClient(c, client)
})

c.Run("Gob", func(c *qt.C) {
client := newClient(c, codecs.GobCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=gob")
client := newClient(c, codecs.GobCodec[model.ExampleRequest, model.ExampleResponse]{})
runBasicTestForClient(c, client)
})

Expand All @@ -109,7 +109,7 @@ func TestExecTyped(t *testing.T) {
Version: 1,
Cmd: "go",
Args: []string{"run", "./examples/servers/typed"},
Env: []string{"EXECRPC_CODEC=json", "EXECRPC_SEND_TWO_LOG_MESSAGES=true"},
Env: []string{"EXECRPC_SEND_TWO_LOG_MESSAGES=true"},
Timeout: 4 * time.Second,
OnMessage: func(msg execrpc.Message) {
logMessages = append(logMessages, msg)
Expand All @@ -128,7 +128,7 @@ func TestExecTyped(t *testing.T) {
})

c.Run("Error", func(c *qt.C) {
client := newClient(c, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=json", "EXECRPC_CALL_SHOULD_FAIL=true")
client := newClient(c, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CALL_SHOULD_FAIL=true")
result, err := client.Execute(model.ExampleRequest{Text: "hello"})
c.Assert(err, qt.IsNil)
c.Assert(result.Err(), qt.IsNotNil)
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestExecTyped(t *testing.T) {
}

func TestExecTypedConcurrent(t *testing.T) {
client := newTestClient(t, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=json")
client := newTestClient(t, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{})
var g errgroup.Group

for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -195,7 +195,7 @@ func BenchmarkClient(b *testing.B) {
const word = "World"

b.Run("JSON", func(b *testing.B) {
client := newTestClient(b, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=json")
client := newTestClient(b, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{})
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := client.Execute(model.ExampleRequest{Text: word})
Expand All @@ -207,7 +207,7 @@ func BenchmarkClient(b *testing.B) {
})

b.Run("TOML", func(b *testing.B) {
client := newTestClient(b, codecs.TOMLCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=toml")
client := newTestClient(b, codecs.TOMLCodec[model.ExampleRequest, model.ExampleResponse]{})
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := client.Execute(model.ExampleRequest{Text: word})
Expand All @@ -219,7 +219,7 @@ func BenchmarkClient(b *testing.B) {
})

b.Run("Gob", func(b *testing.B) {
client := newTestClient(b, codecs.GobCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=gob")
client := newTestClient(b, codecs.GobCodec[model.ExampleRequest, model.ExampleResponse]{})
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := client.Execute(model.ExampleRequest{Text: word})
Expand Down
34 changes: 33 additions & 1 deletion codecs/codecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,34 @@ import (
"bytes"
"encoding/gob"
"encoding/json"
"errors"
"strings"

"github.com/pelletier/go-toml/v2"
)

// Codec defines the interface for a two way conversion between Q and R.
// Codec defines the interface for a two way conversion between Q and R.
type Codec[Q, R any] interface {
Encode(Q) ([]byte, error)
Decode([]byte, *R) error
Name() string
}

// ErrUnknownCodec is returned when no codec is found for the given name.
var ErrUnknownCodec = errors.New("unknown codec")

// ForName returns the codec for the given name or ErrUnknownCodec if no codec is found.
func ForName[Q, R any](name string) (Codec[Q, R], error) {
switch strings.ToLower(name) {
case "toml":
return TOMLCodec[Q, R]{}, nil
case "json":
return JSONCodec[Q, R]{}, nil
case "gob":
return GobCodec[Q, R]{}, nil
default:
return nil, ErrUnknownCodec
}
}

// TOMLCodec is a Codec that uses TOML as the underlying format.
Expand All @@ -30,6 +50,10 @@ func (c TOMLCodec[Q, R]) Encode(q Q) ([]byte, error) {
return b.Bytes(), nil
}

func (c TOMLCodec[Q, R]) Name() string {
return "TOML"
}

// JSONCodec is a Codec that uses JSON as the underlying format.
type JSONCodec[Q, R any] struct{}

Expand All @@ -41,6 +65,10 @@ func (c JSONCodec[Q, R]) Encode(q Q) ([]byte, error) {
return json.Marshal(q)
}

func (c JSONCodec[Q, R]) Name() string {
return "JSON"
}

// GobCodec is a Codec that uses gob as the underlying format.
type GobCodec[Q, R any] struct{}

Expand All @@ -58,3 +86,7 @@ func (c GobCodec[Q, R]) Encode(q Q) ([]byte, error) {
}
return b.Bytes(), nil
}

func (c GobCodec[Q, R]) Name() string {
return "Gob"
}
4 changes: 4 additions & 0 deletions examples/model/model.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package model

// ExampleRequest is just a simple example request.
type ExampleRequest struct {
Text string `json:"text"`
}

// ExampleResponse is just a simple example response.
type ExampleResponse struct {
Hello string `json:"hello"`
Error *Error `json:"err"`
}

// Err is just a simple example error.
func (r ExampleResponse) Err() error {
if r.Error == nil {
// Make sure that resp.Err() == nil.
Expand All @@ -17,6 +20,7 @@ func (r ExampleResponse) Err() error {
return r.Error
}

// Error holds an error message.
type Error struct {
Msg string `json:"msg"`
}
Expand Down
13 changes: 0 additions & 13 deletions examples/servers/typed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"os"

"github.com/bep/execrpc"
"github.com/bep/execrpc/codecs"
"github.com/bep/execrpc/examples/model"
)

Expand All @@ -16,31 +15,19 @@ func main() {

// Some test flags from the client.
var (
codecID = os.Getenv("EXECRPC_CODEC")
printOutsideServerBefore = os.Getenv("EXECRPC_PRINT_OUTSIDE_SERVER_BEFORE") != ""
printOutsideServerAfter = os.Getenv("EXECRPC_PRINT_OUTSIDE_SERVER_AFTER") != ""
printInsideServer = os.Getenv("EXECRPC_PRINT_INSIDE_SERVER") != ""
callShouldFail = os.Getenv("EXECRPC_CALL_SHOULD_FAIL") != ""
sendLogMessage = os.Getenv("EXECRPC_SEND_TWO_LOG_MESSAGES") != ""
)

var codec codecs.Codec[model.ExampleResponse, model.ExampleRequest]
switch codecID {
case "toml":
codec = codecs.TOMLCodec[model.ExampleResponse, model.ExampleRequest]{}
case "gob":
codec = codecs.GobCodec[model.ExampleResponse, model.ExampleRequest]{}
default:
codec = codecs.JSONCodec[model.ExampleResponse, model.ExampleRequest]{}
}

if printOutsideServerBefore {
fmt.Println("Printing outside server before")
}

server, err := execrpc.NewServer(
execrpc.ServerOptions[model.ExampleRequest, model.ExampleResponse]{
Codec: codec,
Call: func(d execrpc.Dispatcher, req model.ExampleRequest) model.ExampleResponse {
if printInsideServer {
fmt.Println("Printing inside server")
Expand Down
4 changes: 4 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ func (m *Message) Write(w io.Writer) error {
return err
}

// Header is the header of a message.
// ID and Size are set by the system.
type Header struct {
ID uint32
Version uint8
Status uint8
Size uint32
}

// Read reads the header from the reader.
func (h *Header) Read(r io.Reader) error {
buf := make([]byte, 10)
_, err := io.ReadFull(r, buf)
Expand All @@ -49,6 +52,7 @@ func (h *Header) Read(r io.Reader) error {
return nil
}

// Write writes the header to the writer.
func (h Header) Write(w io.Writer) error {
buff := make([]byte, 10)
binary.BigEndian.PutUint32(buff[0:4], h.ID)
Expand Down
22 changes: 17 additions & 5 deletions server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package execrpc

import (
"errors"
"fmt"
"io"
"os"
Expand All @@ -22,7 +23,7 @@ const (
MessageStatusSystemReservedMax = 99
)

// NewServerRaw creates a new Server. using the given options.
// NewServerRaw creates a new Server using the given options.
func NewServerRaw(opts ServerRawOptions) (*ServerRaw, error) {
if opts.Call == nil {
return nil, fmt.Errorf("opts: Call function is required")
Expand All @@ -42,7 +43,13 @@ func NewServer[Q, R any](opts ServerOptions[Q, R]) (*Server[Q, R], error) {
return nil, fmt.Errorf("opts: Call function is required")
}
if opts.Codec == nil {
return nil, fmt.Errorf("opts: Codec is required")
if opts.Codec == nil {
var err error
opts.Codec, err = codecs.ForName[R, Q](os.Getenv(envClientCodec))
if err != nil {
return nil, errors.New("opts: Codec is required")
}
}
}

var rawServer *ServerRaw
Expand All @@ -61,7 +68,6 @@ func NewServer[Q, R any](opts ServerOptions[Q, R]) (*Server[Q, R], error) {
r := opts.Call(rawServer.dispatcher, q)
b, err := opts.Codec.Encode(r)
if err != nil {

m := Message{
Header: message.Header,
Body: []byte(fmt.Sprintf("failed to encode response: %s. Check that client and server uses the same codec.", err)),
Expand Down Expand Up @@ -93,7 +99,11 @@ func NewServer[Q, R any](opts ServerOptions[Q, R]) (*Server[Q, R], error) {

// ServerOptions is the options for a server.
type ServerOptions[Q, R any] struct {
Call func(Dispatcher, Q) R
// Call is the function that will be called when a request is received.
Call func(Dispatcher, Q) R

// Codec is the codec that will be used to encode and decode requests and responses.
// The client will tell the server what codec is in use, so in most cases you should just leave this unset.
Codec codecs.Codec[R, Q]
}

Expand Down Expand Up @@ -241,19 +251,21 @@ func (s *ServerRaw) inputOutput() error {
return err
}

// ServerRawOptions is the options for a raw portion of the server.
type ServerRawOptions struct {
// Call is the message exhcange between the client and server.
// Note that any error returned by this function will be treated as a fatal error and the server is stopped.
// Validation errors etc. should be returned in the response message.
// The Dispatcher can be used to send messages to the client outside of the request/response loop, e.g. log messages.
// Note that these messages can not have an ID.
// Note that these messages must have ID 0.
Call func(Dispatcher, Message) (Message, error)
}

type messageDispatcher struct {
s *ServerRaw
}

// Dispatcher is the interface for dispatching standalone messages to the client, e.g. log messages.
type Dispatcher interface {
// Send sends one or more message back to the client.
// This is normally used for log messages and similar,
Expand Down

0 comments on commit e69c95e

Please sign in to comment.