NATS Jetstream optimistic concurrency headers #1029

Open
lilytrotter opened this issue Mar 31, 2024 · 2 comments

Comments

@lilytrotter

Hi, folks! Is it possible to access the special NATS headers on send, for example Nats-Expected-Last-Sequence?

These headers would allow event sourcing with CloudEvents. The neat thing is that consumers don't need to do anything: the check acts more like a quality-of-service agreement between the NATS client and the server.

If not currently possible, are there workarounds? Could we find a way to add support without breaking the current API?

@embano1
Member

embano1 commented Apr 1, 2024

If I understand the current nats_jetstream protocol implementation correctly, it ignores this header during the CE conversion:

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error {
	if m.encoding != binding.EncodingStructured {
		return binding.ErrNotStructured
	}
	return encoder.SetStructuredEvent(ctx, format.JSON, bytes.NewReader(m.Msg.Data))
}

// ReadBinary transfers a binary-mode event to an BinaryWriter.
func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error {
	if m.encoding != binding.EncodingBinary {
		return binding.ErrNotBinary
	}
	version := m.GetVersion()
	if version == nil {
		return binding.ErrNotBinary
	}
	var err error
	for k, v := range m.Msg.Header {
		headerValue := v[0]
		if strings.HasPrefix(k, prefix) {
			attr := version.Attribute(k)
			if attr != nil {
				err = encoder.SetAttribute(attr, headerValue)
			} else {
				err = encoder.SetExtension(strings.TrimPrefix(k, prefix), headerValue)
			}
		} else if k == contentTypeHeader {
			err = encoder.SetAttribute(version.AttributeFromKind(spec.DataContentType), headerValue)
		}
		if err != nil {
			return err
		}
	}
	if m.Msg.Data != nil {
		err = encoder.SetData(bytes.NewBuffer(m.Msg.Data))
	}
	return err
}
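
To make the "ignores this header" point concrete, the branch logic of the loop above can be isolated into a small predicate. This is only a sketch: the values of `prefix` and `contentTypeHeader` below are assumptions (they are not shown in the quoted snippet), and `forwarded` is a hypothetical helper, not part of the SDK.

```go
package main

import (
	"fmt"
	"strings"
)

// Assumed values for the constants referenced by the snippet above;
// check the binding's source for the actual definitions.
const (
	prefix            = "ce-"
	contentTypeHeader = "content-type"
)

// forwarded reports whether the loop in ReadBinary would pass a header key
// on to the encoder. Any other key matches neither branch and is skipped.
func forwarded(key string) bool {
	return strings.HasPrefix(key, prefix) || key == contentTypeHeader
}

func main() {
	keys := []string{"ce-id", "content-type", "Nats-Expected-Last-Sequence"}
	for _, k := range keys {
		fmt.Printf("%-30s forwarded=%v\n", k, forwarded(k))
	}
}
```

Under those assumed constants, a key such as Nats-Expected-Last-Sequence falls through both branches, so it never reaches the encoder.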

IMHO it can be added without breaking the API, similar to what we do in the Kafka protocol bindings.

@dan-j
Contributor

dan-j commented Apr 18, 2024

I'd be happy with a change to enable this, but I'm not sure about the correct approach. I'm not too familiar with how other protocol bindings work, but http just has an exported struct field:

// Message holds the Header and Body of a HTTP Request or Response.
// The Message instance *must* be constructed from NewMessage function.
// This message *cannot* be read several times. In order to read it more times, buffer it using binding/buffering methods
type Message struct {
Header nethttp.Header
BodyReader io.ReadCloser
OnFinish func(error) error
ctx context.Context
format format.Format
version spec.Version
}

If you did something similar, users can then just do the following:

var msg binding.Message

if msg, ok := msg.(*jetstreamv2.Message); ok {
  fmt.Println(msg.Header)
}
