Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some Updates #1073

Merged
merged 5 commits into from Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions context.go
@@ -1,4 +1,4 @@
// Copyright 2016-2018 The NATS Authors
// Copyright 2016-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -85,7 +85,7 @@ func (nc *Conn) requestWithContext(ctx context.Context, subj string, hdr, data [

// oldRequestWithContext utilizes inbox and subscription per request.
func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, data []byte) (*Msg, error) {
inbox := nc.newInbox()
inbox := nc.NewInbox()
ch := make(chan *Msg, RequestChanLen)

s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
Expand Down
8 changes: 4 additions & 4 deletions go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.17

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116
github.com/nats-io/nats-server/v2 v2.9.0
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand All @@ -14,7 +14,7 @@ require (
github.com/klauspost/compress v1.15.9 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.3.0 // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77 // indirect
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect
)
18 changes: 9 additions & 9 deletions go_test.sum
Expand Up @@ -20,9 +20,9 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116 h1:NoZ5jkLgMNijnDh96QENq4M06AF34GXlvaYtHGXm/Jk=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116/go.mod h1:3Yg3ApyQxPlAs1KKHKV5pobV5VtZk+TtOiUJx/iqkkg=
github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats-server/v2 v2.9.0 h1:DLWu+7/VgGOoChcDKytnUZPAmudpv7o/MhKmNrnH1RE=
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
github.com/nats-io/nats-server/v2 v2.9.0/go.mod h1:BWKY6217RvhI+FDoOLZ2BH+hOC37xeKRBlQ1Lz7teKI=
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand All @@ -34,21 +34,21 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 h1:Y/gsMcFOcR+6S6f3YeMKl5g+dZMEWqcz5Czj/GWYbkM=
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77 h1:C1tElbkWrsSkn3IRl1GCW/gETw1TywWIPgwZtXTZbYg=
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20220411224347-583f2d630306 h1:+gHMid33q6pen7kv9xvT+JRinntgeXO2AeZVd0AWD3w=
golang.org/x/time v0.0.0-20220411224347-583f2d630306/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 h1:ftMN5LMiBFjbzleLqtoBZk7KdJwhuybIU+FckUHgoyQ=
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
6 changes: 3 additions & 3 deletions js.go
Expand Up @@ -1532,7 +1532,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if o.cfg.DeliverSubject != _EMPTY_ {
deliver = o.cfg.DeliverSubject
} else if !isPullMode {
deliver = nc.newInbox()
deliver = nc.NewInbox()
cfg.DeliverSubject = deliver
}

Expand Down Expand Up @@ -1572,7 +1572,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,

if isPullMode {
nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer)
deliver = nc.newInbox()
deliver = nc.NewInbox()
}

// In case this has a context, then create a child context that
Expand Down Expand Up @@ -1921,7 +1921,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
osid := sub.applyNewSID()

// Grab new inbox.
newDeliver := nc.newInbox()
newDeliver := nc.NewInbox()
sub.Subject = newDeliver

// Snapshot the new sid under sub lock.
Expand Down
49 changes: 45 additions & 4 deletions nats.go
Expand Up @@ -624,11 +624,44 @@ type Msg struct {
Header Header
Data []byte
Sub *Subscription
// Internal
next *Msg
wsz int
barrier *barrierInfo
ackd uint32
}

// Compares two msgs, ignores sub but checks all other public fields.
func (m *Msg) Equal(msg *Msg) bool {
if m == msg {
return true
}
if m == nil || msg == nil {
return false
}
if m.Subject != msg.Subject || m.Reply != msg.Reply {
return false
}
if !bytes.Equal(m.Data, msg.Data) {
return false
}
if len(m.Header) != len(msg.Header) {
return false
}
for k, v := range m.Header {
val, ok := msg.Header[k]
if !ok || len(v) != len(val) {
return false
}
for i, hdr := range v {
if hdr != val[i] {
return false
}
}
}
return true
}

func (m *Msg) headerBytes() ([]byte, error) {
var hdr []byte
if len(m.Header) == 0 {
Expand Down Expand Up @@ -2908,7 +2941,14 @@ func (nc *Conn) processMsg(data []byte) {
}

// FIXME(dlc): Should we recycle these containers?
m := &Msg{Header: h, Data: msgPayload, Subject: subj, Reply: reply, Sub: sub}
m := &Msg{
Subject: subj,
Reply: reply,
Header: h,
Data: msgPayload,
Sub: sub,
wsz: len(data) + len(subj) + len(reply),
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
}

// Check for message filters.
if mf != nil {
Expand Down Expand Up @@ -3753,7 +3793,7 @@ func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration)
// with the Inbox reply and return the first reply received.
// This is optimized for the case of multiple responses.
func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
inbox := nc.newInbox()
inbox := nc.NewInbox()
ch := make(chan *Msg, RequestChanLen)

s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
Expand Down Expand Up @@ -3792,7 +3832,8 @@ func NewInbox() string {
return string(b[:])
}

func (nc *Conn) newInbox() string {
// Create a new inbox that is prefix aware.
func (nc *Conn) NewInbox() string {
if nc.Opts.InboxPrefix == _EMPTY_ {
return NewInbox()
}
Expand All @@ -3806,7 +3847,7 @@ func (nc *Conn) newInbox() string {

// Function to init new response structures.
func (nc *Conn) initNewResp() {
nc.respSubPrefix = fmt.Sprintf("%s.", nc.newInbox())
nc.respSubPrefix = fmt.Sprintf("%s.", nc.NewInbox())
nc.respSubLen = len(nc.respSubPrefix)
nc.respSub = fmt.Sprintf("%s*", nc.respSubPrefix)
nc.respMap = make(map[string]chan *Msg)
Expand Down
10 changes: 2 additions & 8 deletions test/headers_test.go
Expand Up @@ -58,9 +58,7 @@ func TestBasicHeaders(t *testing.T) {
t.Fatalf("Did not receive response: %v", err)
}

// Blank out the sub since its not present in the original.
msg.Sub = nil
if !reflect.DeepEqual(m, msg) {
if !m.Equal(msg) {
t.Fatalf("Messages did not match! \n%+v\n%+v\n", m, msg)
}
}
Expand Down Expand Up @@ -277,12 +275,8 @@ func TestMsgHeadersCasePreserving(t *testing.T) {
if err != nil {
t.Fatalf("Did not receive response: %v", err)
}

// Blank out the sub since its not present in the original.
msg.Sub = nil

// Confirm that received message is just like the one originally sent.
if !reflect.DeepEqual(m, msg) {
if !m.Equal(msg) {
t.Fatalf("Messages did not match! \n%+v\n%+v\n", m, msg)
}

Expand Down
2 changes: 2 additions & 0 deletions test/js_test.go
Expand Up @@ -7587,6 +7587,8 @@ func TestJetStreamSubscribeContextCancel(t *testing.T) {
}

func TestJetStreamClusterStreamLeaderChangeClientErr(t *testing.T) {
t.Skip("The 2.9 server changed behavior making this test fail now")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is failing? And does that mean that the test is no longer relevant or that there is a breaking change? I don't remember changes in server that would explain that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remember we were over zealous in responding from non-leaders in the server and changed that behavior to make some of the server tests pass consistently.

This test now does have timeouts for calls to StreamInfo.


cfg := &nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Expand Down