Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support tracing JS api calls #849

Merged
merged 1 commit into from Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 52 additions & 4 deletions js.go
Expand Up @@ -216,6 +216,9 @@ type jsOpts struct {
maxap int
// the domain that produced the pre
domain string
// enables protocol tracing
trace TraceCB
shouldTrace bool
}

const (
Expand Down Expand Up @@ -254,6 +257,28 @@ func (opt jsOptFn) configureJSContext(opts *jsOpts) error {
return opt(opts)
}

// TraceOperation indicates the direction of traffic flow to TraceCB
type TraceOperation int

const (
// TraceSent indicate the payload is being sent to subj
TraceSent TraceOperation = 0
// TraceReceived indicate the payload is being received on subj
TraceReceived TraceOperation = 1
)
Copy link
Member

@wallyqs wallyqs Oct 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative to using these types could be for example to do something similar to what httptrace package does with a struct where multiple callbacks are defined. For example with a ClientTrace struct like the following, it could be made to implement the options for JSOpt to configure just a context:

type ClientTrace struct {
	RequestSent      func(subj string, payload []byte, hdr Header)
	ResponseReceived func(subj string, payload []byte, hdr Header)
	// ProtocolSent...
	// ProtocolReceived...
	// Other misc low level connect options ala net/http/httptrace maybe
}

func (ct ClientTrace) configureJSContext(js *jsOpts) error {
	js.ctrace = ct
	js.shouldTrace = true
	return nil
}

but also same type could be used for other tracing at the nats level with implementing the nats.Option:

func WithClientTrace(ct *ClientTrace) Option {
	return func(o *Options) error {
		// Trace funcs but at the NATS level
		o.ClientTrace = ct
		return nil
	}
}

Then could use either later on:

	// Define at the NATS level to trace protocols or non JS traffic as well:
	nc, err := Connect(s.ClientURL(), WithClientTrace(&ClientTrace{}))
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	defer nc.Close()

	// Trace funcs scoped at the JS level.  Implements the interface for
	// JS option so can use the same type.
	nc, _ = Connect(s.ClientURL())
	js, err := nc.JetStream(&ClientTrace{
		RequestSent: func(subj string, payload []byte, hdr Header) {
			fmt.Println(">>> SENT : ", subj, payload)
		},
		ResponseReceived: func(subj string, payload []byte, hdr Header) {
			fmt.Println("<<< RECVD: ", subj, payload)
		},
	})

(Put an example of how this would look here: wallyqs@2a8553d#diff-d3e5f2fc32ab57ad01ff7eff1dc643653af5636e517a4b0677ddcefb94f87f7bR909)

Copy link
Contributor Author

@ripienaar ripienaar Oct 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have a baked example feel free to put up a PR instead of this one, I dont mind who puts it in :) just need to be able to get tracing on things as debugging stuff atm is really tough :)

I wont have time to work on this past whats here for like week and a half

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, ok maybe let's include as is for now to iterate on the idea already in main branch.


// TraceCB is called to trace API interactions for the JetStream Context
type TraceCB func(op TraceOperation, subj string, payload []byte, hdr Header)

// TraceFunc enables tracing of JetStream API interactions
func TraceFunc(cb TraceCB) JSOpt {
return jsOptFn(func(js *jsOpts) error {
js.trace = cb
js.shouldTrace = true
return nil
})
}

// Domain changes the domain part of JetSteam API prefix.
func Domain(domain string) JSOpt {
if domain == _EMPTY_ {
Expand Down Expand Up @@ -1415,19 +1440,26 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,

var ccSubj string
if isDurable {
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
ccSubj = js.apiSubj(fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable))
} else {
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
ccSubj = js.apiSubj(fmt.Sprintf(apiConsumerCreateT, stream))
}

resp, err := nc.Request(js.apiSubj(ccSubj), j, js.opts.wait)
if js.opts.shouldTrace {
js.opts.trace(TraceSent, ccSubj, j, nil)
}
resp, err := nc.Request(ccSubj, j, js.opts.wait)
if err != nil {
cleanUpSub()
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
}
return nil, err
}
if js.opts.shouldTrace {
js.opts.trace(TraceReceived, ccSubj, resp.Data, resp.Header)
}

var cinfo consumerResponse
err = json.Unmarshal(resp.Data, &cinfo)
if err != nil {
Expand Down Expand Up @@ -2445,7 +2477,7 @@ func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {

func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer string) (*ConsumerInfo, error) {
ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer)
resp, err := js.nc.RequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil)
resp, err := js.apiRequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
Expand All @@ -2466,6 +2498,22 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
return info.ConsumerInfo, nil
}

// a RequestWithContext with tracing via TraceCB
func (js *js) apiRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
if js.opts.shouldTrace {
js.opts.trace(TraceSent, subj, data, nil)
}
resp, err := js.nc.RequestWithContext(ctx, subj, data)
if err != nil {
return nil, err
}
if js.opts.shouldTrace {
js.opts.trace(TraceReceived, subj, resp.Data, resp.Header)
}

return resp, nil
}

func (m *Msg) checkReply() (*js, *jsSub, error) {
if m == nil || m.Sub == nil {
return nil, nil, ErrMsgNotBound
Expand Down
37 changes: 37 additions & 0 deletions js_test.go
Expand Up @@ -891,6 +891,43 @@ func TestJetStreamFlowControlStalled(t *testing.T) {
}
}

func TestJetStreamTracing(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

nc, err := Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

ctr := 0
js, err := nc.JetStream(TraceFunc(func(op TraceOperation, subj string, payload []byte, hdr Header) {
ctr++
if ctr == 1 {
if op != TraceSent || subj != "$JS.API.STREAM.CREATE.X" {
t.Fatalf("Exected sent trace to %s: got: %d %s", "$JS.API.STREAM.CREATE.X", op, subj)
}
return
}

if op != TraceReceived || subj != "$JS.API.STREAM.CREATE.X" {
t.Fatalf("Exected received trace to %s: got: %d %s", "$JS.API.STREAM.CREATE.X", op, subj)
}
}))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

_, err = js.AddStream(&StreamConfig{Name: "X"})
if err != nil {
t.Fatalf("add stream failed: %s", err)
}
if ctr != 2 {
t.Fatalf("did not receive all trace events: %d", ctr)
}
}

func TestJetStreamExpiredPullRequests(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
Expand Down
28 changes: 14 additions & 14 deletions jsm.go
Expand Up @@ -188,7 +188,7 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
defer cancel()
}

resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil)
resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil)
if err != nil {
// todo maybe nats server should never have no responder on this subject and always respond if they know there is no js to be had
if err == ErrNoResponders {
Expand Down Expand Up @@ -251,7 +251,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
}

resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
Expand Down Expand Up @@ -293,7 +293,7 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
}

dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer))
r, err := js.nc.RequestWithContext(o.ctx, dcSubj, nil)
r, err := js.apiRequestWithContext(o.ctx, dcSubj, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -376,7 +376,7 @@ func (c *consumerLister) Next() bool {
}

clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerListT, c.stream))
r, err := c.js.nc.RequestWithContext(ctx, clSubj, req)
r, err := c.js.apiRequestWithContext(ctx, clSubj, req)
if err != nil {
c.err = err
return false
Expand Down Expand Up @@ -473,7 +473,7 @@ func (c *consumerNamesLister) Next() bool {
}

clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerNamesT, c.stream))
r, err := c.js.nc.RequestWithContext(ctx, clSubj, nil)
r, err := c.js.apiRequestWithContext(ctx, clSubj, nil)
if err != nil {
c.err = err
return false
Expand Down Expand Up @@ -561,7 +561,7 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
}

csSubj := js.apiSubj(fmt.Sprintf(apiStreamCreateT, cfg.Name))
r, err := js.nc.RequestWithContext(o.ctx, csSubj, req)
r, err := js.apiRequestWithContext(o.ctx, csSubj, req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -592,7 +592,7 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
}

csSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))
r, err := js.nc.RequestWithContext(o.ctx, csSubj, nil)
r, err := js.apiRequestWithContext(o.ctx, csSubj, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -676,7 +676,7 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error
}

usSubj := js.apiSubj(fmt.Sprintf(apiStreamUpdateT, cfg.Name))
r, err := js.nc.RequestWithContext(o.ctx, usSubj, req)
r, err := js.apiRequestWithContext(o.ctx, usSubj, req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -711,7 +711,7 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error {
}

dsSubj := js.apiSubj(fmt.Sprintf(apiStreamDeleteT, name))
r, err := js.nc.RequestWithContext(o.ctx, dsSubj, nil)
r, err := js.apiRequestWithContext(o.ctx, dsSubj, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -788,7 +788,7 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
}

dsSubj := js.apiSubj(fmt.Sprintf(apiMsgGetT, name))
r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req)
r, err := js.apiRequestWithContext(o.ctx, dsSubj, req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -853,7 +853,7 @@ func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error {
}

dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, name))
r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req)
r, err := js.apiRequestWithContext(o.ctx, dsSubj, req)
if err != nil {
return err
}
Expand Down Expand Up @@ -905,7 +905,7 @@ func (js *js) purgeStream(stream string, req *streamPurgeRequest, opts ...JSOpt)
}

psSubj := js.apiSubj(fmt.Sprintf(apiStreamPurgeT, stream))
r, err := js.nc.RequestWithContext(o.ctx, psSubj, b)
r, err := js.apiRequestWithContext(o.ctx, psSubj, b)
if err != nil {
return err
}
Expand Down Expand Up @@ -970,7 +970,7 @@ func (s *streamLister) Next() bool {
}

slSubj := s.js.apiSubj(apiStreamList)
r, err := s.js.nc.RequestWithContext(ctx, slSubj, req)
r, err := s.js.apiRequestWithContext(ctx, slSubj, req)
if err != nil {
s.err = err
return false
Expand Down Expand Up @@ -1054,7 +1054,7 @@ func (l *streamNamesLister) Next() bool {
defer cancel()
}

r, err := l.js.nc.RequestWithContext(ctx, l.js.apiSubj(apiStreams), nil)
r, err := l.js.apiRequestWithContext(ctx, l.js.apiSubj(apiStreams), nil)
if err != nil {
l.err = err
return false
Expand Down