Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ClientTrace struct for JS tracing #911

Merged
merged 1 commit into from Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
50 changes: 26 additions & 24 deletions js.go
Expand Up @@ -223,7 +223,7 @@ type jsOpts struct {
// the domain that produced the pre
domain string
// enables protocol tracing
trace TraceCB
ctrace ClientTrace
shouldTrace bool
}

Expand Down Expand Up @@ -263,26 +263,16 @@ func (opt jsOptFn) configureJSContext(opts *jsOpts) error {
return opt(opts)
}

// TraceOperation indicates the direction of traffic flow to TraceCB
type TraceOperation int

const (
// TraceSent indicate the payload is being sent to subj
TraceSent TraceOperation = 0
// TraceReceived indicate the payload is being received on subj
TraceReceived TraceOperation = 1
)

// TraceCB is called to trace API interactions for the JetStream Context
type TraceCB func(op TraceOperation, subj string, payload []byte, hdr Header)
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
// ClientTrace can be used to trace API interactions for the JetStream Context.
type ClientTrace struct {
RequestSent func(subj string, payload []byte)
ResponseReceived func(subj string, payload []byte, hdr Header)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could expand this to make them more granular for example, to only trace consumer lookups or for when adding retries make it possible to log an event for when there is a backoff, etc...

}

// TraceFunc enables tracing of JetStream API interactions
func TraceFunc(cb TraceCB) JSOpt {
return jsOptFn(func(js *jsOpts) error {
js.trace = cb
js.shouldTrace = true
return nil
})
func (ct ClientTrace) configureJSContext(js *jsOpts) error {
js.ctrace = ct
js.shouldTrace = true
return nil
}

// Domain changes the domain part of JetStream API prefix.
Expand Down Expand Up @@ -1527,7 +1517,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}

if js.opts.shouldTrace {
js.opts.trace(TraceSent, ccSubj, j, nil)
ctrace := js.opts.ctrace
if ctrace.RequestSent != nil {
ctrace.RequestSent(ccSubj, j)
}
}
resp, err := nc.Request(ccSubj, j, js.opts.wait)
if err != nil {
Expand All @@ -1538,7 +1531,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
return nil, err
}
if js.opts.shouldTrace {
js.opts.trace(TraceReceived, ccSubj, resp.Data, resp.Header)
ctrace := js.opts.ctrace
if ctrace.ResponseReceived != nil {
ctrace.ResponseReceived(ccSubj, resp.Data, resp.Header)
}
}

var cinfo consumerResponse
Expand Down Expand Up @@ -2636,14 +2632,20 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
// a RequestWithContext with tracing via TraceCB
func (js *js) apiRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
if js.opts.shouldTrace {
js.opts.trace(TraceSent, subj, data, nil)
ctrace := js.opts.ctrace
if ctrace.RequestSent != nil {
ctrace.RequestSent(subj, data)
}
}
resp, err := js.nc.RequestWithContext(ctx, subj, data)
if err != nil {
return nil, err
}
if js.opts.shouldTrace {
js.opts.trace(TraceReceived, subj, resp.Data, resp.Header)
ctrace := js.opts.ctrace
if ctrace.RequestSent != nil {
ctrace.ResponseReceived(subj, resp.Data, resp.Header)
}
}

return resp, nil
Expand Down
25 changes: 13 additions & 12 deletions js_test.go
Expand Up @@ -865,19 +865,20 @@ func TestJetStreamTracing(t *testing.T) {
defer nc.Close()

ctr := 0
js, err := nc.JetStream(TraceFunc(func(op TraceOperation, subj string, payload []byte, hdr Header) {
ctr++
if ctr == 1 {
if op != TraceSent || subj != "$JS.API.STREAM.CREATE.X" {
t.Fatalf("Exected sent trace to %s: got: %d %s", "$JS.API.STREAM.CREATE.X", op, subj)
js, err := nc.JetStream(&ClientTrace{
RequestSent: func(subj string, payload []byte) {
ctr++
if subj != "$JS.API.STREAM.CREATE.X" {
t.Fatalf("Expected sent trace to %s: got: %s", "$JS.API.STREAM.CREATE.X", subj)
}
return
}

if op != TraceReceived || subj != "$JS.API.STREAM.CREATE.X" {
t.Fatalf("Exected received trace to %s: got: %d %s", "$JS.API.STREAM.CREATE.X", op, subj)
}
}))
},
ResponseReceived: func(subj string, payload []byte, hdr Header) {
ctr++
if subj != "$JS.API.STREAM.CREATE.X" {
t.Fatalf("Expected received trace to %s: got: %s", "$JS.API.STREAM.CREATE.X", subj)
}
},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down