Skip to content

Commit

Permalink
Merge pull request #3671 from nats-io/user-info
Browse files Browse the repository at this point in the history
[ADDED] User Information requests at $SYS.REQ.USER.INFO
  • Loading branch information
derekcollison committed Nov 28, 2022
2 parents be0558c + 8365fb3 commit 14f40b9
Show file tree
Hide file tree
Showing 13 changed files with 428 additions and 154 deletions.
7 changes: 6 additions & 1 deletion server/accounts.go
Expand Up @@ -1567,9 +1567,14 @@ func (a *Account) checkStreamImportsForCycles(to string, visited map[string]bool
// SetServiceImportSharing will allow sharing of information about requests with the export account.
// Used for service latency tracking at the moment.
func (a *Account) SetServiceImportSharing(destination *Account, to string, allow bool) error {
return a.setServiceImportSharing(destination, to, true, allow)
}

// setServiceImportSharing will allow sharing of information about requests with the export account.
func (a *Account) setServiceImportSharing(destination *Account, to string, check, allow bool) error {
a.mu.Lock()
defer a.mu.Unlock()
if a.isClaimAccount() {
if check && a.isClaimAccount() {
return fmt.Errorf("claim based accounts can not be updated directly")
}
for _, si := range a.imports.services {
Expand Down
21 changes: 14 additions & 7 deletions server/auth_test.go
Expand Up @@ -294,19 +294,26 @@ func TestUserConnectionDeadline(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), time.Second)

nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("valid", ""), nats.NoReconnect(), nats.ErrorHandler(func(nc *nats.Conn, _ *nats.Subscription, err error) {
dcerr = err
cancel()
}))
nc, err := nats.Connect(
s.ClientURL(),
nats.UserInfo("valid", _EMPTY_),
nats.NoReconnect(),
nats.ErrorHandler(func(nc *nats.Conn, _ *nats.Subscription, err error) {
dcerr = err
cancel()
}))
if err != nil {
t.Fatalf("Expected client to connect, got: %s", err)
}

<-ctx.Done()

if nc.IsConnected() {
t.Fatalf("Expected to be disconnected")
}
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
if nc.IsConnected() {
return fmt.Errorf("Expected to be disconnected")
}
return nil
})

if dcerr == nil || dcerr.Error() != "nats: authentication expired" {
t.Fatalf("Expected a auth expired error: got: %v", dcerr)
Expand Down
88 changes: 80 additions & 8 deletions server/client.go
Expand Up @@ -250,6 +250,7 @@ type client struct {
darray []string
pcd map[*client]struct{}
atmr *time.Timer
expires time.Time
ping pinfo
msgb [msgScratchSize]byte
last time.Time
Expand Down Expand Up @@ -960,6 +961,61 @@ func (c *client) setPermissions(perms *Permissions) {
}
}

// Build public permissions from internal ones.
// Used for user info requests.
func (c *client) publicPermissions() *Permissions {
c.mu.Lock()
defer c.mu.Unlock()

if c.perms == nil {
return nil
}
perms := &Permissions{
Publish: &SubjectPermission{},
Subscribe: &SubjectPermission{},
}

_subs := [32]*subscription{}

// Publish
if c.perms.pub.allow != nil {
subs := _subs[:0]
c.perms.pub.allow.All(&subs)
for _, sub := range subs {
perms.Publish.Allow = append(perms.Publish.Allow, string(sub.subject))
}
}
if c.perms.pub.deny != nil {
subs := _subs[:0]
c.perms.pub.deny.All(&subs)
for _, sub := range subs {
perms.Publish.Deny = append(perms.Publish.Deny, string(sub.subject))
}
}
// Subsribe
if c.perms.sub.allow != nil {
subs := _subs[:0]
c.perms.sub.allow.All(&subs)
for _, sub := range subs {
perms.Subscribe.Allow = append(perms.Subscribe.Allow, string(sub.subject))
}
}
if c.perms.sub.deny != nil {
subs := _subs[:0]
c.perms.sub.deny.All(&subs)
for _, sub := range subs {
perms.Subscribe.Deny = append(perms.Subscribe.Deny, string(sub.subject))
}
}
// Responses.
if c.perms.resp != nil {
rp := *c.perms.resp
perms.Response = &rp
}

return perms
}

type denyType int

const (
Expand Down Expand Up @@ -4656,9 +4712,23 @@ func (c *client) awaitingAuth() bool {
func (c *client) setExpirationTimer(d time.Duration) {
c.mu.Lock()
c.atmr = time.AfterFunc(d, c.authExpired)
// This is an JWT expiration.
if c.flags.isSet(connectReceived) {
c.expires = time.Now().Add(d).Truncate(time.Second)
}
c.mu.Unlock()
}

// Return when this client expires via a claim, or 0 if not set.
func (c *client) claimExpiration() time.Duration {
c.mu.Lock()
defer c.mu.Unlock()
if c.expires.IsZero() {
return 0
}
return time.Until(c.expires).Truncate(time.Second)
}

// Possibly flush the connection and then close the low level connection.
// The boolean `minimalFlush` indicates if the flush operation should have a
// minimal write deadline.
Expand Down Expand Up @@ -5302,29 +5372,31 @@ func (c *client) doTLSHandshake(typ string, solicit bool, url *url.URL, tlsConfi
// Lock should be held.
func (c *client) getRawAuthUser() string {
switch {
case c.opts.Nkey != "":
case c.opts.Nkey != _EMPTY_:
return c.opts.Nkey
case c.opts.Username != "":
case c.opts.Username != _EMPTY_:
return c.opts.Username
case c.opts.JWT != "":
case c.opts.JWT != _EMPTY_:
return c.pubKey
case c.opts.Token != "":
case c.opts.Token != _EMPTY_:
return c.opts.Token
default:
return ""
return _EMPTY_
}
}

// getAuthUser returns the auth user for the client.
// Lock should be held.
func (c *client) getAuthUser() string {
switch {
case c.opts.Nkey != "":
case c.opts.Nkey != _EMPTY_:
return fmt.Sprintf("Nkey %q", c.opts.Nkey)
case c.opts.Username != "":
case c.opts.Username != _EMPTY_:
return fmt.Sprintf("User %q", c.opts.Username)
case c.opts.JWT != "":
case c.opts.JWT != _EMPTY_:
return fmt.Sprintf("JWT User %q", c.pubKey)
case c.opts.Token != _EMPTY_:
return fmt.Sprintf("Token %q", c.opts.Token)
default:
return `User "N/A"`
}
Expand Down
76 changes: 76 additions & 0 deletions server/client_test.go
Expand Up @@ -2605,3 +2605,79 @@ func TestClientAuthRequiredNoAuthUser(t *testing.T) {
t.Fatalf("Expected AuthRequired to be false due to 'no_auth_user'")
}
}

func TestClientUserInfoReq(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
PERMS = {
publish = { allow: "$SYS.REQ.>", deny: "$SYS.REQ.ACCOUNT.>" }
subscribe = "_INBOX.>"
allow_responses: true
}
accounts: {
A: { users: [ { user: dlc, password: pass, permissions: $PERMS } ] }
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
no_auth_user: dlc
`))
defer removeFile(t, conf)

s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

nc, err := nats.Connect(s.ClientURL())
require_NoError(t, err)
defer nc.Close()

resp, err := nc.Request("$SYS.REQ.USER.INFO", nil, time.Second)
require_NoError(t, err)

response := ServerAPIResponse{Data: &UserInfo{}}
err = json.Unmarshal(resp.Data, &response)
require_NoError(t, err)

userInfo := response.Data.(*UserInfo)

dlc := &UserInfo{
UserID: "dlc",
Account: "A",
Permissions: &Permissions{
Publish: &SubjectPermission{
Allow: []string{"$SYS.REQ.>"},
Deny: []string{"$SYS.REQ.ACCOUNT.>"},
},
Subscribe: &SubjectPermission{
Allow: []string{"_INBOX.>"},
},
Response: &ResponsePermission{
MaxMsgs: DEFAULT_ALLOW_RESPONSE_MAX_MSGS,
Expires: DEFAULT_ALLOW_RESPONSE_EXPIRATION,
},
},
}
if !reflect.DeepEqual(dlc, userInfo) {
t.Fatalf("User info for %q did not match", "dlc")
}

// Make sure system users work ok too.
nc, err = nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request("$SYS.REQ.USER.INFO", nil, time.Second)
require_NoError(t, err)

response = ServerAPIResponse{Data: &UserInfo{}}
err = json.Unmarshal(resp.Data, &response)
require_NoError(t, err)

userInfo = response.Data.(*UserInfo)

admin := &UserInfo{
UserID: "admin",
Account: "$SYS",
}
if !reflect.DeepEqual(admin, userInfo) {
t.Fatalf("User info for %q did not match", "admin")
}
}
65 changes: 65 additions & 0 deletions server/events.go
Expand Up @@ -62,6 +62,10 @@ const (
remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s"
inboxRespSubj = "$SYS._INBOX.%s.%s"

// Used to return information to a user on bound account and user permissions.
userDirectInfoSubj = "$SYS.REQ.USER.INFO"
userDirectReqSubj = "$SYS.REQ.USER.%s.INFO"

// FIXME(dlc) - Should account scope, even with wc for now, but later on
// we can then shard as needed.
accNumSubsReqSubj = "$SYS.REQ.ACCOUNT.NSUBS"
Expand Down Expand Up @@ -1030,6 +1034,13 @@ func (s *Server) initEventTracking() {
}
}

// User info.
// TODO(dlc) - Can be internal and not forwarded since bound server for the client connection
// is only one that will answer. This breaks tests since we still forward on remote server connect.
if _, err := s.sysSubscribe(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}

// For now only the STATZ subject has an account specific ping equivalent.
if _, err := s.sysSubscribe(fmt.Sprintf(accPingReqSubj, "STATZ"),
func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
Expand Down Expand Up @@ -1064,6 +1075,40 @@ func (s *Server) initEventTracking() {
}
}

// UserInfo returns basic information to a user about bound account and user permissions.
// For account information they will need to ping that separately, and this allows security
// controls on each subsystem if desired, e.g. account info, jetstream account info, etc.
type UserInfo struct {
UserID string `json:"user"`
Account string `json:"account"`
Permissions *Permissions `json:"permissions,omitempty"`
Expires time.Duration `json:"expires,omitempty"`
}

// Process a user info request.
func (s *Server) userInfoReq(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
if !s.EventsEnabled() || reply == _EMPTY_ {
return
}

response := &ServerAPIResponse{Server: &ServerInfo{}}

ci, _, _, _, err := s.getRequestInfo(c, msg)
if err != nil {
response.Error = &ApiError{Code: http.StatusBadRequest}
s.sendInternalResponse(reply, response)
return
}

response.Data = &UserInfo{
UserID: ci.User,
Account: ci.Account,
Permissions: c.publicPermissions(),
Expires: c.claimExpiration(),
}
s.sendInternalResponse(reply, response)
}

// register existing accounts with any system exports.
func (s *Server) registerSystemImportsForExisting() {
var accounts []*Account
Expand Down Expand Up @@ -1117,6 +1162,20 @@ func (s *Server) addSystemAccountExports(sacc *Account) {
}
}

// User info export.
userInfoSubj := fmt.Sprintf(userDirectReqSubj, "*")
if !sacc.hasServiceExportMatching(userInfoSubj) {
if err := sacc.AddServiceExport(userInfoSubj, nil); err != nil {
s.Errorf("Error adding system service export for %q: %v", userInfoSubj, err)
}
mappedSubj := fmt.Sprintf(userDirectReqSubj, sacc.GetName())
if err := sacc.AddServiceImport(sacc, userDirectInfoSubj, mappedSubj); err != nil {
s.Errorf("Error setting up system service import %s: %v", mappedSubj, err)
}
// Make sure to share details.
sacc.setServiceImportSharing(sacc, mappedSubj, false, true)
}

// Register any accounts that existed prior.
s.registerSystemImportsForExisting()

Expand Down Expand Up @@ -1721,6 +1780,12 @@ func (s *Server) registerSystemImports(a *Account) {
importSrvc(fmt.Sprintf(accPingReqSubj, "CONNZ"), mappedConnzSubj)
importSrvc(fmt.Sprintf(serverPingReqSubj, "CONNZ"), mappedConnzSubj)
importSrvc(fmt.Sprintf(accPingReqSubj, "STATZ"), fmt.Sprintf(accDirectReqSubj, a.Name, "STATZ"))

// This is for user's looking up their own info.
mappedSubject := fmt.Sprintf(userDirectReqSubj, a.Name)
importSrvc(userDirectInfoSubj, mappedSubject)
// Make sure to share details.
a.setServiceImportSharing(sacc, mappedSubject, false, true)
}

// Setup tracking for this account. This allows us to track global account activity.
Expand Down

0 comments on commit 14f40b9

Please sign in to comment.