Skip to content

Commit

Permalink
Evaluate service imports when publishing advisories (#4302)
Browse files Browse the repository at this point in the history
This should fix #4275 by allowing advisories to be copied into other
accounts via service imports.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
derekcollison committed Jul 12, 2023
2 parents 9cfe8b8 + 1434ee7 commit 0c8552c
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 8 deletions.
10 changes: 2 additions & 8 deletions server/client.go
Expand Up @@ -3127,20 +3127,14 @@ var needFlush = struct{}{}
// deliverMsg will deliver a message to a matching subscription and its underlying client.
// We process all connection/client types. mh is the part that will be protocol/client specific.
func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, subject, reply, mh, msg []byte, gwrply bool) bool {
// Check sub client and check echo
if sub.client == nil || c == sub.client && !sub.client.echo {
// Check sub client and check echo. Only do this if not a service import.
if sub.client == nil || (c == sub.client && !sub.client.echo && !sub.si) {
return false
}

client := sub.client
client.mu.Lock()

// Check echo
if c == client && !client.echo {
client.mu.Unlock()
return false
}

// Check if we have a subscribe deny clause. This will trigger us to check the subject
// for a match against the denied subjects.
if client.mperms != nil && client.checkDenySub(string(subject)) {
Expand Down
217 changes: 217 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -10723,6 +10723,223 @@ func TestJetStreamAccountImportBasics(t *testing.T) {
}
}

// This tests whether we are able to aggregate all JetStream advisory events
// from all accounts into a single account. Config for this test uses
// service imports and exports as that allows for gathering all events
// without having to know the account name and without separate entries
// for each account in aggregate account config.
// This test fails as it is not receiving the api audit event ($JS.EVENT.ADVISORY.API).
func TestJetStreamAccountImportJSAdvisoriesAsService(t *testing.T) {
conf := createConfFile(t, []byte(`
listen=127.0.0.1:-1
no_auth_user: pp
jetstream: {max_mem_store: 64GB, max_file_store: 10TB}
accounts {
JS {
jetstream: enabled
users: [ {user: pp, password: foo} ]
imports [
{ service: { account: AGG, subject: '$JS.EVENT.ADVISORY.ACC.JS.>' }, to: '$JS.EVENT.ADVISORY.>' }
]
}
AGG {
users: [ {user: agg, password: foo} ]
exports: [
{ service: '$JS.EVENT.ADVISORY.ACC.*.>', response: Singleton, account_token_position: 5 }
]
}
}
`))

s, _ := RunServerWithConfig(conf)
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

// This should be the pp user, one which manages JetStream assets
ncJS, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error during connect: %v", err)
}
defer ncJS.Close()

// This is the agg user, which should aggregate all JS advisory events.
ncAgg, err := nats.Connect(s.ClientURL(), nats.UserInfo("agg", "foo"))
if err != nil {
t.Fatalf("Unexpected error during connect: %v", err)
}
defer ncAgg.Close()

js, err := ncJS.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// user from JS account should receive events on $JS.EVENT.ADVISORY.> subject
subJS, err := ncJS.SubscribeSync("$JS.EVENT.ADVISORY.>")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer subJS.Unsubscribe()

// user from AGG account should receive events on mapped $JS.EVENT.ADVISORY.ACC.JS.> subject (with account name)
subAgg, err := ncAgg.SubscribeSync("$JS.EVENT.ADVISORY.ACC.JS.>")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// add stream using JS account
// this should trigger 2 events:
// - an action event on $JS.EVENT.ADVISORY.STREAM.CREATED.ORDERS
// - an api audit event on $JS.EVENT.ADVISORY.API
_, err = js.AddStream(&nats.StreamConfig{Name: "ORDERS", Subjects: []string{"ORDERS.*"}})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
msg, err := subJS.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if msg.Subject != "$JS.EVENT.ADVISORY.STREAM.CREATED.ORDERS" {
t.Fatalf("Unexpected subject: %q", msg.Subject)
}
msg, err = subJS.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if msg.Subject != "$JS.EVENT.ADVISORY.API" {
t.Fatalf("Unexpected subject: %q", msg.Subject)
}

// same set of events should be received by AGG account
// on subjects containing account name (ACC.JS)
msg, err = subAgg.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if msg.Subject != "$JS.EVENT.ADVISORY.ACC.JS.STREAM.CREATED.ORDERS" {
t.Fatalf("Unexpected subject: %q", msg.Subject)
}

// we get error here, since we do not get the api audit event
msg, err = subAgg.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if msg.Subject != "$JS.EVENT.ADVISORY.ACC.JS.API" {
t.Fatalf("Unexpected subject: %q", msg.Subject)
}
}

// This tests whether we are able to aggregate all JetStream advisory events
// from all accounts into a single account. Config for this test uses
// stream imports and exports as that allows for gathering all events
// as long as there is a separate stream import entry for each account
// in aggregate account config.
func TestJetStreamAccountImportJSAdvisoriesAsStream(t *testing.T) {
conf := createConfFile(t, []byte(`
listen=127.0.0.1:-1
no_auth_user: pp
jetstream: {max_mem_store: 64GB, max_file_store: 10TB}
accounts {
JS {
jetstream: enabled
users: [ {user: pp, password: foo} ]
exports [
{ stream: '$JS.EVENT.ADVISORY.>' }
]
}
AGG {
users: [ {user: agg, password: foo} ]
imports: [
{ stream: { account: JS, subject: '$JS.EVENT.ADVISORY.>' }, to: '$JS.EVENT.ADVISORY.ACC.JS.>' }
]
}
}
`))

s, _ := RunServerWithConfig(conf)
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

// This should be the pp user, one which manages JetStream assets
ncJS, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error during connect: %v", err)
}
defer ncJS.Close()

// This is the agg user, which should aggregate all JS advisory events.
ncAgg, err := nats.Connect(s.ClientURL(), nats.UserInfo("agg", "foo"))
if err != nil {
t.Fatalf("Unexpected error during connect: %v", err)
}
defer ncAgg.Close()

js, err := ncJS.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// user from JS account should receive events on $JS.EVENT.ADVISORY.> subject
subJS, err := ncJS.SubscribeSync("$JS.EVENT.ADVISORY.>")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer subJS.Unsubscribe()

// user from AGG account should receive events on mapped $JS.EVENT.ADVISORY.ACC.JS.> subject (with account name)
subAgg, err := ncAgg.SubscribeSync("$JS.EVENT.ADVISORY.ACC.JS.>")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// add stream using JS account
// this should trigger 2 events:
// - an action event on $JS.EVENT.ADVISORY.STREAM.CREATED.ORDERS
// - an api audit event on $JS.EVENT.ADVISORY.API
_, err = js.AddStream(&nats.StreamConfig{Name: "ORDERS", Subjects: []string{"ORDERS.*"}})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
msg, err := subJS.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if msg.Subject != "$JS.EVENT.ADVISORY.STREAM.CREATED.ORDERS" {
t.Fatalf("Unexpected subject: %q", msg.Subject)
}
msg, err = subJS.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if msg.Subject != "$JS.EVENT.ADVISORY.API" {
t.Fatalf("Unexpected subject: %q", msg.Subject)
}

// same set of events should be received by AGG account
// on subjects containing account name (ACC.JS)
msg, err = subAgg.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if msg.Subject != "$JS.EVENT.ADVISORY.ACC.JS.STREAM.CREATED.ORDERS" {
t.Fatalf("Unexpected subject: %q", msg.Subject)
}

// when using stream instead of service, we get all events
msg, err = subAgg.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if msg.Subject != "$JS.EVENT.ADVISORY.ACC.JS.API" {
t.Fatalf("Unexpected subject: %q", msg.Subject)
}
}

// This is for importing all of JetStream into another account for admin purposes.
func TestJetStreamAccountImportAll(t *testing.T) {
conf := createConfFile(t, []byte(`
Expand Down

0 comments on commit 0c8552c

Please sign in to comment.