From 2554593026cc24f0c860c6112c5c21520b932e42 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Thu, 14 Oct 2021 10:02:21 +0200 Subject: [PATCH] support tracing JS api calls Signed-off-by: R.I.Pienaar --- js.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++---- js_test.go | 37 ++++++++++++++++++++++++++++++++++++ jsm.go | 28 +++++++++++++-------------- 3 files changed, 103 insertions(+), 18 deletions(-) diff --git a/js.go b/js.go index 222248f0e..f2ae35715 100644 --- a/js.go +++ b/js.go @@ -216,6 +216,9 @@ type jsOpts struct { maxap int // the domain that produced the pre domain string + // enables protocol tracing + trace TraceCB + shouldTrace bool } const ( @@ -254,6 +257,28 @@ func (opt jsOptFn) configureJSContext(opts *jsOpts) error { return opt(opts) } +// TraceOperation indicates the direction of traffic flow to TraceCB +type TraceOperation int + +const ( + // TraceSent indicate the payload is being sent to subj + TraceSent TraceOperation = 0 + // TraceReceived indicate the payload is being received on subj + TraceReceived TraceOperation = 1 +) + +// TraceCB is called to trace API interactions for the JetStream Context +type TraceCB func(op TraceOperation, subj string, payload []byte, hdr Header) + +// TraceFunc enables tracing of JetStream API interactions +func TraceFunc(cb TraceCB) JSOpt { + return jsOptFn(func(js *jsOpts) error { + js.trace = cb + js.shouldTrace = true + return nil + }) +} + // Domain changes the domain part of JetSteam API prefix. func Domain(domain string) JSOpt { if domain == _EMPTY_ { @@ -1415,12 +1440,15 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, var ccSubj string if isDurable { - ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable) + ccSubj = js.apiSubj(fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)) } else { - ccSubj = fmt.Sprintf(apiConsumerCreateT, stream) + ccSubj = js.apiSubj(fmt.Sprintf(apiConsumerCreateT, stream)) } - resp, err := nc.Request(js.apiSubj(ccSubj), j, js.opts.wait) + if js.opts.shouldTrace { + js.opts.trace(TraceSent, ccSubj, j, nil) + } + resp, err := nc.Request(ccSubj, j, js.opts.wait) if err != nil { cleanUpSub() if err == ErrNoResponders { @@ -1428,6 +1456,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } return nil, err } + if js.opts.shouldTrace { + js.opts.trace(TraceReceived, ccSubj, resp.Data, resp.Header) + } + var cinfo consumerResponse err = json.Unmarshal(resp.Data, &cinfo) if err != nil { @@ -2445,7 +2477,7 @@ func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) { func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer string) (*ConsumerInfo, error) { ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer) - resp, err := js.nc.RequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil) + resp, err := js.apiRequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil) if err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled @@ -2466,6 +2498,22 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin return info.ConsumerInfo, nil } +// a RequestWithContext with tracing via TraceCB +func (js *js) apiRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) { + if js.opts.shouldTrace { + js.opts.trace(TraceSent, subj, data, nil) + } + resp, err := js.nc.RequestWithContext(ctx, subj, data) + if err != nil { + return nil, err + } + if js.opts.shouldTrace { + js.opts.trace(TraceReceived, subj, resp.Data, resp.Header) + } + + return resp, nil +} + func (m *Msg) checkReply() (*js, *jsSub, error) { if m == nil || m.Sub == nil { return nil, nil, ErrMsgNotBound diff --git a/js_test.go b/js_test.go index 3a3071466..2a0645eb4 100644 --- a/js_test.go +++ b/js_test.go @@ -891,6 +891,43 @@ func TestJetStreamFlowControlStalled(t *testing.T) { } } +func TestJetStreamTracing(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + nc, err := Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + ctr := 0 + js, err := nc.JetStream(TraceFunc(func(op TraceOperation, subj string, payload []byte, hdr Header) { + ctr++ + if ctr == 1 { + if op != TraceSent || subj != "$JS.API.STREAM.CREATE.X" { + t.Fatalf("Exected sent trace to %s: got: %d %s", "$JS.API.STREAM.CREATE.X", op, subj) + } + return + } + + if op != TraceReceived || subj != "$JS.API.STREAM.CREATE.X" { + t.Fatalf("Exected received trace to %s: got: %d %s", "$JS.API.STREAM.CREATE.X", op, subj) + } + })) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&StreamConfig{Name: "X"}) + if err != nil { + t.Fatalf("add stream failed: %s", err) + } + if ctr != 2 { + t.Fatalf("did not receive all trace events: %d", ctr) + } +} + func TestJetStreamExpiredPullRequests(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() diff --git a/jsm.go b/jsm.go index 382879aae..5c38ab67c 100644 --- a/jsm.go +++ b/jsm.go @@ -188,7 +188,7 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) { defer cancel() } - resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil) + resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil) if err != nil { // todo maybe nats server should never have no responder on this subject and always respond if they know there is no js to be had if err == ErrNoResponders { @@ -251,7 +251,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C ccSubj = fmt.Sprintf(apiConsumerCreateT, stream) } - resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(ccSubj), req) + resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req) if err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled @@ -293,7 +293,7 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error { } dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer)) - r, err := js.nc.RequestWithContext(o.ctx, dcSubj, nil) + r, err := js.apiRequestWithContext(o.ctx, dcSubj, nil) if err != nil { return err } @@ -376,7 +376,7 @@ func (c *consumerLister) Next() bool { } clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerListT, c.stream)) - r, err := c.js.nc.RequestWithContext(ctx, clSubj, req) + r, err := c.js.apiRequestWithContext(ctx, clSubj, req) if err != nil { c.err = err return false @@ -473,7 +473,7 @@ func (c *consumerNamesLister) Next() bool { } clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerNamesT, c.stream)) - r, err := c.js.nc.RequestWithContext(ctx, clSubj, nil) + r, err := c.js.apiRequestWithContext(ctx, clSubj, nil) if err != nil { c.err = err return false @@ -561,7 +561,7 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { } csSubj := js.apiSubj(fmt.Sprintf(apiStreamCreateT, cfg.Name)) - r, err := js.nc.RequestWithContext(o.ctx, csSubj, req) + r, err := js.apiRequestWithContext(o.ctx, csSubj, req) if err != nil { return nil, err } @@ -592,7 +592,7 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { } csSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream)) - r, err := js.nc.RequestWithContext(o.ctx, csSubj, nil) + r, err := js.apiRequestWithContext(o.ctx, csSubj, nil) if err != nil { return nil, err } @@ -676,7 +676,7 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error } usSubj := js.apiSubj(fmt.Sprintf(apiStreamUpdateT, cfg.Name)) - r, err := js.nc.RequestWithContext(o.ctx, usSubj, req) + r, err := js.apiRequestWithContext(o.ctx, usSubj, req) if err != nil { return nil, err } @@ -711,7 +711,7 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error { } dsSubj := js.apiSubj(fmt.Sprintf(apiStreamDeleteT, name)) - r, err := js.nc.RequestWithContext(o.ctx, dsSubj, nil) + r, err := js.apiRequestWithContext(o.ctx, dsSubj, nil) if err != nil { return err } @@ -788,7 +788,7 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt } dsSubj := js.apiSubj(fmt.Sprintf(apiMsgGetT, name)) - r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req) + r, err := js.apiRequestWithContext(o.ctx, dsSubj, req) if err != nil { return nil, err } @@ -853,7 +853,7 @@ func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error { } dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, name)) - r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req) + r, err := js.apiRequestWithContext(o.ctx, dsSubj, req) if err != nil { return err } @@ -905,7 +905,7 @@ func (js *js) purgeStream(stream string, req *streamPurgeRequest, opts ...JSOpt) } psSubj := js.apiSubj(fmt.Sprintf(apiStreamPurgeT, stream)) - r, err := js.nc.RequestWithContext(o.ctx, psSubj, b) + r, err := js.apiRequestWithContext(o.ctx, psSubj, b) if err != nil { return err } @@ -970,7 +970,7 @@ func (s *streamLister) Next() bool { } slSubj := s.js.apiSubj(apiStreamList) - r, err := s.js.nc.RequestWithContext(ctx, slSubj, req) + r, err := s.js.apiRequestWithContext(ctx, slSubj, req) if err != nil { s.err = err return false @@ -1054,7 +1054,7 @@ func (l *streamNamesLister) Next() bool { defer cancel() } - r, err := l.js.nc.RequestWithContext(ctx, l.js.apiSubj(apiStreams), nil) + r, err := l.js.apiRequestWithContext(ctx, l.js.apiSubj(apiStreams), nil) if err != nil { l.err = err return false