Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] User Information requests at $SYS.REQ.USER.INFO #3671

Merged
merged 4 commits into from Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion server/accounts.go
Expand Up @@ -1567,9 +1567,14 @@ func (a *Account) checkStreamImportsForCycles(to string, visited map[string]bool
// SetServiceImportSharing will allow sharing of information about requests with the export account.
// Used for service latency tracking at the moment.
func (a *Account) SetServiceImportSharing(destination *Account, to string, allow bool) error {
return a.setServiceImportSharing(destination, to, true, allow)
}

// setServiceImportSharing will allow sharing of information about requests with the export account.
func (a *Account) setServiceImportSharing(destination *Account, to string, check, allow bool) error {
a.mu.Lock()
defer a.mu.Unlock()
if a.isClaimAccount() {
if check && a.isClaimAccount() {
return fmt.Errorf("claim based accounts can not be updated directly")
}
for _, si := range a.imports.services {
Expand Down
21 changes: 14 additions & 7 deletions server/auth_test.go
Expand Up @@ -294,19 +294,26 @@ func TestUserConnectionDeadline(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), time.Second)

nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("valid", ""), nats.NoReconnect(), nats.ErrorHandler(func(nc *nats.Conn, _ *nats.Subscription, err error) {
dcerr = err
cancel()
}))
nc, err := nats.Connect(
s.ClientURL(),
nats.UserInfo("valid", _EMPTY_),
nats.NoReconnect(),
nats.ErrorHandler(func(nc *nats.Conn, _ *nats.Subscription, err error) {
dcerr = err
cancel()
}))
if err != nil {
t.Fatalf("Expected client to connect, got: %s", err)
}

<-ctx.Done()

if nc.IsConnected() {
t.Fatalf("Expected to be disconnected")
}
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
if nc.IsConnected() {
return fmt.Errorf("Expected to be disconnected")
}
return nil
})

if dcerr == nil || dcerr.Error() != "nats: authentication expired" {
t.Fatalf("Expected a auth expired error: got: %v", dcerr)
Expand Down
88 changes: 80 additions & 8 deletions server/client.go
Expand Up @@ -250,6 +250,7 @@ type client struct {
darray []string
pcd map[*client]struct{}
atmr *time.Timer
expires time.Time
ping pinfo
msgb [msgScratchSize]byte
last time.Time
Expand Down Expand Up @@ -960,6 +961,61 @@ func (c *client) setPermissions(perms *Permissions) {
}
}

// Build public permissions from internal ones.
// Used for user info requests.
func (c *client) publicPermissions() *Permissions {
c.mu.Lock()
defer c.mu.Unlock()

if c.perms == nil {
return nil
}
perms := &Permissions{
Publish: &SubjectPermission{},
Subscribe: &SubjectPermission{},
}

_subs := [32]*subscription{}

// Publish
if c.perms.pub.allow != nil {
subs := _subs[:0]
c.perms.pub.allow.All(&subs)
for _, sub := range subs {
perms.Publish.Allow = append(perms.Publish.Allow, string(sub.subject))
}
}
if c.perms.pub.deny != nil {
subs := _subs[:0]
c.perms.pub.deny.All(&subs)
for _, sub := range subs {
perms.Publish.Deny = append(perms.Publish.Deny, string(sub.subject))
}
}
// Subsribe
if c.perms.sub.allow != nil {
subs := _subs[:0]
c.perms.sub.allow.All(&subs)
for _, sub := range subs {
perms.Subscribe.Allow = append(perms.Subscribe.Allow, string(sub.subject))
}
}
if c.perms.sub.deny != nil {
subs := _subs[:0]
c.perms.sub.deny.All(&subs)
for _, sub := range subs {
perms.Subscribe.Deny = append(perms.Subscribe.Deny, string(sub.subject))
}
}
// Responses.
if c.perms.resp != nil {
rp := *c.perms.resp
perms.Response = &rp
}

return perms
}

type denyType int

const (
Expand Down Expand Up @@ -4656,9 +4712,23 @@ func (c *client) awaitingAuth() bool {
func (c *client) setExpirationTimer(d time.Duration) {
c.mu.Lock()
c.atmr = time.AfterFunc(d, c.authExpired)
// This is an JWT expiration.
if c.flags.isSet(connectReceived) {
c.expires = time.Now().Add(d).Truncate(time.Second)
}
c.mu.Unlock()
}

// Return when this client expires via a claim, or 0 if not set.
func (c *client) claimExpiration() time.Duration {
c.mu.Lock()
defer c.mu.Unlock()
if c.expires.IsZero() {
return 0
}
return time.Until(c.expires).Truncate(time.Second)
}

// Possibly flush the connection and then close the low level connection.
// The boolean `minimalFlush` indicates if the flush operation should have a
// minimal write deadline.
Expand Down Expand Up @@ -5302,29 +5372,31 @@ func (c *client) doTLSHandshake(typ string, solicit bool, url *url.URL, tlsConfi
// Lock should be held.
func (c *client) getRawAuthUser() string {
switch {
case c.opts.Nkey != "":
case c.opts.Nkey != _EMPTY_:
return c.opts.Nkey
case c.opts.Username != "":
case c.opts.Username != _EMPTY_:
return c.opts.Username
case c.opts.JWT != "":
case c.opts.JWT != _EMPTY_:
return c.pubKey
case c.opts.Token != "":
case c.opts.Token != _EMPTY_:
return c.opts.Token
default:
return ""
return _EMPTY_
}
}

// getAuthUser returns the auth user for the client.
// Lock should be held.
func (c *client) getAuthUser() string {
switch {
case c.opts.Nkey != "":
case c.opts.Nkey != _EMPTY_:
return fmt.Sprintf("Nkey %q", c.opts.Nkey)
case c.opts.Username != "":
case c.opts.Username != _EMPTY_:
return fmt.Sprintf("User %q", c.opts.Username)
case c.opts.JWT != "":
case c.opts.JWT != _EMPTY_:
return fmt.Sprintf("JWT User %q", c.pubKey)
case c.opts.Token != _EMPTY_:
return fmt.Sprintf("Token %q", c.opts.Token)
default:
return `User "N/A"`
}
Expand Down
76 changes: 76 additions & 0 deletions server/client_test.go
Expand Up @@ -2605,3 +2605,79 @@ func TestClientAuthRequiredNoAuthUser(t *testing.T) {
t.Fatalf("Expected AuthRequired to be false due to 'no_auth_user'")
}
}

func TestClientUserInfoReq(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
PERMS = {
publish = { allow: "$SYS.REQ.>", deny: "$SYS.REQ.ACCOUNT.>" }
subscribe = "_INBOX.>"
allow_responses: true
}
accounts: {
A: { users: [ { user: dlc, password: pass, permissions: $PERMS } ] }
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
no_auth_user: dlc
`))
defer removeFile(t, conf)

s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

nc, err := nats.Connect(s.ClientURL())
require_NoError(t, err)
defer nc.Close()

resp, err := nc.Request("$SYS.REQ.USER.INFO", nil, time.Second)
require_NoError(t, err)

response := ServerAPIResponse{Data: &UserInfo{}}
err = json.Unmarshal(resp.Data, &response)
require_NoError(t, err)

userInfo := response.Data.(*UserInfo)

dlc := &UserInfo{
UserID: "dlc",
Account: "A",
Permissions: &Permissions{
Publish: &SubjectPermission{
Allow: []string{"$SYS.REQ.>"},
Deny: []string{"$SYS.REQ.ACCOUNT.>"},
},
Subscribe: &SubjectPermission{
Allow: []string{"_INBOX.>"},
},
Response: &ResponsePermission{
MaxMsgs: DEFAULT_ALLOW_RESPONSE_MAX_MSGS,
Expires: DEFAULT_ALLOW_RESPONSE_EXPIRATION,
},
},
}
if !reflect.DeepEqual(dlc, userInfo) {
t.Fatalf("User info for %q did not match", "dlc")
}

// Make sure system users work ok too.
nc, err = nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request("$SYS.REQ.USER.INFO", nil, time.Second)
require_NoError(t, err)

response = ServerAPIResponse{Data: &UserInfo{}}
err = json.Unmarshal(resp.Data, &response)
require_NoError(t, err)

userInfo = response.Data.(*UserInfo)

admin := &UserInfo{
UserID: "admin",
Account: "$SYS",
}
if !reflect.DeepEqual(admin, userInfo) {
t.Fatalf("User info for %q did not match", "admin")
}
}
65 changes: 65 additions & 0 deletions server/events.go
Expand Up @@ -62,6 +62,10 @@ const (
remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s"
inboxRespSubj = "$SYS._INBOX.%s.%s"

// Used to return information to a user on bound account and user permissions.
userDirectInfoSubj = "$SYS.REQ.USER.INFO"
userDirectReqSubj = "$SYS.REQ.USER.%s.INFO"

// FIXME(dlc) - Should account scope, even with wc for now, but later on
// we can then shard as needed.
accNumSubsReqSubj = "$SYS.REQ.ACCOUNT.NSUBS"
Expand Down Expand Up @@ -1030,6 +1034,13 @@ func (s *Server) initEventTracking() {
}
}

// User info.
// TODO(dlc) - Can be internal and not forwarded since bound server for the client connection
// is only one that will answer. This breaks tests since we still forward on remote server connect.
if _, err := s.sysSubscribe(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}

// For now only the STATZ subject has an account specific ping equivalent.
if _, err := s.sysSubscribe(fmt.Sprintf(accPingReqSubj, "STATZ"),
func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
Expand Down Expand Up @@ -1064,6 +1075,40 @@ func (s *Server) initEventTracking() {
}
}

// UserInfo returns basic information to a user about bound account and user permissions.
// For account information they will need to ping that separately, and this allows security
// controls on each subsystem if desired, e.g. account info, jetstream account info, etc.
type UserInfo struct {
UserID string `json:"user"`
Account string `json:"account"`
Permissions *Permissions `json:"permissions,omitempty"`
Expires time.Duration `json:"expires,omitempty"`
}

// Process a user info request.
func (s *Server) userInfoReq(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
if !s.EventsEnabled() || reply == _EMPTY_ {
return
}

response := &ServerAPIResponse{Server: &ServerInfo{}}

ci, _, _, _, err := s.getRequestInfo(c, msg)
if err != nil {
response.Error = &ApiError{Code: http.StatusBadRequest}
s.sendInternalResponse(reply, response)
return
}

response.Data = &UserInfo{
UserID: ci.User,
Account: ci.Account,
Permissions: c.publicPermissions(),
Expires: c.claimExpiration(),
}
s.sendInternalResponse(reply, response)
}

// register existing accounts with any system exports.
func (s *Server) registerSystemImportsForExisting() {
var accounts []*Account
Expand Down Expand Up @@ -1117,6 +1162,20 @@ func (s *Server) addSystemAccountExports(sacc *Account) {
}
}

// User info export.
userInfoSubj := fmt.Sprintf(userDirectReqSubj, "*")
if !sacc.hasServiceExportMatching(userInfoSubj) {
if err := sacc.AddServiceExport(userInfoSubj, nil); err != nil {
s.Errorf("Error adding system service export for %q: %v", userInfoSubj, err)
}
mappedSubj := fmt.Sprintf(userDirectReqSubj, sacc.GetName())
if err := sacc.AddServiceImport(sacc, userDirectInfoSubj, mappedSubj); err != nil {
s.Errorf("Error setting up system service import %s: %v", mappedSubj, err)
}
// Make sure to share details.
sacc.setServiceImportSharing(sacc, mappedSubj, false, true)
}

// Register any accounts that existed prior.
s.registerSystemImportsForExisting()

Expand Down Expand Up @@ -1721,6 +1780,12 @@ func (s *Server) registerSystemImports(a *Account) {
importSrvc(fmt.Sprintf(accPingReqSubj, "CONNZ"), mappedConnzSubj)
importSrvc(fmt.Sprintf(serverPingReqSubj, "CONNZ"), mappedConnzSubj)
importSrvc(fmt.Sprintf(accPingReqSubj, "STATZ"), fmt.Sprintf(accDirectReqSubj, a.Name, "STATZ"))

// This is for user's looking up their own info.
mappedSubject := fmt.Sprintf(userDirectReqSubj, a.Name)
importSrvc(userDirectInfoSubj, mappedSubject)
// Make sure to share details.
a.setServiceImportSharing(sacc, mappedSubject, false, true)
}

// Setup tracking for this account. This allows us to track global account activity.
Expand Down