Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] User Information requests at $SYS.REQ.USER.INFO #3671

Merged
merged 4 commits into from Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion server/accounts.go
Expand Up @@ -1567,9 +1567,14 @@ func (a *Account) checkStreamImportsForCycles(to string, visited map[string]bool
// SetServiceImportSharing will allow sharing of information about requests with the export account.
// Used for service latency tracking at the moment.
func (a *Account) SetServiceImportSharing(destination *Account, to string, allow bool) error {
return a.setServiceImportSharing(destination, to, true, allow)
}

// setServiceImportSharing will allow sharing of information about requests with the export account.
func (a *Account) setServiceImportSharing(destination *Account, to string, check, allow bool) error {
a.mu.Lock()
defer a.mu.Unlock()
if a.isClaimAccount() {
if check && a.isClaimAccount() {
return fmt.Errorf("claim based accounts can not be updated directly")
}
for _, si := range a.imports.services {
Expand Down
21 changes: 14 additions & 7 deletions server/auth_test.go
Expand Up @@ -294,19 +294,26 @@ func TestUserConnectionDeadline(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), time.Second)

nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("valid", ""), nats.NoReconnect(), nats.ErrorHandler(func(nc *nats.Conn, _ *nats.Subscription, err error) {
dcerr = err
cancel()
}))
nc, err := nats.Connect(
s.ClientURL(),
nats.UserInfo("valid", _EMPTY_),
nats.NoReconnect(),
nats.ErrorHandler(func(nc *nats.Conn, _ *nats.Subscription, err error) {
dcerr = err
cancel()
}))
if err != nil {
t.Fatalf("Expected client to connect, got: %s", err)
}

<-ctx.Done()

if nc.IsConnected() {
t.Fatalf("Expected to be disconnected")
}
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
if nc.IsConnected() {
return fmt.Errorf("Expected to be disconnected")
}
return nil
})

if dcerr == nil || dcerr.Error() != "nats: authentication expired" {
t.Fatalf("Expected a auth expired error: got: %v", dcerr)
Expand Down
73 changes: 65 additions & 8 deletions server/client.go
Expand Up @@ -960,6 +960,61 @@ func (c *client) setPermissions(perms *Permissions) {
}
}

// Build public permissions from internal ones.
// Used for user info requests.
func (c *client) publicPermissions() *Permissions {
c.mu.Lock()
defer c.mu.Unlock()

if c.perms == nil {
return nil
}
perms := &Permissions{
Publish: &SubjectPermission{},
Subscribe: &SubjectPermission{},
}

_subs := [32]*subscription{}

// Publish
if c.perms.pub.allow != nil {
subs := _subs[:0]
c.perms.pub.allow.All(&subs)
for _, sub := range subs {
perms.Publish.Allow = append(perms.Publish.Allow, string(sub.subject))
}
}
if c.perms.pub.deny != nil {
subs := _subs[:0]
c.perms.pub.deny.All(&subs)
for _, sub := range subs {
perms.Publish.Deny = append(perms.Publish.Deny, string(sub.subject))
}
}
// Subsribe
if c.perms.sub.allow != nil {
subs := _subs[:0]
c.perms.sub.allow.All(&subs)
for _, sub := range subs {
perms.Subscribe.Allow = append(perms.Subscribe.Allow, string(sub.subject))
}
}
if c.perms.sub.deny != nil {
subs := _subs[:0]
c.perms.sub.deny.All(&subs)
for _, sub := range subs {
perms.Subscribe.Deny = append(perms.Subscribe.Deny, string(sub.subject))
}
}
// Responses.
if c.perms.resp != nil {
rp := *c.perms.resp
perms.Response = &rp
}

return perms
}

type denyType int

const (
Expand Down Expand Up @@ -5302,29 +5357,31 @@ func (c *client) doTLSHandshake(typ string, solicit bool, url *url.URL, tlsConfi
// Lock should be held.
func (c *client) getRawAuthUser() string {
switch {
case c.opts.Nkey != "":
case c.opts.Nkey != _EMPTY_:
return c.opts.Nkey
case c.opts.Username != "":
case c.opts.Username != _EMPTY_:
return c.opts.Username
case c.opts.JWT != "":
case c.opts.JWT != _EMPTY_:
return c.pubKey
case c.opts.Token != "":
case c.opts.Token != _EMPTY_:
return c.opts.Token
default:
return ""
return _EMPTY_
}
}

// getAuthUser returns the auth user for the client.
// Lock should be held.
func (c *client) getAuthUser() string {
switch {
case c.opts.Nkey != "":
case c.opts.Nkey != _EMPTY_:
return fmt.Sprintf("Nkey %q", c.opts.Nkey)
case c.opts.Username != "":
case c.opts.Username != _EMPTY_:
return fmt.Sprintf("User %q", c.opts.Username)
case c.opts.JWT != "":
case c.opts.JWT != _EMPTY_:
return fmt.Sprintf("JWT User %q", c.pubKey)
case c.opts.Token != _EMPTY_:
return fmt.Sprintf("Token %q", c.opts.Token)
default:
return `User "N/A"`
}
Expand Down
76 changes: 76 additions & 0 deletions server/client_test.go
Expand Up @@ -2605,3 +2605,79 @@ func TestClientAuthRequiredNoAuthUser(t *testing.T) {
t.Fatalf("Expected AuthRequired to be false due to 'no_auth_user'")
}
}

func TestClientUserInfoReq(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
PERMS = {
publish = { allow: "$SYS.REQ.>", deny: "$SYS.REQ.ACCOUNT.>" }
subscribe = "_INBOX.>"
allow_responses: true
}
accounts: {
A: { users: [ { user: dlc, password: pass, permissions: $PERMS } ] }
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
no_auth_user: dlc
`))
defer removeFile(t, conf)

s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

nc, err := nats.Connect(s.ClientURL())
require_NoError(t, err)
defer nc.Close()

resp, err := nc.Request("$SYS.REQ.USER.INFO", nil, time.Second)
require_NoError(t, err)

response := ServerAPIResponse{Data: &UserInfo{}}
err = json.Unmarshal(resp.Data, &response)
require_NoError(t, err)

userInfo := response.Data.(*UserInfo)

dlc := &UserInfo{
UserID: "dlc",
Account: "A",
Permissions: &Permissions{
Publish: &SubjectPermission{
Allow: []string{"$SYS.REQ.>"},
Deny: []string{"$SYS.REQ.ACCOUNT.>"},
},
Subscribe: &SubjectPermission{
Allow: []string{"_INBOX.>"},
},
Response: &ResponsePermission{
MaxMsgs: DEFAULT_ALLOW_RESPONSE_MAX_MSGS,
Expires: DEFAULT_ALLOW_RESPONSE_EXPIRATION,
},
},
}
if !reflect.DeepEqual(dlc, userInfo) {
t.Fatalf("User info for %q did not match", "dlc")
}

// Make sure system users work ok too.
nc, err = nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request("$SYS.REQ.USER.INFO", nil, time.Second)
require_NoError(t, err)

response = ServerAPIResponse{Data: &UserInfo{}}
err = json.Unmarshal(resp.Data, &response)
require_NoError(t, err)

userInfo = response.Data.(*UserInfo)

admin := &UserInfo{
UserID: "admin",
Account: "$SYS",
}
if !reflect.DeepEqual(admin, userInfo) {
t.Fatalf("User info for %q did not match", "admin")
}
}
63 changes: 63 additions & 0 deletions server/events.go
Expand Up @@ -62,6 +62,10 @@ const (
remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s"
inboxRespSubj = "$SYS._INBOX.%s.%s"

// Used to return information to a user on bound account and user permissions.
userDirectInfoSubj = "$SYS.REQ.USER.INFO"
userDirectReqSubj = "$SYS.REQ.USER.%s.INFO"

// FIXME(dlc) - Should account scope, even with wc for now, but later on
// we can then shard as needed.
accNumSubsReqSubj = "$SYS.REQ.ACCOUNT.NSUBS"
Expand Down Expand Up @@ -1030,6 +1034,13 @@ func (s *Server) initEventTracking() {
}
}

// User info.
// TODO(dlc) - Can be internal and not forwarded since bound server for the client connection
// is only one that will answer. This breaks tests since we still forward on remote server connect.
if _, err := s.sysSubscribe(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}

// For now only the STATZ subject has an account specific ping equivalent.
if _, err := s.sysSubscribe(fmt.Sprintf(accPingReqSubj, "STATZ"),
func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
Expand Down Expand Up @@ -1064,6 +1075,38 @@ func (s *Server) initEventTracking() {
}
}

// UserInfo returns basic information to a user about bound account and user permissions.
// For account information they will need to ping that separately, and this allows security
// controls on each subsystem if desired, e.g. account info, jetstream account info, etc.
type UserInfo struct {
UserID string `json:"user"`
Account string `json:"account"`
Permissions *Permissions `json:"permissions,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also include Time of Day restrictions and CIDR block restrictions? Expiration might be useful here too. wdyt?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we could add in limits and expiration. Will look into adding those.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me add expiration, I think we hold on the others since that could open up a bunch of options on what is reported that blurs between user and accounts.

Will add expiration, that is a good add for sure.

}

// Process a user info request.
func (s *Server) userInfoReq(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
if !s.EventsEnabled() || reply == _EMPTY_ {
return
}

response := &ServerAPIResponse{Server: &ServerInfo{}}

ci, _, _, _, err := s.getRequestInfo(c, msg)
if err != nil {
response.Error = &ApiError{Code: http.StatusBadRequest}
s.sendInternalResponse(reply, response)
return
}

response.Data = &UserInfo{
UserID: ci.User,
Account: ci.Account,
Permissions: c.publicPermissions(),
}
s.sendInternalResponse(reply, response)
}

// register existing accounts with any system exports.
func (s *Server) registerSystemImportsForExisting() {
var accounts []*Account
Expand Down Expand Up @@ -1117,6 +1160,20 @@ func (s *Server) addSystemAccountExports(sacc *Account) {
}
}

// User info export.
userInfoSubj := fmt.Sprintf(userDirectReqSubj, "*")
if !sacc.hasServiceExportMatching(userInfoSubj) {
if err := sacc.AddServiceExport(userInfoSubj, nil); err != nil {
s.Errorf("Error adding system service export for %q: %v", userInfoSubj, err)
}
mappedSubj := fmt.Sprintf(userDirectReqSubj, sacc.GetName())
if err := sacc.AddServiceImport(sacc, userDirectInfoSubj, mappedSubj); err != nil {
s.Errorf("Error setting up system service import %s: %v", mappedSubj, err)
}
// Make sure to share details.
sacc.setServiceImportSharing(sacc, mappedSubj, false, true)
}

// Register any accounts that existed prior.
s.registerSystemImportsForExisting()

Expand Down Expand Up @@ -1721,6 +1778,12 @@ func (s *Server) registerSystemImports(a *Account) {
importSrvc(fmt.Sprintf(accPingReqSubj, "CONNZ"), mappedConnzSubj)
importSrvc(fmt.Sprintf(serverPingReqSubj, "CONNZ"), mappedConnzSubj)
importSrvc(fmt.Sprintf(accPingReqSubj, "STATZ"), fmt.Sprintf(accDirectReqSubj, a.Name, "STATZ"))

// This is for user's looking up their own info.
mappedSubject := fmt.Sprintf(userDirectReqSubj, a.Name)
importSrvc(userDirectInfoSubj, mappedSubject)
// Make sure to share details.
a.setServiceImportSharing(sacc, mappedSubject, false, true)
}

// Setup tracking for this account. This allows us to track global account activity.
Expand Down
14 changes: 7 additions & 7 deletions server/events_test.go
Expand Up @@ -1268,7 +1268,7 @@ func TestAccountReqMonitoring(t *testing.T) {
// query SUBSZ for account
resp, err := ncSys.Request(subsz, nil, time.Second)
require_NoError(t, err)
require_Contains(t, string(resp.Data), `"num_subscriptions":4,`)
require_Contains(t, string(resp.Data), `"num_subscriptions":5,`)
// create a subscription
sub, err := nc.Subscribe("foo", func(msg *nats.Msg) {})
require_NoError(t, err)
Expand All @@ -1278,7 +1278,7 @@ func TestAccountReqMonitoring(t *testing.T) {
// query SUBSZ for account
resp, err = ncSys.Request(subsz, nil, time.Second)
require_NoError(t, err)
require_Contains(t, string(resp.Data), `"num_subscriptions":5,`, `"subject":"foo"`)
require_Contains(t, string(resp.Data), `"num_subscriptions":6,`, `"subject":"foo"`)
// query connections for account
resp, err = ncSys.Request(connz, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -1418,15 +1418,15 @@ func TestAccountReqInfo(t *testing.T) {
t.Fatalf("Unmarshalling failed: %v", err)
} else if len(info.Exports) != 1 {
t.Fatalf("Unexpected value: %v", info.Exports)
} else if len(info.Imports) != 3 {
} else if len(info.Imports) != 4 {
t.Fatalf("Unexpected value: %+v", info.Imports)
} else if info.Exports[0].Subject != "req.*" {
t.Fatalf("Unexpected value: %v", info.Exports)
} else if info.Exports[0].Type != jwt.Service {
t.Fatalf("Unexpected value: %v", info.Exports)
} else if info.Exports[0].ResponseType != jwt.ResponseTypeSingleton {
t.Fatalf("Unexpected value: %v", info.Exports)
} else if info.SubCnt != 3 {
} else if info.SubCnt != 4 {
t.Fatalf("Unexpected value: %v", info.SubCnt)
} else {
checkCommon(&info, &srv, pub1, ajwt1)
Expand All @@ -1439,7 +1439,7 @@ func TestAccountReqInfo(t *testing.T) {
t.Fatalf("Unmarshalling failed: %v", err)
} else if len(info.Exports) != 0 {
t.Fatalf("Unexpected value: %v", info.Exports)
} else if len(info.Imports) != 4 {
} else if len(info.Imports) != 5 {
t.Fatalf("Unexpected value: %+v", info.Imports)
}
// Here we need to find our import
Expand All @@ -1457,7 +1457,7 @@ func TestAccountReqInfo(t *testing.T) {
t.Fatalf("Unexpected value: %+v", si)
} else if si.Account != pub1 {
t.Fatalf("Unexpected value: %+v", si)
} else if info.SubCnt != 4 {
} else if info.SubCnt != 5 {
t.Fatalf("Unexpected value: %+v", si)
} else {
checkCommon(&info, &srv, pub2, ajwt2)
Expand Down Expand Up @@ -1664,7 +1664,7 @@ func TestSystemAccountWithGateways(t *testing.T) {

// If this tests fails with wrong number after 10 seconds we may have
// added a new inititial subscription for the eventing system.
checkExpectedSubs(t, 47, sa)
checkExpectedSubs(t, 50, sa)

// Create a client on B and see if we receive the event
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
Expand Down