Skip to content

Commit

Permalink
Merge pull request #911 from nats-io/with-client-trace
Browse files Browse the repository at this point in the history
Add ClientTrace struct for JS tracing

Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs committed Mar 29, 2022
2 parents 5753a5f + f658a93 commit 49bed2a
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 59 deletions.
8 changes: 4 additions & 4 deletions go_test.mod
Expand Up @@ -4,16 +4,16 @@ go 1.17

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.7.2
github.com/nats-io/nats-server/v2 v2.7.5-0.20220329171936-1aeaaf0ca3db
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
)

require (
github.com/klauspost/compress v1.13.4 // indirect
github.com/minio/highwayhash v1.0.1 // indirect
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 // indirect
github.com/klauspost/compress v1.14.4 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.2.1-0.20220323195037-3472a33220ba // indirect
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
Expand Down
20 changes: 9 additions & 11 deletions go_test.sum
Expand Up @@ -5,21 +5,19 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.7.2 h1:+LEN8m0+jdCkiGc884WnDuxR+qj80/5arj+szKuRpRI=
github.com/nats-io/nats-server/v2 v2.7.2/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220323195037-3472a33220ba h1:NZi4+xOauRDb4znbGDeJqdS1Gh448BaQ3NS9F1UnwN0=
github.com/nats-io/jwt/v2 v2.2.1-0.20220323195037-3472a33220ba/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.7.5-0.20220329171936-1aeaaf0ca3db h1:rhq+z4vyrg8kTIuWtCohRv1OWNrwiXlFwes7PNBbihQ=
github.com/nats-io/nats-server/v2 v2.7.5-0.20220329171936-1aeaaf0ca3db/go.mod h1:K1n2PmYKV0i57e6RUGOtIo+2iNHJJoooA90QWnkf0CU=
github.com/nats-io/nats.go v1.13.1-0.20220318132711-e0e03e374228/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
70 changes: 38 additions & 32 deletions js.go
Expand Up @@ -187,6 +187,9 @@ const (

// Default number of retries
DefaultPubRetryAttempts = 2

// defaultAsyncPubAckInflight is the number of async pub acks inflight.
defaultAsyncPubAckInflight = 4 * 1024
)

// Types of control messages, so far heartbeat and flow control
Expand Down Expand Up @@ -218,12 +221,12 @@ type jsOpts struct {
wait time.Duration
// For async publish error handling.
aecb MsgErrHandler
// Maximum in flight.
maxap int
// Max async pub ack in flight
maxpa int
// the domain that produced the pre
domain string
// enables protocol tracing
trace TraceCB
ctrace ClientTrace
shouldTrace bool
}

Expand All @@ -238,8 +241,9 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
js := &js{
nc: nc,
opts: &jsOpts{
pre: defaultAPIPrefix,
wait: defaultRequestWait,
pre: defaultAPIPrefix,
wait: defaultRequestWait,
maxpa: defaultAsyncPubAckInflight,
},
}

Expand All @@ -263,26 +267,16 @@ func (opt jsOptFn) configureJSContext(opts *jsOpts) error {
return opt(opts)
}

// TraceOperation indicates the direction of traffic flow to TraceCB
type TraceOperation int

const (
// TraceSent indicate the payload is being sent to subj
TraceSent TraceOperation = 0
// TraceReceived indicate the payload is being received on subj
TraceReceived TraceOperation = 1
)

// TraceCB is called to trace API interactions for the JetStream Context
type TraceCB func(op TraceOperation, subj string, payload []byte, hdr Header)
// ClientTrace can be used to trace API interactions for the JetStream Context.
type ClientTrace struct {
RequestSent func(subj string, payload []byte)
ResponseReceived func(subj string, payload []byte, hdr Header)
}

// TraceFunc enables tracing of JetStream API interactions
func TraceFunc(cb TraceCB) JSOpt {
return jsOptFn(func(js *jsOpts) error {
js.trace = cb
js.shouldTrace = true
return nil
})
func (ct ClientTrace) configureJSContext(js *jsOpts) error {
js.ctrace = ct
js.shouldTrace = true
return nil
}

// Domain changes the domain part of JetStream API prefix.
Expand Down Expand Up @@ -581,9 +575,9 @@ func (js *js) registerPAF(id string, paf *pubAckFuture) (int, int) {
paf.js = js
js.pafs[id] = paf
np := len(js.pafs)
maxap := js.opts.maxap
maxpa := js.opts.maxpa
js.mu.Unlock()
return np, maxap
return np, maxpa
}

// Lock should be held.
Expand Down Expand Up @@ -635,7 +629,7 @@ func (js *js) handleAsyncReply(m *Msg) {
delete(js.pafs, id)

// Check on anyone stalled and waiting.
if js.stc != nil && len(js.pafs) < js.opts.maxap {
if js.stc != nil && len(js.pafs) < js.opts.maxpa {
close(js.stc)
js.stc = nil
}
Expand Down Expand Up @@ -706,7 +700,7 @@ func PublishAsyncMaxPending(max int) JSOpt {
if max < 1 {
return errors.New("nats: max ack pending should be >= 1")
}
js.maxap = max
js.maxpa = max
return nil
})
}
Expand Down Expand Up @@ -1527,7 +1521,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}

if js.opts.shouldTrace {
js.opts.trace(TraceSent, ccSubj, j, nil)
ctrace := js.opts.ctrace
if ctrace.RequestSent != nil {
ctrace.RequestSent(ccSubj, j)
}
}
resp, err := nc.Request(ccSubj, j, js.opts.wait)
if err != nil {
Expand All @@ -1538,7 +1535,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
return nil, err
}
if js.opts.shouldTrace {
js.opts.trace(TraceReceived, ccSubj, resp.Data, resp.Header)
ctrace := js.opts.ctrace
if ctrace.ResponseReceived != nil {
ctrace.ResponseReceived(ccSubj, resp.Data, resp.Header)
}
}

var cinfo consumerResponse
Expand Down Expand Up @@ -2636,14 +2636,20 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
// a RequestWithContext with tracing via TraceCB
func (js *js) apiRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
if js.opts.shouldTrace {
js.opts.trace(TraceSent, subj, data, nil)
ctrace := js.opts.ctrace
if ctrace.RequestSent != nil {
ctrace.RequestSent(subj, data)
}
}
resp, err := js.nc.RequestWithContext(ctx, subj, data)
if err != nil {
return nil, err
}
if js.opts.shouldTrace {
js.opts.trace(TraceReceived, subj, resp.Data, resp.Header)
ctrace := js.opts.ctrace
if ctrace.RequestSent != nil {
ctrace.ResponseReceived(subj, resp.Data, resp.Header)
}
}

return resp, nil
Expand Down
25 changes: 13 additions & 12 deletions js_test.go
Expand Up @@ -865,19 +865,20 @@ func TestJetStreamTracing(t *testing.T) {
defer nc.Close()

ctr := 0
js, err := nc.JetStream(TraceFunc(func(op TraceOperation, subj string, payload []byte, hdr Header) {
ctr++
if ctr == 1 {
if op != TraceSent || subj != "$JS.API.STREAM.CREATE.X" {
t.Fatalf("Exected sent trace to %s: got: %d %s", "$JS.API.STREAM.CREATE.X", op, subj)
js, err := nc.JetStream(&ClientTrace{
RequestSent: func(subj string, payload []byte) {
ctr++
if subj != "$JS.API.STREAM.CREATE.X" {
t.Fatalf("Expected sent trace to %s: got: %s", "$JS.API.STREAM.CREATE.X", subj)
}
return
}

if op != TraceReceived || subj != "$JS.API.STREAM.CREATE.X" {
t.Fatalf("Exected received trace to %s: got: %d %s", "$JS.API.STREAM.CREATE.X", op, subj)
}
}))
},
ResponseReceived: func(subj string, payload []byte, hdr Header) {
ctr++
if subj != "$JS.API.STREAM.CREATE.X" {
t.Fatalf("Expected received trace to %s: got: %s", "$JS.API.STREAM.CREATE.X", subj)
}
},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down

0 comments on commit 49bed2a

Please sign in to comment.