diff --git a/js.go b/js.go index d526978b2..190ed3233 100644 --- a/js.go +++ b/js.go @@ -223,7 +223,7 @@ type jsOpts struct { // the domain that produced the pre domain string // enables protocol tracing - trace TraceCB + ctrace ClientTrace shouldTrace bool } @@ -263,26 +263,16 @@ func (opt jsOptFn) configureJSContext(opts *jsOpts) error { return opt(opts) } -// TraceOperation indicates the direction of traffic flow to TraceCB -type TraceOperation int - -const ( - // TraceSent indicate the payload is being sent to subj - TraceSent TraceOperation = 0 - // TraceReceived indicate the payload is being received on subj - TraceReceived TraceOperation = 1 -) - -// TraceCB is called to trace API interactions for the JetStream Context -type TraceCB func(op TraceOperation, subj string, payload []byte, hdr Header) +// ClientTrace can be used to trace API interactions for the JetStream Context. +type ClientTrace struct { + RequestSent func(subj string, payload []byte) + ResponseReceived func(subj string, payload []byte, hdr Header) +} -// TraceFunc enables tracing of JetStream API interactions -func TraceFunc(cb TraceCB) JSOpt { - return jsOptFn(func(js *jsOpts) error { - js.trace = cb - js.shouldTrace = true - return nil - }) +func (ct ClientTrace) configureJSContext(js *jsOpts) error { + js.ctrace = ct + js.shouldTrace = true + return nil } // Domain changes the domain part of JetStream API prefix. @@ -1527,7 +1517,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } if js.opts.shouldTrace { - js.opts.trace(TraceSent, ccSubj, j, nil) + ctrace := js.opts.ctrace + if ctrace.RequestSent != nil { + ctrace.RequestSent(ccSubj, j) + } } resp, err := nc.Request(ccSubj, j, js.opts.wait) if err != nil { @@ -1538,7 +1531,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, return nil, err } if js.opts.shouldTrace { - js.opts.trace(TraceReceived, ccSubj, resp.Data, resp.Header) + ctrace := js.opts.ctrace + if ctrace.ResponseReceived != nil { + ctrace.ResponseReceived(ccSubj, resp.Data, resp.Header) + } } var cinfo consumerResponse @@ -2636,14 +2632,20 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin // a RequestWithContext with tracing via TraceCB func (js *js) apiRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) { if js.opts.shouldTrace { - js.opts.trace(TraceSent, subj, data, nil) + ctrace := js.opts.ctrace + if ctrace.RequestSent != nil { + ctrace.RequestSent(subj, data) + } } resp, err := js.nc.RequestWithContext(ctx, subj, data) if err != nil { return nil, err } if js.opts.shouldTrace { - js.opts.trace(TraceReceived, subj, resp.Data, resp.Header) + ctrace := js.opts.ctrace + if ctrace.RequestSent != nil { + ctrace.ResponseReceived(subj, resp.Data, resp.Header) + } } return resp, nil diff --git a/js_test.go b/js_test.go index 5fac93f9c..904e1fdef 100644 --- a/js_test.go +++ b/js_test.go @@ -865,19 +865,20 @@ func TestJetStreamTracing(t *testing.T) { defer nc.Close() ctr := 0 - js, err := nc.JetStream(TraceFunc(func(op TraceOperation, subj string, payload []byte, hdr Header) { - ctr++ - if ctr == 1 { - if op != TraceSent || subj != "$JS.API.STREAM.CREATE.X" { - t.Fatalf("Exected sent trace to %s: got: %d %s", "$JS.API.STREAM.CREATE.X", op, subj) + js, err := nc.JetStream(&ClientTrace{ + RequestSent: func(subj string, payload []byte) { + ctr++ + if subj != "$JS.API.STREAM.CREATE.X" { + t.Fatalf("Expected sent trace to %s: got: %s", "$JS.API.STREAM.CREATE.X", subj) } - return - } - - if op != TraceReceived || subj != "$JS.API.STREAM.CREATE.X" { - t.Fatalf("Exected received trace to %s: got: %d %s", "$JS.API.STREAM.CREATE.X", op, subj) - } - })) + }, + ResponseReceived: func(subj string, payload []byte, hdr Header) { + ctr++ + if subj != "$JS.API.STREAM.CREATE.X" { + t.Fatalf("Expected received trace to %s: got: %s", "$JS.API.STREAM.CREATE.X", subj) + } + }, + }) if err != nil { t.Fatalf("Unexpected error: %v", err) }