diff --git a/nats.go b/nats.go index 5f4ddf5e9..136bbad6d 100644 --- a/nats.go +++ b/nats.go @@ -433,6 +433,9 @@ type Options struct { // For websocket connections, indicates to the server that the connection // supports compression. If the server does too, then data will be compressed. Compression bool + + // InboxPrefix allows the default _INBOX prefix to be customized + InboxPrefix string } const ( @@ -494,11 +497,13 @@ type Conn struct { ws bool // true if a websocket connection // New style response handler - respSub string // The wildcard subject - respScanf string // The scanf template to extract mux token - respMux *Subscription // A single response subscription - respMap map[string]chan *Msg // Request map for the response msg channels - respRand *rand.Rand // Used for generating suffix + respSub string // The wildcard subject + respSubPrefix string // the wildcard prefix including trailing . + respSubLen int // the length of the wildcard prefix excluding trailing . + respScanf string // The scanf template to extract mux token + respMux *Subscription // A single response subscription + respMap map[string]chan *Msg // Request map for the response msg channels + respRand *rand.Rand // Used for generating suffix } type natsReader struct { @@ -1101,6 +1106,17 @@ func Compression(enabled bool) Option { } } +// CustomInboxPrefix configures the request + reply inbox prefix +func CustomInboxPrefix(p string) Option { + return func(o *Options) error { + if p == "" || strings.Contains(p, ">") || strings.Contains(p, "*") || strings.HasSuffix(p, ".") { + return fmt.Errorf("nats: invald custom prefix") + } + o.InboxPrefix = p + return nil + } +} + // Handler processing // SetDisconnectHandler will set the disconnect event handler. @@ -3343,7 +3359,8 @@ func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Ms // Create new literal Inbox and map to a chan msg. mch := make(chan *Msg, RequestChanLen) respInbox := nc.newRespInbox() - token := respInbox[respInboxPrefixLen:] + token := respInbox[nc.respSubLen:] + nc.respMap[token] = mch if nc.respMux == nil { // Create the response subscription we will use for all new style responses. @@ -3450,7 +3467,7 @@ func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration) // with the Inbox reply and return the first reply received. // This is optimized for the case of multiple responses. func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) { - inbox := NewInbox() + inbox := nc.newInbox() ch := make(chan *Msg, RequestChanLen) s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil) @@ -3470,12 +3487,10 @@ func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration) // InboxPrefix is the prefix for all inbox subjects. const ( - InboxPrefix = "_INBOX." - inboxPrefixLen = len(InboxPrefix) - respInboxPrefixLen = inboxPrefixLen + nuidSize + 1 - replySuffixLen = 8 // Gives us 62^8 - rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" - base = 62 + InboxPrefix = "_INBOX." + inboxPrefixLen = len(InboxPrefix) + rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + base = 62 ) // NewInbox will return an inbox string which can be used for directed replies from @@ -3490,10 +3505,23 @@ func NewInbox() string { return string(b[:]) } +func (nc *Conn) newInbox() string { + if nc.Opts.InboxPrefix == _EMPTY_ { + return NewInbox() + } + + var sb strings.Builder + sb.WriteString(nc.Opts.InboxPrefix) + sb.WriteByte('.') + sb.WriteString(nuid.Next()) + return sb.String() +} + // Function to init new response structures. func (nc *Conn) initNewResp() { - // _INBOX wildcard - nc.respSub = fmt.Sprintf("%s.*", NewInbox()) + nc.respSubPrefix = fmt.Sprintf("%s.", nc.newInbox()) + nc.respSubLen = len(nc.respSubPrefix) + nc.respSub = fmt.Sprintf("%s*", nc.respSubPrefix) nc.respMap = make(map[string]chan *Msg) nc.respRand = rand.New(rand.NewSource(time.Now().UnixNano())) } @@ -3505,15 +3533,17 @@ func (nc *Conn) newRespInbox() string { if nc.respMap == nil { nc.initNewResp() } - var b [respInboxPrefixLen + replySuffixLen]byte - pres := b[:respInboxPrefixLen] - copy(pres, nc.respSub) + + var sb strings.Builder + sb.WriteString(nc.respSubPrefix) + rn := nc.respRand.Int63() - for i, l := respInboxPrefixLen, rn; i < len(b); i++ { - b[i] = rdigits[l%base] - l /= base + for i := 0; i < nuidSize; i++ { + sb.WriteByte(rdigits[rn%base]) + rn /= base } - return string(b[:]) + + return sb.String() } // NewRespInbox is the new format used for _INBOX. diff --git a/nats_test.go b/nats_test.go index 7f8c840bc..342c48206 100644 --- a/nats_test.go +++ b/nats_test.go @@ -2659,3 +2659,46 @@ func TestMsg_RespondMsg(t *testing.T) { t.Fatalf("did not get correct response: %q", resp.Data) } } + +func TestCustomInboxPrefix(t *testing.T) { + opts := &Options{} + for _, p := range []string{"$BOB.", "$BOB.*", "$BOB.>", ">", ".", "", "BOB.*.X", "BOB.>.X"} { + err := CustomInboxPrefix(p)(opts) + if err == nil { + t.Fatalf("Expeted error for %q", p) + } + } + + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := Connect(s.ClientURL(), CustomInboxPrefix("$BOB")) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + sub, err := nc.Subscribe(NewInbox(), func(msg *Msg) { + if !strings.HasPrefix(msg.Reply, "$BOB.") { + t.Fatalf("invalid inbox subject %q received", msg.Reply) + } + + if len(strings.Split(msg.Reply, ".")) != 3 { + t.Fatalf("invalid number tokens in %s", msg.Reply) + } + + msg.Respond([]byte("ok")) + }) + if err != nil { + t.Fatalf("subscribe failed: %s", err) + } + + resp, err := nc.Request(sub.Subject, nil, time.Second) + if err != nil { + t.Fatalf("request failed: %s", err) + } + + if !bytes.Equal(resp.Data, []byte("ok")) { + t.Fatalf("did not receive ok: %q", resp.Data) + } +}