Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Evaluate service imports when publishing advisories #4302

Merged
merged 3 commits into from Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 2 additions & 8 deletions server/client.go
Expand Up @@ -3127,20 +3127,14 @@ var needFlush = struct{}{}
// deliverMsg will deliver a message to a matching subscription and its underlying client.
// We process all connection/client types. mh is the part that will be protocol/client specific.
func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, subject, reply, mh, msg []byte, gwrply bool) bool {
// Check sub client and check echo
if sub.client == nil || c == sub.client && !sub.client.echo {
// Check sub client and check echo. Only do this if not an internal sub.
if sub.client == nil || (c == sub.client && !sub.client.echo && sub.icb == nil) {
return false
}

client := sub.client
client.mu.Lock()

// Check echo
if c == client && !client.echo {
client.mu.Unlock()
return false
}

// Check if we have a subscribe deny clause. This will trigger us to check the subject
// for a match against the denied subjects.
if client.mperms != nil && client.checkDenySub(string(subject)) {
Expand Down
217 changes: 217 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -10723,6 +10723,223 @@ func TestJetStreamAccountImportBasics(t *testing.T) {
}
}

// This tests whether we are able to aggregate all JetStream advisory events
// from all accounts into a single account. Config for this test uses
// service imports and exports as that allows for gathering all events
// without having to know the account name and without separate entries
// for each account in aggregate account config.
// This test fails as it is not receiving the api audit event ($JS.EVENT.ADVISORY.API).
func TestJetStreamAccountImportJSAdvisoriesAsService(t *testing.T) {
conf := createConfFile(t, []byte(`
listen=127.0.0.1:-1
no_auth_user: pp
jetstream: {max_mem_store: 64GB, max_file_store: 10TB}
accounts {
JS {
jetstream: enabled
users: [ {user: pp, password: foo} ]
imports [
{ service: { account: AGG, subject: '$JS.EVENT.ADVISORY.ACC.JS.>' }, to: '$JS.EVENT.ADVISORY.>' }
]
}
AGG {
users: [ {user: agg, password: foo} ]
exports: [
{ service: '$JS.EVENT.ADVISORY.ACC.*.>', response: Singleton, account_token_position: 5 }
]
}
}
`))

s, _ := RunServerWithConfig(conf)
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

// This should be the pp user, one which manages JetStream assets
ncJS, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error during connect: %v", err)
}
defer ncJS.Close()

// This is the agg user, which should aggregate all JS advisory events.
ncAgg, err := nats.Connect(s.ClientURL(), nats.UserInfo("agg", "foo"))
if err != nil {
t.Fatalf("Unexpected error during connect: %v", err)
}
defer ncAgg.Close()

js, err := ncJS.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// user from JS account should receive events on $JS.EVENT.ADVISORY.> subject
subJS, err := ncJS.SubscribeSync("$JS.EVENT.ADVISORY.>")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer subJS.Unsubscribe()

// user from AGG account should receive events on mapped $JS.EVENT.ADVISORY.ACC.JS.> subject (with account name)
subAgg, err := ncAgg.SubscribeSync("$JS.EVENT.ADVISORY.ACC.JS.>")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// add stream using JS account
// this should trigger 2 events:
// - an action event on $JS.EVENT.ADVISORY.STREAM.CREATED.ORDERS
// - an api audit event on $JS.EVENT.ADVISORY.API
_, err = js.AddStream(&nats.StreamConfig{Name: "ORDERS", Subjects: []string{"ORDERS.*"}})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
msg, err := subJS.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if msg.Subject != "$JS.EVENT.ADVISORY.STREAM.CREATED.ORDERS" {
t.Fatalf("Unexpected subject: %q", msg.Subject)
}
msg, err = subJS.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if msg.Subject != "$JS.EVENT.ADVISORY.API" {
t.Fatalf("Unexpected subject: %q", msg.Subject)
}

// same set of events should be received by AGG account
// on subjects containing account name (ACC.JS)
msg, err = subAgg.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if msg.Subject != "$JS.EVENT.ADVISORY.ACC.JS.STREAM.CREATED.ORDERS" {
t.Fatalf("Unexpected subject: %q", msg.Subject)
}

// we get error here, since we do not get the api audit event
msg, err = subAgg.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if msg.Subject != "$JS.EVENT.ADVISORY.ACC.JS.API" {
t.Fatalf("Unexpected subject: %q", msg.Subject)
}
}

// This tests whether we are able to aggregate all JetStream advisory events
// from all accounts into a single account. Config for this test uses
// stream imports and exports as that allows for gathering all events
// as long as there is a separate stream import entry for each account
// in aggregate account config.
func TestJetStreamAccountImportJSAdvisoriesAsStream(t *testing.T) {
conf := createConfFile(t, []byte(`
listen=127.0.0.1:-1
no_auth_user: pp
jetstream: {max_mem_store: 64GB, max_file_store: 10TB}
accounts {
JS {
jetstream: enabled
users: [ {user: pp, password: foo} ]
exports [
{ stream: '$JS.EVENT.ADVISORY.>' }
]
}
AGG {
users: [ {user: agg, password: foo} ]
imports: [
{ stream: { account: JS, subject: '$JS.EVENT.ADVISORY.>' }, to: '$JS.EVENT.ADVISORY.ACC.JS.>' }
]
}
}
`))

s, _ := RunServerWithConfig(conf)
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

// This should be the pp user, one which manages JetStream assets
ncJS, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error during connect: %v", err)
}
defer ncJS.Close()

// This is the agg user, which should aggregate all JS advisory events.
ncAgg, err := nats.Connect(s.ClientURL(), nats.UserInfo("agg", "foo"))
if err != nil {
t.Fatalf("Unexpected error during connect: %v", err)
}
defer ncAgg.Close()

js, err := ncJS.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// user from JS account should receive events on $JS.EVENT.ADVISORY.> subject
subJS, err := ncJS.SubscribeSync("$JS.EVENT.ADVISORY.>")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer subJS.Unsubscribe()

// user from AGG account should receive events on mapped $JS.EVENT.ADVISORY.ACC.JS.> subject (with account name)
subAgg, err := ncAgg.SubscribeSync("$JS.EVENT.ADVISORY.ACC.JS.>")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// add stream using JS account
// this should trigger 2 events:
// - an action event on $JS.EVENT.ADVISORY.STREAM.CREATED.ORDERS
// - an api audit event on $JS.EVENT.ADVISORY.API
_, err = js.AddStream(&nats.StreamConfig{Name: "ORDERS", Subjects: []string{"ORDERS.*"}})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
msg, err := subJS.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if msg.Subject != "$JS.EVENT.ADVISORY.STREAM.CREATED.ORDERS" {
t.Fatalf("Unexpected subject: %q", msg.Subject)
}
msg, err = subJS.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if msg.Subject != "$JS.EVENT.ADVISORY.API" {
t.Fatalf("Unexpected subject: %q", msg.Subject)
}

// same set of events should be received by AGG account
// on subjects containing account name (ACC.JS)
msg, err = subAgg.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if msg.Subject != "$JS.EVENT.ADVISORY.ACC.JS.STREAM.CREATED.ORDERS" {
t.Fatalf("Unexpected subject: %q", msg.Subject)
}

// when using stream instead of service, we get all events
msg, err = subAgg.NextMsg(time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if msg.Subject != "$JS.EVENT.ADVISORY.ACC.JS.API" {
t.Fatalf("Unexpected subject: %q", msg.Subject)
}
}

// This is for importing all of JetStream into another account for admin purposes.
func TestJetStreamAccountImportAll(t *testing.T) {
conf := createConfFile(t, []byte(`
Expand Down