Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support custom inbox prefixes #767

Merged
merged 2 commits into from Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
64 changes: 48 additions & 16 deletions nats.go
Expand Up @@ -433,6 +433,9 @@ type Options struct {
// For websocket connections, indicates to the server that the connection
// supports compression. If the server does too, then data will be compressed.
Compression bool

// InboxPrefix allows the default _INBOX prefix to be customized
InboxPrefix string
}

const (
Expand Down Expand Up @@ -494,11 +497,13 @@ type Conn struct {
ws bool // true if a websocket connection

// New style response handler
respSub string // The wildcard subject
respScanf string // The scanf template to extract mux token
respMux *Subscription // A single response subscription
respMap map[string]chan *Msg // Request map for the response msg channels
respRand *rand.Rand // Used for generating suffix
respSub string // The wildcard subject
respSubPrefix string // the wildcard prefix including trailing .
respSubLen int // the length of the wildcard prefix excluding trailing .
respScanf string // The scanf template to extract mux token
respMux *Subscription // A single response subscription
respMap map[string]chan *Msg // Request map for the response msg channels
respRand *rand.Rand // Used for generating suffix
}

type natsReader struct {
Expand Down Expand Up @@ -1101,6 +1106,17 @@ func Compression(enabled bool) Option {
}
}

// CustomInboxPrefix configures the request + reply inbox prefix
func CustomInboxPrefix(p string) Option {
return func(o *Options) error {
if p == "" || strings.HasSuffix(p, ">") || strings.HasSuffix(p, "*") || strings.HasSuffix(p, ".") || strings.HasPrefix(p, ">") || strings.HasPrefix(p, "*") {
ripienaar marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("nats: invald custom prefix")
}
o.InboxPrefix = p
return nil
}
}

// Handler processing

// SetDisconnectHandler will set the disconnect event handler.
Expand Down Expand Up @@ -3343,7 +3359,8 @@ func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Ms
// Create new literal Inbox and map to a chan msg.
mch := make(chan *Msg, RequestChanLen)
respInbox := nc.newRespInbox()
token := respInbox[respInboxPrefixLen:]
token := respInbox[nc.respSubLen+1:]
ripienaar marked this conversation as resolved.
Show resolved Hide resolved

nc.respMap[token] = mch
if nc.respMux == nil {
// Create the response subscription we will use for all new style responses.
Expand Down Expand Up @@ -3450,7 +3467,7 @@ func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration)
// with the Inbox reply and return the first reply received.
// This is optimized for the case of multiple responses.
func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
inbox := NewInbox()
inbox := nc.newInbox()
ch := make(chan *Msg, RequestChanLen)

s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
Expand Down Expand Up @@ -3490,10 +3507,23 @@ func NewInbox() string {
return string(b[:])
}

func (nc *Conn) newInbox() string {
if nc.Opts.InboxPrefix == _EMPTY_ {
return NewInbox()
}

var sb strings.Builder
sb.WriteString(nc.Opts.InboxPrefix)
sb.WriteByte('.')
sb.WriteString(nuid.Next())
return sb.String()
}

// Function to init new response structures.
func (nc *Conn) initNewResp() {
// _INBOX wildcard
nc.respSub = fmt.Sprintf("%s.*", NewInbox())
nc.respSubPrefix = fmt.Sprintf("%s.", nc.newInbox())
nc.respSubLen = len(nc.respSubPrefix) - 1
ripienaar marked this conversation as resolved.
Show resolved Hide resolved
nc.respSub = fmt.Sprintf("%s*", nc.respSubPrefix)
nc.respMap = make(map[string]chan *Msg)
nc.respRand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
Expand All @@ -3505,15 +3535,17 @@ func (nc *Conn) newRespInbox() string {
if nc.respMap == nil {
nc.initNewResp()
}
var b [respInboxPrefixLen + replySuffixLen]byte
pres := b[:respInboxPrefixLen]
copy(pres, nc.respSub)

ripienaar marked this conversation as resolved.
Show resolved Hide resolved
var sb strings.Builder
sb.WriteString(nc.respSubPrefix)

rn := nc.respRand.Int63()
for i, l := respInboxPrefixLen, rn; i < len(b); i++ {
b[i] = rdigits[l%base]
l /= base
for i := 0; i < nuidSize; i++ {
sb.WriteByte(rdigits[rn%base])
rn /= base
}
return string(b[:])

return sb.String()
}

// NewRespInbox is the new format used for _INBOX.
Expand Down
43 changes: 43 additions & 0 deletions nats_test.go
Expand Up @@ -2659,3 +2659,46 @@ func TestMsg_RespondMsg(t *testing.T) {
t.Fatalf("did not get correct response: %q", resp.Data)
}
}

func TestCustomInboxPrefix(t *testing.T) {
opts := &Options{}
for _, p := range []string{"$BOB.", "$BOB.*", "$BOB.>", ">", ".", ""} {
err := CustomInboxPrefix(p)(opts)
if err == nil {
t.Fatalf("Expeted error for %q", p)
}
}

s := RunServerOnPort(-1)
defer s.Shutdown()

nc, err := Connect(s.ClientURL(), CustomInboxPrefix("$BOB"))
if err != nil {
t.Fatalf("Expected to connect to server, got %v", err)
}
defer nc.Close()

sub, err := nc.Subscribe(NewInbox(), func(msg *Msg) {
if !strings.HasPrefix(msg.Reply, "$BOB.") {
t.Fatalf("invalid inbox subject %q received", msg.Reply)
}

if len(strings.Split(msg.Reply, ".")) != 3 {
t.Fatalf("invalid number tokens in %s", msg.Reply)
}

msg.Respond([]byte("ok"))
})
if err != nil {
t.Fatalf("subscribe failed: %s", err)
}

resp, err := nc.Request(sub.Subject, nil, time.Second)
if err != nil {
t.Fatalf("request failed: %s", err)
}

if !bytes.Equal(resp.Data, []byte("ok")) {
t.Fatalf("did not receive ok: %q", resp.Data)
}
}