Skip to content

Commit

Permalink
Merge branch 'main' into jnm/implement_server_PR_3454
Browse files Browse the repository at this point in the history
  • Loading branch information
jnmoyne committed Sep 13, 2022
2 parents 2a90615 + 25b6392 commit 2e2bbae
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 19 deletions.
4 changes: 2 additions & 2 deletions context.go
@@ -1,4 +1,4 @@
// Copyright 2016-2018 The NATS Authors
// Copyright 2016-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -85,7 +85,7 @@ func (nc *Conn) requestWithContext(ctx context.Context, subj string, hdr, data [

// oldRequestWithContext utilizes inbox and subscription per request.
func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, data []byte) (*Msg, error) {
inbox := nc.newInbox()
inbox := nc.NewInbox()
ch := make(chan *Msg, RequestChanLen)

s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
Expand Down
13 changes: 11 additions & 2 deletions go_test.mod
Expand Up @@ -3,9 +3,18 @@ module github.com/nats-io/nats.go
go 1.16

require (
github.com/golang/protobuf v1.5.2
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.9.0
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.28.1
google.golang.org/protobuf v1.23.0
)

require (
github.com/klauspost/compress v1.15.9 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.3.0 // indirect
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77 // indirect
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect
)
6 changes: 3 additions & 3 deletions js.go
Expand Up @@ -1532,7 +1532,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if o.cfg.DeliverSubject != _EMPTY_ {
deliver = o.cfg.DeliverSubject
} else if !isPullMode {
deliver = nc.newInbox()
deliver = nc.NewInbox()
cfg.DeliverSubject = deliver
}

Expand Down Expand Up @@ -1572,7 +1572,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,

if isPullMode {
nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer)
deliver = nc.newInbox()
deliver = nc.NewInbox()
}

// In case this has a context, then create a child context that
Expand Down Expand Up @@ -1921,7 +1921,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
osid := sub.applyNewSID()

// Grab new inbox.
newDeliver := nc.newInbox()
newDeliver := nc.NewInbox()
sub.Subject = newDeliver

// Snapshot the new sid under sub lock.
Expand Down
49 changes: 45 additions & 4 deletions nats.go
Expand Up @@ -624,11 +624,44 @@ type Msg struct {
Header Header
Data []byte
Sub *Subscription
// Internal
next *Msg
wsz int
barrier *barrierInfo
ackd uint32
}

// Compares two msgs, ignores sub but checks all other public fields.
func (m *Msg) Equal(msg *Msg) bool {
if m == msg {
return true
}
if m == nil || msg == nil {
return false
}
if m.Subject != msg.Subject || m.Reply != msg.Reply {
return false
}
if !bytes.Equal(m.Data, msg.Data) {
return false
}
if len(m.Header) != len(msg.Header) {
return false
}
for k, v := range m.Header {
val, ok := msg.Header[k]
if !ok || len(v) != len(val) {
return false
}
for i, hdr := range v {
if hdr != val[i] {
return false
}
}
}
return true
}

func (m *Msg) headerBytes() ([]byte, error) {
var hdr []byte
if len(m.Header) == 0 {
Expand Down Expand Up @@ -2908,7 +2941,14 @@ func (nc *Conn) processMsg(data []byte) {
}

// FIXME(dlc): Should we recycle these containers?
m := &Msg{Header: h, Data: msgPayload, Subject: subj, Reply: reply, Sub: sub}
m := &Msg{
Subject: subj,
Reply: reply,
Header: h,
Data: msgPayload,
Sub: sub,
wsz: len(data) + len(subj) + len(reply),
}

// Check for message filters.
if mf != nil {
Expand Down Expand Up @@ -3753,7 +3793,7 @@ func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration)
// with the Inbox reply and return the first reply received.
// This is optimized for the case of multiple responses.
func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
inbox := nc.newInbox()
inbox := nc.NewInbox()
ch := make(chan *Msg, RequestChanLen)

s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
Expand Down Expand Up @@ -3792,7 +3832,8 @@ func NewInbox() string {
return string(b[:])
}

func (nc *Conn) newInbox() string {
// Create a new inbox that is prefix aware.
func (nc *Conn) NewInbox() string {
if nc.Opts.InboxPrefix == _EMPTY_ {
return NewInbox()
}
Expand All @@ -3806,7 +3847,7 @@ func (nc *Conn) newInbox() string {

// Function to init new response structures.
func (nc *Conn) initNewResp() {
nc.respSubPrefix = fmt.Sprintf("%s.", nc.newInbox())
nc.respSubPrefix = fmt.Sprintf("%s.", nc.NewInbox())
nc.respSubLen = len(nc.respSubPrefix)
nc.respSub = fmt.Sprintf("%s*", nc.respSubPrefix)
nc.respMap = make(map[string]chan *Msg)
Expand Down
10 changes: 2 additions & 8 deletions test/headers_test.go
Expand Up @@ -58,9 +58,7 @@ func TestBasicHeaders(t *testing.T) {
t.Fatalf("Did not receive response: %v", err)
}

// Blank out the sub since its not present in the original.
msg.Sub = nil
if !reflect.DeepEqual(m, msg) {
if !m.Equal(msg) {
t.Fatalf("Messages did not match! \n%+v\n%+v\n", m, msg)
}
}
Expand Down Expand Up @@ -277,12 +275,8 @@ func TestMsgHeadersCasePreserving(t *testing.T) {
if err != nil {
t.Fatalf("Did not receive response: %v", err)
}

// Blank out the sub since its not present in the original.
msg.Sub = nil

// Confirm that received message is just like the one originally sent.
if !reflect.DeepEqual(m, msg) {
if !m.Equal(msg) {
t.Fatalf("Messages did not match! \n%+v\n%+v\n", m, msg)
}

Expand Down
2 changes: 2 additions & 0 deletions test/js_test.go
Expand Up @@ -7587,6 +7587,8 @@ func TestJetStreamSubscribeContextCancel(t *testing.T) {
}

func TestJetStreamClusterStreamLeaderChangeClientErr(t *testing.T) {
t.Skip("The 2.9 server changed behavior making this test fail now")

cfg := &nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Expand Down

0 comments on commit 2e2bbae

Please sign in to comment.