diff --git a/test/integration/http/blocking_test.go b/test/integration/http/blocking_test.go index ed63caae2..7814cc28a 100644 --- a/test/integration/http/blocking_test.go +++ b/test/integration/http/blocking_test.go @@ -8,11 +8,12 @@ package http import ( "context" "fmt" - "go.uber.org/atomic" "sync" "testing" "time" + "go.uber.org/atomic" + "github.com/cloudevents/sdk-go/v2/client" "github.com/google/uuid" @@ -32,12 +33,19 @@ type BlockingSenderReceiverTest struct { now time.Time event *cloudevents.Event receiverWait time.Duration + timeout time.Duration want int } +type BlockingSenderReceiverTestOutput struct { + duration time.Duration + got int +} + type BlockingSenderReceiverTestCases map[string]BlockingSenderReceiverTest -func TestBlockingSenderReceiver(t *testing.T) { +func TestNonBlockingSenderReceiver(t *testing.T) { + t.Parallel() now := time.Now() testCases := BlockingSenderReceiverTestCases{ @@ -54,6 +62,7 @@ func TestBlockingSenderReceiver(t *testing.T) { }, receiverWait: 1 * time.Second, want: 10, + timeout: 5 * time.Second, }, "50 at 5 second": { now: now, @@ -68,6 +77,7 @@ func TestBlockingSenderReceiver(t *testing.T) { }, receiverWait: 5 * time.Second, want: 50, + timeout: 15 * time.Second, }, "100 at 10 seconds": { now: now, @@ -82,6 +92,66 @@ func TestBlockingSenderReceiver(t *testing.T) { }, receiverWait: 10 * time.Second, want: 100, + timeout: 30 * time.Second, + }, + } + + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + ReceiverNonBlocking(t, tc) + }) + } +} + +func TestBlockingSenderReceiver(t *testing.T) { + t.Parallel() + now := time.Now() + + testCases := BlockingSenderReceiverTestCases{ + "10 at 100 milisecond": { + now: now, + event: &cloudevents.Event{ + Context: cloudevents.EventContextV1{ + Type: "unit.test.client.sent.10.1", + Source: *cloudevents.ParseURIRef("/unit/test/client"), + Subject: strptr("resource"), + DataContentType: cloudevents.StringOfApplicationJSON(), + }.AsV1(), + DataEncoded: toBytes(map[string]interface{}{"hello": "unittest"}), + }, + receiverWait: 100 * time.Millisecond, + want: 10, + timeout: 5 * time.Second, + }, + "50 at 20 milisecond": { + now: now, + event: &cloudevents.Event{ + Context: cloudevents.EventContextV1{ + Type: "unit.test.client.sent.50.5", + Source: *cloudevents.ParseURIRef("/unit/test/client"), + Subject: strptr("resource"), + DataContentType: cloudevents.StringOfApplicationJSON(), + }.AsV1(), + DataEncoded: toBytes(map[string]interface{}{"hello": "unittest"}), + }, + receiverWait: 20 * time.Millisecond, + want: 50, + timeout: 5 * time.Second, + }, + "100 at 10 milisecond": { + now: now, + event: &cloudevents.Event{ + Context: cloudevents.EventContextV1{ + Type: "unit.test.client.sent.100.10", + Source: *cloudevents.ParseURIRef("/unit/test/client"), + Subject: strptr("resource"), + DataContentType: cloudevents.StringOfApplicationJSON(), + }.AsV1(), + DataEncoded: toBytes(map[string]interface{}{"hello": "unittest"}), + }, + receiverWait: 10 * time.Millisecond, + want: 100, + timeout: 5 * time.Second, }, } @@ -94,7 +164,37 @@ func TestBlockingSenderReceiver(t *testing.T) { const verbose = false -func ReceiverBlocking(t *testing.T, tc BlockingSenderReceiverTest, copts ...client.Option) { +func ReceiverNonBlocking(t *testing.T, tc BlockingSenderReceiverTest) { + output := receive(t, tc, client.WithPollGoroutines(1)) + + if tc.want != output.got { + t.Errorf("expected %d, got %d", tc.want, output) + } + + // Look at how long the test took. + dm := output.duration.Milliseconds() + tw := tc.receiverWait.Milliseconds() * 110 / 100 // 110% budget. + if dm > tw { + t.Errorf("expected test duration to be around ~%d ms, actual %d ms", tc.receiverWait.Milliseconds(), dm) + } +} + +func ReceiverBlocking(t *testing.T, tc BlockingSenderReceiverTest) { + output := receive(t, tc, client.WithPollGoroutines(1), client.WithBlockingCallback()) + + if tc.want != output.got { + t.Errorf("expected %d, got %d", tc.want, output) + } + + // Look at how long the test took. + dm := output.duration.Milliseconds() + tw := tc.receiverWait.Milliseconds() * int64(tc.want) // no concurrent processing + if dm < tw { + t.Errorf("expected test duration to be over %d ms, actual %d ms", tw, dm) + } +} + +func receive(t *testing.T, tc BlockingSenderReceiverTest, copts ...client.Option) *BlockingSenderReceiverTestOutput { opts := make([]cehttp.Option, 0) opts = append(opts, cloudevents.WithPort(0)) // random port @@ -113,7 +213,7 @@ func ReceiverBlocking(t *testing.T, tc BlockingSenderReceiverTest, copts ...clie testID := uuid.New().String() tc.event.SetExtension(unitTestIDKey, testID) - recvCtx, recvCancel := context.WithTimeout(context.Background(), tc.receiverWait*3) + recvCtx, recvCancel := context.WithTimeout(context.Background(), tc.timeout) defer recvCancel() wg := new(sync.WaitGroup) @@ -141,7 +241,7 @@ func ReceiverBlocking(t *testing.T, tc BlockingSenderReceiverTest, copts ...clie then := time.Now() - sendCtx, sendCancel := context.WithTimeout(context.Background(), tc.receiverWait*2) + sendCtx, sendCancel := context.WithTimeout(context.Background(), tc.timeout) defer sendCancel() sendCtx = cloudevents.ContextWithTarget(sendCtx, fmt.Sprintf("http://localhost:%d", protocol.GetListeningPort())) @@ -159,16 +259,8 @@ func ReceiverBlocking(t *testing.T, tc BlockingSenderReceiverTest, copts ...clie time.Sleep(tc.receiverWait) // cool off just in case we have some more sleepers. - if int32(tc.want) != got.Load() { - t.Errorf("expected %d, got %d", tc.want, got.Load()) - } - - // Look at how long the test took. - - dm := duration.Milliseconds() - tw := tc.receiverWait.Milliseconds() * 110 / 100 // 110% budget. - - if dm > tw { - t.Errorf("expected test duration to be ~%d ms, actual %d ms", tc.receiverWait.Milliseconds(), dm) + return &BlockingSenderReceiverTestOutput{ + duration: duration, + got: int(got.Load()), } } diff --git a/v2/client/client.go b/v2/client/client.go index ee2d51abc..ea8fbfbb4 100644 --- a/v2/client/client.go +++ b/v2/client/client.go @@ -97,6 +97,7 @@ type ceClient struct { receiverMu sync.Mutex eventDefaulterFns []EventDefaulter pollGoroutines int + blockingCallback bool } func (c *ceClient) applyOptions(opts ...Option) error { @@ -248,14 +249,22 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error { continue } - // Do not block on the invoker. - wg.Add(1) - go func() { + callback := func() { if err := c.invoker.Invoke(ctx, msg, respFn); err != nil { cecontext.LoggerFrom(ctx).Warn("Error while handling a message: ", err) } - wg.Done() - }() + } + + if c.blockingCallback { + callback() + } else { + // Do not block on the invoker. + wg.Add(1) + go func() { + defer wg.Done() + callback() + }() + } } }() } diff --git a/v2/client/options.go b/v2/client/options.go index d0fe9dbaa..938478162 100644 --- a/v2/client/options.go +++ b/v2/client/options.go @@ -8,6 +8,7 @@ package client import ( "context" "fmt" + "github.com/cloudevents/sdk-go/v2/binding" ) @@ -113,3 +114,15 @@ func WithInboundContextDecorator(dec func(context.Context, binding.Message) cont return nil } } + +// WithBlockingCallback makes the callback passed into StartReceiver is executed as a blocking call, +// i.e. in each poll go routine, the next event will not be received until the callback on current event completes. +// To make event processing serialized (no concurrency), use this option along with WithPollGoroutines(1) +func WithBlockingCallback() Option { + return func(i interface{}) error { + if c, ok := i.(*ceClient); ok { + c.blockingCallback = true + } + return nil + } +}