Skip to content

Commit

Permalink
SQS Parallel processing support (#275)
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Bechtle <m.bechtle@thebeat.co>
  • Loading branch information
MartinBechtle committed Nov 23, 2020
1 parent 59b25c7 commit 51c8861
Show file tree
Hide file tree
Showing 17 changed files with 328 additions and 120 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Expand Up @@ -22,4 +22,7 @@ gotestsum-report.xml
gotestsum-report.xml

# coverage file
coverage.txt
coverage.txt

# patron binaries
/cmd/patron/patron
3 changes: 2 additions & 1 deletion README.md
Expand Up @@ -152,7 +152,8 @@ Detailed examples can be found in the [examples](/examples) folder with the foll
- [Kafka Component, HTTP Component, HTTP Authentication, Kafka Tracing](/examples/kafka/main.go)
- [Kafka Component, AMQP Tracing](/examples/amqp/main.go)
- [AMQP Component, AWS SNS](/examples/sns/main.go)
- [AWS SQS](/examples/sqs/main.go)
- [AWS SQS consumer performing gRPC request](/examples/sqs/main.go)
- [AWS SQS consumer, highly customised](/examples/sqs-simple/main.go)
- [gRPC](/examples/grpc/main.go)

## Processors
Expand Down
4 changes: 4 additions & 0 deletions component/async/amqp/amqp.go
Expand Up @@ -157,6 +157,10 @@ type consumer struct {
conn *amqp.Connection
}

func (c *consumer) OutOfOrder() bool {
return true
}

// Consume starts of consuming a AMQP queue.
func (c *consumer) Consume(ctx context.Context) (<-chan async.Message, <-chan error, error) {
deliveries, err := c.consume()
Expand Down
4 changes: 3 additions & 1 deletion component/async/amqp/amqp_test.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/opentracing/opentracing-go/mocktracer"
"github.com/streadway/amqp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var validExch, _ = NewExchange("e", amqp.ExchangeDirect)
Expand Down Expand Up @@ -130,7 +131,8 @@ func TestFactory_Create(t *testing.T) {
assert.Nil(t, got)
} else {
assert.NoError(t, err)
assert.NotNil(t, got)
require.NotNil(t, got)
assert.True(t, got.OutOfOrder())
}
})
}
Expand Down
1 change: 1 addition & 0 deletions component/async/async.go
Expand Up @@ -43,6 +43,7 @@ type ConsumerFactory interface {
// Consumer interface which every specific consumer has to implement.
type Consumer interface {
Consume(context.Context) (<-chan Message, <-chan error, error)
OutOfOrder() bool
Close() error
}

Expand Down
48 changes: 46 additions & 2 deletions component/async/component.go
Expand Up @@ -40,6 +40,9 @@ type Component struct {
cf ConsumerFactory
retries int
retryWait time.Duration
concurrency int
jobs chan Message
jobErr chan error
}

// Builder gathers all required properties in order to construct a component
Expand All @@ -51,6 +54,7 @@ type Builder struct {
cf ConsumerFactory
retries uint
retryWait time.Duration
concurrency uint
}

// New initializes a new builder for a component with the given name
Expand Down Expand Up @@ -95,6 +99,15 @@ func (cb *Builder) WithRetries(retries uint) *Builder {
return cb
}

// WithConcurrency specifies the number of worker goroutines for processing messages in parallel
// default value is '1'
// do NOT enable concurrency value for in-order consumers, such as Kafka or FIFO SQS
func (cb *Builder) WithConcurrency(concurrency uint) *Builder {
log.Infof(propSetMSG, "concurrency", cb.name)
cb.concurrency = concurrency
return cb
}

// WithRetryWait specifies the duration for the component to wait between retries
// default value is '0'
// it will append an error to the builder if the value is smaller than '0'.
Expand All @@ -121,6 +134,15 @@ func (cb *Builder) Create() (*Component, error) {
failStrategy: cb.failStrategy,
retries: int(cb.retries),
retryWait: cb.retryWait,
concurrency: int(cb.concurrency),
jobs: make(chan Message),
jobErr: make(chan error),
}

if cb.concurrency > 1 {
for w := 1; w <= c.concurrency; w++ {
go c.worker()
}
}

return c, nil
Expand All @@ -145,11 +167,15 @@ func (c *Component) Run(ctx context.Context) error {
}
}

close(c.jobs)
return err
}

func (c *Component) processing(ctx context.Context) error {
cns, err := c.cf.Create()
if c.concurrency > 1 && !cns.OutOfOrder() {
return fmt.Errorf("async component creation: cannot create in-order component with concurrency > 1")
}
if err != nil {
return fmt.Errorf("failed to create consumer: %w", err)
}
Expand All @@ -169,7 +195,7 @@ func (c *Component) processing(ctx context.Context) error {
select {
case msg := <-chMsg:
log.FromContext(msg.Context()).Debug("consumer received a new message")
err := c.processMessage(msg)
err := c.dispatchMessage(msg)
if err != nil {
return err
}
Expand All @@ -180,19 +206,37 @@ func (c *Component) processing(ctx context.Context) error {
return cns.Close()
case err := <-chErr:
return fmt.Errorf("an error occurred during message consumption: %w", err)
case err := <-c.jobErr:
return fmt.Errorf("an error occurred during concurrent message consumption: %w", err)
}
}
}

func (c *Component) dispatchMessage(msg Message) error {
if c.concurrency > 1 {
c.jobs <- msg
return nil
}
return c.processMessage(msg)
}

func (c *Component) processMessage(msg Message) error {
err := c.proc(msg)
if err != nil {
return c.executeFailureStrategy(msg, err)
}

return msg.Ack()
}

func (c *Component) worker() {
for msg := range c.jobs {
err := c.processMessage(msg)
if err != nil {
c.jobErr <- err
}
}
}

var errInvalidFS = errors.New("invalid failure strategy")

func (c *Component) executeFailureStrategy(msg Message, err error) error {
Expand Down
91 changes: 74 additions & 17 deletions component/async/component_test.go
Expand Up @@ -3,11 +3,12 @@ package async
import (
"context"
"errors"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNew(t *testing.T) {
Expand Down Expand Up @@ -75,12 +76,13 @@ func TestNew(t *testing.T) {
}

type proxyBuilder struct {
proc mockProcessor
cnr mockConsumer
cf ConsumerFactory
fs FailStrategy
retries int
retryWait time.Duration
proc mockProcessor
cnr mockConsumer
cf ConsumerFactory
fs FailStrategy
retries int
retryWait time.Duration
concurrency uint
}

func run(ctx context.Context, t *testing.T, builder *proxyBuilder) error {
Expand All @@ -92,20 +94,41 @@ func run(ctx context.Context, t *testing.T, builder *proxyBuilder) error {
WithFailureStrategy(builder.fs).
WithRetries(uint(builder.retries)).
WithRetryWait(builder.retryWait).
WithConcurrency(builder.concurrency).
Create()
assert.NoError(t, err)
return cmp.Run(ctx)
}

// TestCreate_ReturnsError expects an error when concurrency > 1 and component does not allow out of order processing
func TestCreate_ReturnsError(t *testing.T) {
cnr := mockConsumer{}
builder := proxyBuilder{
cnr: cnr,
cf: &mockConsumerFactory{c: &cnr},
concurrency: 2,
}
cmp, err := New("test", builder.cf, builder.proc.Process).
WithFailureStrategy(builder.fs).
WithRetries(uint(builder.retries)).
WithRetryWait(builder.retryWait).
WithConcurrency(builder.concurrency).
Create()
require.NotNil(t, cmp)
require.NoError(t, err)
got := cmp.processing(context.Background())
want := "async component creation: cannot create in-order component with concurrency > 1"
assert.EqualError(t, got, want)
}

// TestRun_ReturnsError expects a consumer consume Error
func TestRun_ReturnsError(t *testing.T) {
builder := proxyBuilder{
cnr: mockConsumer{consumeError: true},
}
err := run(context.Background(), t, &builder)

assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), errConsumer.Error()))
assert.True(t, errors.Is(err, errConsumer))
assert.Equal(t, 0, builder.proc.execs)
}

Expand Down Expand Up @@ -144,7 +167,7 @@ func TestRun_Process_Error_NackExitStrategy(t *testing.T) {
err := run(ctx, t, &builder)

assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), errProcess.Error()))
assert.True(t, errors.Is(err, errProcess))
assert.Equal(t, 1, builder.proc.execs)
}

Expand Down Expand Up @@ -202,7 +225,31 @@ func TestRun_ProcessError_WithNackError(t *testing.T) {
err := run(ctx, t, &builder)

assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), errNack.Error()))
assert.True(t, errors.Is(err, errNack))
assert.Equal(t, 1, builder.proc.execs)
}

// TestRun_ParallelProcessError_WithNackError expects a PROC ERROR
// same as TestRun_ProcessError_WithNackError, just with concurrency
func TestRun_ParallelProcessError_WithNackError(t *testing.T) {
builder := proxyBuilder{
proc: mockProcessor{errReturn: true},
cnr: mockConsumer{
chMsg: make(chan Message, 10),
chErr: make(chan error, 10),
outOfOrder: true,
},
fs: NackStrategy,
concurrency: 10,
}

ctx := context.Background()
builder.cnr.chMsg <- &mockMessage{ctx: ctx, nackError: true}

err := run(ctx, t, &builder)

assert.Error(t, err)
assert.True(t, errors.Is(err, errNack))
assert.Equal(t, 1, builder.proc.execs)
}

Expand Down Expand Up @@ -260,7 +307,7 @@ func TestRun_ProcessError_WithAckError(t *testing.T) {
err := run(ctx, t, &builder)

assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), errAck.Error()))
assert.True(t, errors.Is(err, errAck))
assert.Equal(t, 1, builder.proc.execs)
}

Expand Down Expand Up @@ -298,8 +345,7 @@ func TestRun_ConsumeError(t *testing.T) {
builder.cnr.chErr <- errConsumer
err := run(ctx, t, &builder)

assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), errConsumer.Error()))
assert.True(t, errors.Is(err, errConsumer))
assert.Equal(t, 0, builder.proc.execs)
}

Expand Down Expand Up @@ -398,7 +444,7 @@ func (mm *mockMessage) Context() context.Context {
}

// Decode is not called in our tests, because the mockProcessor will ignore the message decoding
func (mm *mockMessage) Decode(v interface{}) error {
func (mm *mockMessage) Decode(interface{}) error {
return nil
}

Expand Down Expand Up @@ -430,19 +476,28 @@ func (mm *mockMessage) Payload() []byte {

type mockProcessor struct {
errReturn bool
mux sync.Mutex
execs int
}

var errProcess = errors.New("PROC ERROR")

func (mp *mockProcessor) Process(msg Message) error {
func (mp *mockProcessor) Process(Message) error {
mp.mux.Lock()
mp.execs++
mp.mux.Unlock()
if mp.errReturn {
return errProcess
}
return nil
}

func (mp *mockProcessor) GetExecs() int {
mp.mux.Lock()
defer mp.mux.Unlock()
return mp.execs
}

type mockConsumerFactory struct {
c Consumer
errRet bool
Expand All @@ -464,9 +519,11 @@ type mockConsumer struct {
clsError bool
chMsg chan Message
chErr chan error
outOfOrder bool
}

func (mc *mockConsumer) SetTimeout(timeout time.Duration) {
func (mc *mockConsumer) OutOfOrder() bool {
return mc.outOfOrder
}

var errConsumer = errors.New("CONSUMER ERROR")
Expand Down
4 changes: 4 additions & 0 deletions component/async/kafka/group/group.go
Expand Up @@ -45,6 +45,10 @@ func New(name, group string, topics, brokers []string, oo ...kafka.OptionFunc) (
return &Factory{name: name, group: group, topics: topics, brokers: brokers, oo: oo}, nil
}

func (c *consumer) OutOfOrder() bool {
return false
}

// Create a new consumer.
func (f *Factory) Create() (async.Consumer, error) {

Expand Down

0 comments on commit 51c8861

Please sign in to comment.