Skip to content

Commit

Permalink
Use reflection to invoke receiver fn from client. (#67)
Browse files Browse the repository at this point in the history
Signed-off-by: Scott Nichols <nicholss@google.com>
  • Loading branch information
n3wscott authored and markpeek committed Mar 8, 2019
1 parent 76d94e4 commit 965e37c
Show file tree
Hide file tree
Showing 6 changed files with 466 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cmd/samples/complex/httptonats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Example struct {
Message string `json:"message"`
}

func (r *Receiver) Receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
func (r *Receiver) Receive(event cloudevents.Event) error {
fmt.Printf("Got Event Context: %+v\n", event.Context)

data := &Example{}
Expand Down
2 changes: 1 addition & 1 deletion cmd/samples/http/receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Example struct {
Message string `json:"message"`
}

func gotEvent(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
func gotEvent(ctx context.Context, event cloudevents.Event) error {
fmt.Printf("Got Event Context: %+v\n", event.Context)
data := &Example{}
if err := event.DataAs(data); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/samples/http/sleepy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Example struct {
Message string `json:"message"`
}

func gotEvent(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
func gotEvent(event cloudevents.Event) error {
fmt.Printf("Got Event Context: %+v\n", event.Context)
data := &Example{}
if err := event.DataAs(data); err != nil {
Expand Down
59 changes: 40 additions & 19 deletions pkg/cloudevents/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,33 @@ import (
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
)

// Receive is the signature of a fn to be invoked for incoming cloudevents.
// If fn returns an error, EventResponse will not be considered by the client or
// or transport.
type Receive func(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error

type Client interface {
// Send will transmit the given event over the client's configured transport.
Send(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, error)

StartReceiver(ctx context.Context, fn Receive) error
StopReceiver(ctx context.Context) error
}

type ceClient struct {
transport transport.Transport
receive Receive
// StartReceiver will register the provided function for callback on receipt
// of a cloudevent. It will also start the underlying transport as it has
// been configured.
// Valid fn signatures are:
// * func()
// * func() error
// * func(context.Context)
// * func(context.Context) error
// * func(cloudevents.Event)
// * func(cloudevents.Event) error
// * func(context.Context, cloudevents.Event)
// * func(context.Context, cloudevents.Event) error
// * func(cloudevents.Event, *cloudevents.EventResponse)
// * func(cloudevents.Event, *cloudevents.EventResponse) error
// * func(context.Context, cloudevents.Event, *cloudevents.EventResponse)
// * func(context.Context, cloudevents.Event, *cloudevents.EventResponse) error
// Note: if fn returns an error, it is treated as a critical and
// EventResponse will not be processed.
StartReceiver(ctx context.Context, fn interface{}) error

eventDefaulterFns []EventDefaulter
// StopReceiver will stop the underlying transport and deregister the
// previously provided receiver fn.
StopReceiver(ctx context.Context) error
}

func New(t transport.Transport, opts ...Option) (Client, error) {
Expand All @@ -37,6 +47,13 @@ func New(t transport.Transport, opts ...Option) (Client, error) {
return c, nil
}

type ceClient struct {
transport transport.Transport
fn *receiverFn

eventDefaulterFns []EventDefaulter
}

func (c *ceClient) Send(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, error) {
// Confirm we have a transport set.
if c.transport == nil {
Expand All @@ -58,8 +75,8 @@ func (c *ceClient) Send(ctx context.Context, event cloudevents.Event) (*cloudeve

// Receive is called from from the transport on event delivery.
func (c *ceClient) Receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
if c.receive != nil {
err := c.receive(ctx, event, resp)
if c.fn != nil {
err := c.fn.invoke(ctx, event, resp)
// Apply the defaulter chain to the outgoing event.
if err == nil && resp != nil && resp.Event != nil && len(c.eventDefaulterFns) > 0 {
for _, fn := range c.eventDefaulterFns {
Expand All @@ -75,15 +92,19 @@ func (c *ceClient) Receive(ctx context.Context, event cloudevents.Event, resp *c
return nil
}

func (c *ceClient) StartReceiver(ctx context.Context, fn Receive) error {
func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error {
if c.transport == nil {
return fmt.Errorf("client not ready, transport not initialized")
}
if c.receive != nil {
if c.fn != nil {
return fmt.Errorf("client already has a receiver")
}

c.receive = fn
if fn, err := receiver(fn); err != nil {
return err
} else {
c.fn = fn
}

return c.transport.StartReceiver(ctx)
}
Expand All @@ -94,7 +115,7 @@ func (c *ceClient) StopReceiver(ctx context.Context) error {
}

err := c.transport.StopReceiver(ctx)
c.receive = nil
c.fn = nil
return err
}

Expand Down
187 changes: 187 additions & 0 deletions pkg/cloudevents/client/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package client

import (
"context"
"errors"
"fmt"
"github.com/cloudevents/sdk-go/pkg/cloudevents"
"reflect"
)

// Receive is the signature of a fn to be invoked for incoming cloudevents.
// If fn returns an error, EventResponse will not be considered by the client or
// or transport.
// This is just an FYI:
type ReceiveFull func(context.Context, cloudevents.Event, *cloudevents.EventResponse) error

type receiverFn struct {
numIn int
fnValue reflect.Value

hasContextIn bool
hasEventIn bool
hasEventResponseIn bool

hasErrorOut bool
}

const (
inParamUsage = "expected a function taking either no parameters, one or more of (context.Context, cloudevents.Event, *cloudevents.EventResponse) ordered"
outParamUsage = "expected a function returning either nothing or an error"
)

var (
contextType = reflect.TypeOf((*context.Context)(nil)).Elem()
eventType = reflect.TypeOf((*cloudevents.Event)(nil)).Elem()
eventResponseType = reflect.TypeOf((*cloudevents.EventResponse)(nil)) // want the ptr type
errorType = reflect.TypeOf((*error)(nil)).Elem()
)

// receiver creates a receiverFn wrapper class that is used by the client to
// validate and invoke the provided function.
// Valid fn signatures are:
// * func()
// * func() error
// * func(context.Context)
// * func(context.Context) error
// * func(cloudevents.Event)
// * func(cloudevents.Event) error
// * func(context.Context, cloudevents.Event)
// * func(context.Context, cloudevents.Event) error
// * func(cloudevents.Event, *cloudevents.EventResponse)
// * func(cloudevents.Event, *cloudevents.EventResponse) error
// * func(context.Context, cloudevents.Event, *cloudevents.EventResponse)
// * func(context.Context, cloudevents.Event, *cloudevents.EventResponse) error
//
func receiver(fn interface{}) (*receiverFn, error) {
fnType := reflect.TypeOf(fn)
if fnType.Kind() != reflect.Func {
return nil, errors.New("must pass a function to handle events")
}

r := &receiverFn{
fnValue: reflect.ValueOf(fn),
numIn: fnType.NumIn(),
}
if err := r.validate(fnType); err != nil {
return nil, err
}

return r, nil
}

func (r *receiverFn) invoke(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
args := make([]reflect.Value, 0, r.numIn)

if r.numIn > 0 {
if r.hasContextIn {
args = append(args, reflect.ValueOf(ctx))
}
if r.hasEventIn {
args = append(args, reflect.ValueOf(event))
}
if r.hasEventResponseIn {
args = append(args, reflect.ValueOf(resp))
}
}
v := r.fnValue.Call(args)
if r.hasErrorOut && len(v) >= 1 {
if err, ok := v[0].Interface().(error); ok {
return err
}
}
return nil
}

// Verifies that the inputs to a function have a valid signature
// Valid input is to be [0, all] of
// context.Context, cloudevents.Event, *cloudevents.EventResponse in this order.
func (r *receiverFn) validateInParamSignature(fnType reflect.Type) error {
r.hasContextIn = false
r.hasEventIn = false
r.hasEventResponseIn = false

switch fnType.NumIn() {
case 3:
// has to be cloudevents.Event, *cloudevents.EventResponse
if !fnType.In(2).ConvertibleTo(eventResponseType) {
return fmt.Errorf("%s; cannot convert parameter 2 from %s to *cloudevents.EventResponse", inParamUsage, fnType.In(2))
} else {
r.hasEventResponseIn = true
}
fallthrough
case 2:
// can be cloudevents.Event or *cloudevents.EventResponse
if !fnType.In(1).ConvertibleTo(eventResponseType) {
if !fnType.In(1).ConvertibleTo(eventType) {
return fmt.Errorf("%s; cannot convert parameter 1 from %s to cloudevents.Event or *cloudevents.EventResponse", inParamUsage, fnType.In(1))
} else {
r.hasEventIn = true
}
} else if r.hasEventResponseIn {
return fmt.Errorf("%s; duplicate parameter of type *cloudevents.EventResponse", inParamUsage)
} else {
r.hasEventResponseIn = true
}
fallthrough
case 1:
if !fnType.In(0).ConvertibleTo(contextType) {
if !fnType.In(0).ConvertibleTo(eventResponseType) {
if !fnType.In(0).ConvertibleTo(eventType) {
return fmt.Errorf("%s; cannot convert parameter 0 from %s to context.Context, cloudevents.Event or *cloudevents.EventResponse", inParamUsage, fnType.In(0))
} else if r.hasEventIn {
return fmt.Errorf("%s; duplicate parameter of type cloudevents.Event", inParamUsage)
} else {
r.hasEventIn = true
}
} else if r.hasEventResponseIn {
return fmt.Errorf("%s; duplicate parameter of type *cloudevents.EventResponse", inParamUsage)
} else if r.hasEventIn {
return fmt.Errorf("%s; out of order parameter 0 for %s", inParamUsage, fnType.In(1))
} else {
r.hasEventResponseIn = true
}
} else {
r.hasContextIn = true
}
fallthrough
case 0:
return nil
default:
return fmt.Errorf("%s; function has too many parameters (%d)", inParamUsage, fnType.NumIn())
}
}

// Verifies that the outputs of a function have a valid signature
// Valid output signatures:
// (), (error)
func (r *receiverFn) validateOutParamSignature(fnType reflect.Type) error {
r.hasErrorOut = false
switch fnType.NumOut() {
case 1:
paramNo := fnType.NumOut() - 1
paramType := fnType.Out(paramNo)
if !paramType.ConvertibleTo(errorType) {
return fmt.Errorf("%s; cannot convert return type %d from %s to error", outParamUsage, paramNo, paramType)
} else {
r.hasErrorOut = true
}
fallthrough
case 0:
return nil
default:
return fmt.Errorf("%s; function has too many return types (%d)", outParamUsage, fnType.NumOut())
}
}

// validateReceiverFn validates that a function has the right number of in and
// out params and that they are of allowed types.
func (r *receiverFn) validate(fnType reflect.Type) error {
if err := r.validateInParamSignature(fnType); err != nil {
return err
}
if err := r.validateOutParamSignature(fnType); err != nil {
return err
}
return nil
}

0 comments on commit 965e37c

Please sign in to comment.