Skip to content

Commit

Permalink
Merge pull request #767 from ripienaar/custom_inbox
Browse files Browse the repository at this point in the history
support custom inbox prefixes
  • Loading branch information
ripienaar committed Aug 3, 2021
2 parents 2b2bb8f + 7d68c97 commit ceb3147
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 22 deletions.
74 changes: 52 additions & 22 deletions nats.go
Expand Up @@ -435,6 +435,9 @@ type Options struct {
// For websocket connections, indicates to the server that the connection
// supports compression. If the server does too, then data will be compressed.
Compression bool

// InboxPrefix allows the default _INBOX prefix to be customized
InboxPrefix string
}

const (
Expand Down Expand Up @@ -496,11 +499,13 @@ type Conn struct {
ws bool // true if a websocket connection

// New style response handler
respSub string // The wildcard subject
respScanf string // The scanf template to extract mux token
respMux *Subscription // A single response subscription
respMap map[string]chan *Msg // Request map for the response msg channels
respRand *rand.Rand // Used for generating suffix
respSub string // The wildcard subject
respSubPrefix string // the wildcard prefix including trailing .
respSubLen int // the length of the wildcard prefix excluding trailing .
respScanf string // The scanf template to extract mux token
respMux *Subscription // A single response subscription
respMap map[string]chan *Msg // Request map for the response msg channels
respRand *rand.Rand // Used for generating suffix
}

type natsReader struct {
Expand Down Expand Up @@ -1103,6 +1108,17 @@ func Compression(enabled bool) Option {
}
}

// CustomInboxPrefix configures the request + reply inbox prefix
func CustomInboxPrefix(p string) Option {
return func(o *Options) error {
if p == "" || strings.Contains(p, ">") || strings.Contains(p, "*") || strings.HasSuffix(p, ".") {
return fmt.Errorf("nats: invald custom prefix")
}
o.InboxPrefix = p
return nil
}
}

// Handler processing

// SetDisconnectHandler will set the disconnect event handler.
Expand Down Expand Up @@ -3340,7 +3356,8 @@ func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Ms
// Create new literal Inbox and map to a chan msg.
mch := make(chan *Msg, RequestChanLen)
respInbox := nc.newRespInbox()
token := respInbox[respInboxPrefixLen:]
token := respInbox[nc.respSubLen:]

nc.respMap[token] = mch
if nc.respMux == nil {
// Create the response subscription we will use for all new style responses.
Expand Down Expand Up @@ -3447,7 +3464,7 @@ func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration)
// with the Inbox reply and return the first reply received.
// This is optimized for the case of multiple responses.
func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
inbox := NewInbox()
inbox := nc.newInbox()
ch := make(chan *Msg, RequestChanLen)

s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
Expand All @@ -3467,12 +3484,10 @@ func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration)

// InboxPrefix is the prefix for all inbox subjects.
const (
InboxPrefix = "_INBOX."
inboxPrefixLen = len(InboxPrefix)
respInboxPrefixLen = inboxPrefixLen + nuidSize + 1
replySuffixLen = 8 // Gives us 62^8
rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
base = 62
InboxPrefix = "_INBOX."
inboxPrefixLen = len(InboxPrefix)
rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
base = 62
)

// NewInbox will return an inbox string which can be used for directed replies from
Expand All @@ -3487,10 +3502,23 @@ func NewInbox() string {
return string(b[:])
}

func (nc *Conn) newInbox() string {
if nc.Opts.InboxPrefix == _EMPTY_ {
return NewInbox()
}

var sb strings.Builder
sb.WriteString(nc.Opts.InboxPrefix)
sb.WriteByte('.')
sb.WriteString(nuid.Next())
return sb.String()
}

// Function to init new response structures.
func (nc *Conn) initNewResp() {
// _INBOX wildcard
nc.respSub = fmt.Sprintf("%s.*", NewInbox())
nc.respSubPrefix = fmt.Sprintf("%s.", nc.newInbox())
nc.respSubLen = len(nc.respSubPrefix)
nc.respSub = fmt.Sprintf("%s*", nc.respSubPrefix)
nc.respMap = make(map[string]chan *Msg)
nc.respRand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
Expand All @@ -3502,15 +3530,17 @@ func (nc *Conn) newRespInbox() string {
if nc.respMap == nil {
nc.initNewResp()
}
var b [respInboxPrefixLen + replySuffixLen]byte
pres := b[:respInboxPrefixLen]
copy(pres, nc.respSub)

var sb strings.Builder
sb.WriteString(nc.respSubPrefix)

rn := nc.respRand.Int63()
for i, l := respInboxPrefixLen, rn; i < len(b); i++ {
b[i] = rdigits[l%base]
l /= base
for i := 0; i < nuidSize; i++ {
sb.WriteByte(rdigits[rn%base])
rn /= base
}
return string(b[:])

return sb.String()
}

// NewRespInbox is the new format used for _INBOX.
Expand Down
43 changes: 43 additions & 0 deletions nats_test.go
Expand Up @@ -2659,3 +2659,46 @@ func TestMsg_RespondMsg(t *testing.T) {
t.Fatalf("did not get correct response: %q", resp.Data)
}
}

func TestCustomInboxPrefix(t *testing.T) {
opts := &Options{}
for _, p := range []string{"$BOB.", "$BOB.*", "$BOB.>", ">", ".", "", "BOB.*.X", "BOB.>.X"} {
err := CustomInboxPrefix(p)(opts)
if err == nil {
t.Fatalf("Expeted error for %q", p)
}
}

s := RunServerOnPort(-1)
defer s.Shutdown()

nc, err := Connect(s.ClientURL(), CustomInboxPrefix("$BOB"))
if err != nil {
t.Fatalf("Expected to connect to server, got %v", err)
}
defer nc.Close()

sub, err := nc.Subscribe(NewInbox(), func(msg *Msg) {
if !strings.HasPrefix(msg.Reply, "$BOB.") {
t.Fatalf("invalid inbox subject %q received", msg.Reply)
}

if len(strings.Split(msg.Reply, ".")) != 3 {
t.Fatalf("invalid number tokens in %s", msg.Reply)
}

msg.Respond([]byte("ok"))
})
if err != nil {
t.Fatalf("subscribe failed: %s", err)
}

resp, err := nc.Request(sub.Subject, nil, time.Second)
if err != nil {
t.Fatalf("request failed: %s", err)
}

if !bytes.Equal(resp.Data, []byte("ok")) {
t.Fatalf("did not receive ok: %q", resp.Data)
}
}

0 comments on commit ceb3147

Please sign in to comment.