diff --git a/go_test.mod b/go_test.mod index a9a586e30..29445fbc1 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,16 +4,16 @@ go 1.17 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.7.2 + github.com/nats-io/nats-server/v2 v2.7.5-0.20220329171936-1aeaaf0ca3db github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 ) require ( - github.com/klauspost/compress v1.13.4 // indirect - github.com/minio/highwayhash v1.0.1 // indirect - github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 // indirect + github.com/klauspost/compress v1.14.4 // indirect + github.com/minio/highwayhash v1.0.2 // indirect + github.com/nats-io/jwt/v2 v2.2.1-0.20220323195037-3472a33220ba // indirect golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect golang.org/x/sys v0.0.0-20220111092808-5a964db01320 // indirect golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect diff --git a/go_test.sum b/go_test.sum index f7a98a561..fd276dc4b 100644 --- a/go_test.sum +++ b/go_test.sum @@ -5,21 +5,19 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= -github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s= -github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= -github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= -github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= -github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.7.2 h1:+LEN8m0+jdCkiGc884WnDuxR+qj80/5arj+szKuRpRI= -github.com/nats-io/nats-server/v2 v2.7.2/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8= -github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4= +github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/nats-io/jwt/v2 v2.2.1-0.20220323195037-3472a33220ba h1:NZi4+xOauRDb4znbGDeJqdS1Gh448BaQ3NS9F1UnwN0= +github.com/nats-io/jwt/v2 v2.2.1-0.20220323195037-3472a33220ba/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.7.5-0.20220329171936-1aeaaf0ca3db h1:rhq+z4vyrg8kTIuWtCohRv1OWNrwiXlFwes7PNBbihQ= +github.com/nats-io/nats-server/v2 v2.7.5-0.20220329171936-1aeaaf0ca3db/go.mod h1:K1n2PmYKV0i57e6RUGOtIo+2iNHJJoooA90QWnkf0CU= +github.com/nats-io/nats.go v1.13.1-0.20220318132711-e0e03e374228/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/js.go b/js.go index d526978b2..678059bcc 100644 --- a/js.go +++ b/js.go @@ -187,6 +187,9 @@ const ( // Default number of retries DefaultPubRetryAttempts = 2 + + // defaultAsyncPubAckInflight is the number of async pub acks inflight. + defaultAsyncPubAckInflight = 4 * 1024 ) // Types of control messages, so far heartbeat and flow control @@ -218,12 +221,12 @@ type jsOpts struct { wait time.Duration // For async publish error handling. aecb MsgErrHandler - // Maximum in flight. - maxap int + // Max async pub ack in flight + maxpa int // the domain that produced the pre domain string // enables protocol tracing - trace TraceCB + ctrace ClientTrace shouldTrace bool } @@ -238,8 +241,9 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { js := &js{ nc: nc, opts: &jsOpts{ - pre: defaultAPIPrefix, - wait: defaultRequestWait, + pre: defaultAPIPrefix, + wait: defaultRequestWait, + maxpa: defaultAsyncPubAckInflight, }, } @@ -263,26 +267,16 @@ func (opt jsOptFn) configureJSContext(opts *jsOpts) error { return opt(opts) } -// TraceOperation indicates the direction of traffic flow to TraceCB -type TraceOperation int - -const ( - // TraceSent indicate the payload is being sent to subj - TraceSent TraceOperation = 0 - // TraceReceived indicate the payload is being received on subj - TraceReceived TraceOperation = 1 -) - -// TraceCB is called to trace API interactions for the JetStream Context -type TraceCB func(op TraceOperation, subj string, payload []byte, hdr Header) +// ClientTrace can be used to trace API interactions for the JetStream Context. +type ClientTrace struct { + RequestSent func(subj string, payload []byte) + ResponseReceived func(subj string, payload []byte, hdr Header) +} -// TraceFunc enables tracing of JetStream API interactions -func TraceFunc(cb TraceCB) JSOpt { - return jsOptFn(func(js *jsOpts) error { - js.trace = cb - js.shouldTrace = true - return nil - }) +func (ct ClientTrace) configureJSContext(js *jsOpts) error { + js.ctrace = ct + js.shouldTrace = true + return nil } // Domain changes the domain part of JetStream API prefix. @@ -581,9 +575,9 @@ func (js *js) registerPAF(id string, paf *pubAckFuture) (int, int) { paf.js = js js.pafs[id] = paf np := len(js.pafs) - maxap := js.opts.maxap + maxpa := js.opts.maxpa js.mu.Unlock() - return np, maxap + return np, maxpa } // Lock should be held. @@ -635,7 +629,7 @@ func (js *js) handleAsyncReply(m *Msg) { delete(js.pafs, id) // Check on anyone stalled and waiting. - if js.stc != nil && len(js.pafs) < js.opts.maxap { + if js.stc != nil && len(js.pafs) < js.opts.maxpa { close(js.stc) js.stc = nil } @@ -706,7 +700,7 @@ func PublishAsyncMaxPending(max int) JSOpt { if max < 1 { return errors.New("nats: max ack pending should be >= 1") } - js.maxap = max + js.maxpa = max return nil }) } @@ -1527,7 +1521,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } if js.opts.shouldTrace { - js.opts.trace(TraceSent, ccSubj, j, nil) + ctrace := js.opts.ctrace + if ctrace.RequestSent != nil { + ctrace.RequestSent(ccSubj, j) + } } resp, err := nc.Request(ccSubj, j, js.opts.wait) if err != nil { @@ -1538,7 +1535,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, return nil, err } if js.opts.shouldTrace { - js.opts.trace(TraceReceived, ccSubj, resp.Data, resp.Header) + ctrace := js.opts.ctrace + if ctrace.ResponseReceived != nil { + ctrace.ResponseReceived(ccSubj, resp.Data, resp.Header) + } } var cinfo consumerResponse @@ -2636,14 +2636,20 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin // a RequestWithContext with tracing via TraceCB func (js *js) apiRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) { if js.opts.shouldTrace { - js.opts.trace(TraceSent, subj, data, nil) + ctrace := js.opts.ctrace + if ctrace.RequestSent != nil { + ctrace.RequestSent(subj, data) + } } resp, err := js.nc.RequestWithContext(ctx, subj, data) if err != nil { return nil, err } if js.opts.shouldTrace { - js.opts.trace(TraceReceived, subj, resp.Data, resp.Header) + ctrace := js.opts.ctrace + if ctrace.RequestSent != nil { + ctrace.ResponseReceived(subj, resp.Data, resp.Header) + } } return resp, nil diff --git a/js_test.go b/js_test.go index 5fac93f9c..904e1fdef 100644 --- a/js_test.go +++ b/js_test.go @@ -865,19 +865,20 @@ func TestJetStreamTracing(t *testing.T) { defer nc.Close() ctr := 0 - js, err := nc.JetStream(TraceFunc(func(op TraceOperation, subj string, payload []byte, hdr Header) { - ctr++ - if ctr == 1 { - if op != TraceSent || subj != "$JS.API.STREAM.CREATE.X" { - t.Fatalf("Exected sent trace to %s: got: %d %s", "$JS.API.STREAM.CREATE.X", op, subj) + js, err := nc.JetStream(&ClientTrace{ + RequestSent: func(subj string, payload []byte) { + ctr++ + if subj != "$JS.API.STREAM.CREATE.X" { + t.Fatalf("Expected sent trace to %s: got: %s", "$JS.API.STREAM.CREATE.X", subj) } - return - } - - if op != TraceReceived || subj != "$JS.API.STREAM.CREATE.X" { - t.Fatalf("Exected received trace to %s: got: %d %s", "$JS.API.STREAM.CREATE.X", op, subj) - } - })) + }, + ResponseReceived: func(subj string, payload []byte, hdr Header) { + ctr++ + if subj != "$JS.API.STREAM.CREATE.X" { + t.Fatalf("Expected received trace to %s: got: %s", "$JS.API.STREAM.CREATE.X", subj) + } + }, + }) if err != nil { t.Fatalf("Unexpected error: %v", err) }