Skip to content

Commit

Permalink
Allow setting headers on Kafka messages (#294)
Browse files Browse the repository at this point in the history
Fixes #293
  • Loading branch information
gmaz42 committed Nov 13, 2020
1 parent 84722bb commit 59b25c7
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 4 deletions.
16 changes: 12 additions & 4 deletions client/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ var (

// Message abstraction of a Kafka message.
type Message struct {
topic string
body interface{}
key *string
topic string
body interface{}
key *string
headers kafkaHeadersCarrier
}

func init() {
Expand All @@ -70,6 +71,13 @@ func NewMessage(t string, b interface{}) *Message {
return &Message{topic: t, body: b}
}

// SetHeader allows to set a message header.
// Multiple headers with the same key are supported.
// Headers are only set if Kafka is version 0.11+
func (m *Message) SetHeader(key, value string) {
m.headers.Set(key, value)
}

// NewMessageWithKey creates a new message with an associated key.
func NewMessageWithKey(t string, b interface{}, k string) (*Message, error) {
if k == "" {
Expand All @@ -93,7 +101,7 @@ func (p *baseProducer) ActiveBrokers() []string {
}

func (p *baseProducer) createProducerMessage(ctx context.Context, msg *Message, sp opentracing.Span) (*sarama.ProducerMessage, error) {
c := kafkaHeadersCarrier{}
c := msg.headers
err := sp.Tracer().Inject(sp.Context(), opentracing.TextMap, &c)
if err != nil {
return nil, fmt.Errorf("failed to inject tracing headers: %w", err)
Expand Down
34 changes: 34 additions & 0 deletions client/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,37 @@ func TestNewSyncProducer_Option_Failure(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, got)
}

func TestNewMessageWithHeader(t *testing.T) {
tests := []struct {
name string
data []byte
setHeaderKeys []string
setHeaderValues []string
expectedHeaderKeys []string
expectedHeaderValues []string
}{
{name: "2-headers", data: []byte("TEST"),
setHeaderKeys: []string{"header1", "header2"}, setHeaderValues: []string{"value1", "value2"},
expectedHeaderKeys: []string{"header1", "header2"}, expectedHeaderValues: []string{"value1", "value2"}},
{name: "2-headers", data: []byte("TEST"),
setHeaderKeys: []string{"header1", "header1"}, setHeaderValues: []string{"value1", "value2"},
expectedHeaderKeys: []string{"header1", "header1"}, expectedHeaderValues: []string{"value1", "value2"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
msg := NewMessage("TOPIC", tt.data)

// set message headers
for i := 0; i < len(tt.setHeaderKeys); i++ {
msg.SetHeader(tt.setHeaderKeys[i], tt.setHeaderValues[i])
}

// verify
for i := 0; i < len(tt.expectedHeaderKeys); i++ {
assert.Equal(t, string(msg.headers[i].Key), tt.expectedHeaderKeys[i])
assert.Equal(t, string(msg.headers[i].Value), tt.expectedHeaderValues[i])
}
})
}
}

0 comments on commit 59b25c7

Please sign in to comment.