Skip to content

Commit

Permalink
feat(client): Add option to make receive callback blocking (#771)
Browse files Browse the repository at this point in the history
* Add option to make receive callback blocking

Signed-off-by: James Yu <jyu@confluent.io>

* make test run in parallel

Signed-off-by: James Yu <jyu@confluent.io>
  • Loading branch information
smilence-yu committed May 9, 2022
1 parent 5decd2e commit 66eacad
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 21 deletions.
124 changes: 108 additions & 16 deletions test/integration/http/blocking_test.go
Expand Up @@ -8,11 +8,12 @@ package http
import (
"context"
"fmt"
"go.uber.org/atomic"
"sync"
"testing"
"time"

"go.uber.org/atomic"

"github.com/cloudevents/sdk-go/v2/client"

"github.com/google/uuid"
Expand All @@ -32,12 +33,19 @@ type BlockingSenderReceiverTest struct {
now time.Time
event *cloudevents.Event
receiverWait time.Duration
timeout time.Duration
want int
}

type BlockingSenderReceiverTestOutput struct {
duration time.Duration
got int
}

type BlockingSenderReceiverTestCases map[string]BlockingSenderReceiverTest

func TestBlockingSenderReceiver(t *testing.T) {
func TestNonBlockingSenderReceiver(t *testing.T) {
t.Parallel()
now := time.Now()

testCases := BlockingSenderReceiverTestCases{
Expand All @@ -54,6 +62,7 @@ func TestBlockingSenderReceiver(t *testing.T) {
},
receiverWait: 1 * time.Second,
want: 10,
timeout: 5 * time.Second,
},
"50 at 5 second": {
now: now,
Expand All @@ -68,6 +77,7 @@ func TestBlockingSenderReceiver(t *testing.T) {
},
receiverWait: 5 * time.Second,
want: 50,
timeout: 15 * time.Second,
},
"100 at 10 seconds": {
now: now,
Expand All @@ -82,6 +92,66 @@ func TestBlockingSenderReceiver(t *testing.T) {
},
receiverWait: 10 * time.Second,
want: 100,
timeout: 30 * time.Second,
},
}

for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
ReceiverNonBlocking(t, tc)
})
}
}

func TestBlockingSenderReceiver(t *testing.T) {
t.Parallel()
now := time.Now()

testCases := BlockingSenderReceiverTestCases{
"10 at 100 milisecond": {
now: now,
event: &cloudevents.Event{
Context: cloudevents.EventContextV1{
Type: "unit.test.client.sent.10.1",
Source: *cloudevents.ParseURIRef("/unit/test/client"),
Subject: strptr("resource"),
DataContentType: cloudevents.StringOfApplicationJSON(),
}.AsV1(),
DataEncoded: toBytes(map[string]interface{}{"hello": "unittest"}),
},
receiverWait: 100 * time.Millisecond,
want: 10,
timeout: 5 * time.Second,
},
"50 at 20 milisecond": {
now: now,
event: &cloudevents.Event{
Context: cloudevents.EventContextV1{
Type: "unit.test.client.sent.50.5",
Source: *cloudevents.ParseURIRef("/unit/test/client"),
Subject: strptr("resource"),
DataContentType: cloudevents.StringOfApplicationJSON(),
}.AsV1(),
DataEncoded: toBytes(map[string]interface{}{"hello": "unittest"}),
},
receiverWait: 20 * time.Millisecond,
want: 50,
timeout: 5 * time.Second,
},
"100 at 10 milisecond": {
now: now,
event: &cloudevents.Event{
Context: cloudevents.EventContextV1{
Type: "unit.test.client.sent.100.10",
Source: *cloudevents.ParseURIRef("/unit/test/client"),
Subject: strptr("resource"),
DataContentType: cloudevents.StringOfApplicationJSON(),
}.AsV1(),
DataEncoded: toBytes(map[string]interface{}{"hello": "unittest"}),
},
receiverWait: 10 * time.Millisecond,
want: 100,
timeout: 5 * time.Second,
},
}

Expand All @@ -94,7 +164,37 @@ func TestBlockingSenderReceiver(t *testing.T) {

const verbose = false

func ReceiverBlocking(t *testing.T, tc BlockingSenderReceiverTest, copts ...client.Option) {
func ReceiverNonBlocking(t *testing.T, tc BlockingSenderReceiverTest) {
output := receive(t, tc, client.WithPollGoroutines(1))

if tc.want != output.got {
t.Errorf("expected %d, got %d", tc.want, output)
}

// Look at how long the test took.
dm := output.duration.Milliseconds()
tw := tc.receiverWait.Milliseconds() * 110 / 100 // 110% budget.
if dm > tw {
t.Errorf("expected test duration to be around ~%d ms, actual %d ms", tc.receiverWait.Milliseconds(), dm)
}
}

func ReceiverBlocking(t *testing.T, tc BlockingSenderReceiverTest) {
output := receive(t, tc, client.WithPollGoroutines(1), client.WithBlockingCallback())

if tc.want != output.got {
t.Errorf("expected %d, got %d", tc.want, output)
}

// Look at how long the test took.
dm := output.duration.Milliseconds()
tw := tc.receiverWait.Milliseconds() * int64(tc.want) // no concurrent processing
if dm < tw {
t.Errorf("expected test duration to be over %d ms, actual %d ms", tw, dm)
}
}

func receive(t *testing.T, tc BlockingSenderReceiverTest, copts ...client.Option) *BlockingSenderReceiverTestOutput {
opts := make([]cehttp.Option, 0)
opts = append(opts, cloudevents.WithPort(0)) // random port

Expand All @@ -113,7 +213,7 @@ func ReceiverBlocking(t *testing.T, tc BlockingSenderReceiverTest, copts ...clie
testID := uuid.New().String()
tc.event.SetExtension(unitTestIDKey, testID)

recvCtx, recvCancel := context.WithTimeout(context.Background(), tc.receiverWait*3)
recvCtx, recvCancel := context.WithTimeout(context.Background(), tc.timeout)
defer recvCancel()

wg := new(sync.WaitGroup)
Expand Down Expand Up @@ -141,7 +241,7 @@ func ReceiverBlocking(t *testing.T, tc BlockingSenderReceiverTest, copts ...clie

then := time.Now()

sendCtx, sendCancel := context.WithTimeout(context.Background(), tc.receiverWait*2)
sendCtx, sendCancel := context.WithTimeout(context.Background(), tc.timeout)
defer sendCancel()
sendCtx = cloudevents.ContextWithTarget(sendCtx, fmt.Sprintf("http://localhost:%d", protocol.GetListeningPort()))

Expand All @@ -159,16 +259,8 @@ func ReceiverBlocking(t *testing.T, tc BlockingSenderReceiverTest, copts ...clie

time.Sleep(tc.receiverWait) // cool off just in case we have some more sleepers.

if int32(tc.want) != got.Load() {
t.Errorf("expected %d, got %d", tc.want, got.Load())
}

// Look at how long the test took.

dm := duration.Milliseconds()
tw := tc.receiverWait.Milliseconds() * 110 / 100 // 110% budget.

if dm > tw {
t.Errorf("expected test duration to be ~%d ms, actual %d ms", tc.receiverWait.Milliseconds(), dm)
return &BlockingSenderReceiverTestOutput{
duration: duration,
got: int(got.Load()),
}
}
19 changes: 14 additions & 5 deletions v2/client/client.go
Expand Up @@ -97,6 +97,7 @@ type ceClient struct {
receiverMu sync.Mutex
eventDefaulterFns []EventDefaulter
pollGoroutines int
blockingCallback bool
}

func (c *ceClient) applyOptions(opts ...Option) error {
Expand Down Expand Up @@ -248,14 +249,22 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error {
continue
}

// Do not block on the invoker.
wg.Add(1)
go func() {
callback := func() {
if err := c.invoker.Invoke(ctx, msg, respFn); err != nil {
cecontext.LoggerFrom(ctx).Warn("Error while handling a message: ", err)
}
wg.Done()
}()
}

if c.blockingCallback {
callback()
} else {
// Do not block on the invoker.
wg.Add(1)
go func() {
defer wg.Done()
callback()
}()
}
}
}()
}
Expand Down
13 changes: 13 additions & 0 deletions v2/client/options.go
Expand Up @@ -8,6 +8,7 @@ package client
import (
"context"
"fmt"

"github.com/cloudevents/sdk-go/v2/binding"
)

Expand Down Expand Up @@ -113,3 +114,15 @@ func WithInboundContextDecorator(dec func(context.Context, binding.Message) cont
return nil
}
}

// WithBlockingCallback makes the callback passed into StartReceiver is executed as a blocking call,
// i.e. in each poll go routine, the next event will not be received until the callback on current event completes.
// To make event processing serialized (no concurrency), use this option along with WithPollGoroutines(1)
func WithBlockingCallback() Option {
return func(i interface{}) error {
if c, ok := i.(*ceClient); ok {
c.blockingCallback = true
}
return nil
}
}

0 comments on commit 66eacad

Please sign in to comment.