Skip to content

Commit

Permalink
Update TestJetStreamServerReencryption to also test converting ciph…
Browse files Browse the repository at this point in the history
…ers at the same time as changing keys
  • Loading branch information
neilalexander committed Jul 13, 2023
1 parent d14dcc5 commit 04c6919
Showing 1 changed file with 108 additions and 84 deletions.
192 changes: 108 additions & 84 deletions server/jetstream_test.go
Expand Up @@ -20906,104 +20906,128 @@ func TestJetStreamMaxBytesIgnored(t *testing.T) {

func TestJetStreamServerReencryption(t *testing.T) {
storeDir := t.TempDir()
tmpl1, tmpl2, tmpl3 := `
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: aes,
store_dir: %q
}
`, `
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: aes,
old_key: %q,
store_dir: %q
}
`, `
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: aes,
store_dir: %q
}
`

conf1 := createConfFile(t, []byte(fmt.Sprintf(tmpl1, "firstencryptionkey", storeDir)))
conf2 := createConfFile(t, []byte(fmt.Sprintf(tmpl2, "secondencryptionkey", "firstencryptionkey", storeDir)))
conf3 := createConfFile(t, []byte(fmt.Sprintf(tmpl3, "secondencryptionkey", storeDir)))
expected := 30
for i, algo := range []struct {
from string
to string
}{
{"aes", "aes"},
{"aes", "chacha"},
{"chacha", "chacha"},
{"chacha", "aes"},
} {
t.Run(fmt.Sprintf("%s_to_%s", algo.from, algo.to), func(t *testing.T) {
streamName := fmt.Sprintf("TEST_%d", i)
subjectName := fmt.Sprintf("foo_%d", i)
expected := 30

checkStream := func(js nats.JetStreamContext) {
si, err := js.StreamInfo(streamName)
if err != nil {
t.Fatal(err)
}

checkStream := func(js nats.JetStreamContext) {
si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatal(err)
}
if si.State.Msgs != uint64(expected) {
t.Fatalf("Should be %d messages but got %d messages", expected, si.State.Msgs)
}

if si.State.Msgs != uint64(expected) {
t.Fatalf("Should be %d messages but got %d messages", expected, si.State.Msgs)
}
sub, err := js.PullSubscribe(subjectName, "")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

sub, err := js.PullSubscribe("foo", "")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c := 0
for _, m := range fetchMsgs(t, sub, expected, 5*time.Second) {
m.AckSync()
c++
}
if c != expected {
t.Fatalf("Should have read back %d messages but got %d messages", expected, c)
}
}

c := 0
for _, m := range fetchMsgs(t, sub, expected, 5*time.Second) {
m.AckSync()
c++
}
if c != expected {
t.Fatalf("Should have read back %d messages but got %d messages", expected, c)
}
}
// First off, we start up using the original encryption key and algorithm.
// We'll create a stream and populate it with some messages.
t.Run("setup", func(t *testing.T) {
conf := createConfFile(t, []byte(fmt.Sprintf(`
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: %s,
store_dir: %q
}
`, "firstencryptionkey", algo.from, storeDir)))

func() {
s, _ := RunServerWithConfig(conf1)
defer s.Shutdown()
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()
nc, js := jsClientConnect(t, s)
defer nc.Close()

cfg := &nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
}
if _, err := js.AddStream(cfg); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
cfg := &nats.StreamConfig{
Name: streamName,
Subjects: []string{subjectName},
}
if _, err := js.AddStream(cfg); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for i := 0; i < expected; i++ {
if _, err := js.Publish("foo", []byte("ENCRYPTED PAYLOAD!!")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
for i := 0; i < expected; i++ {
if _, err := js.Publish(subjectName, []byte("ENCRYPTED PAYLOAD!!")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}

checkStream(js)
}()
checkStream(js)
})

func() {
s, _ := RunServerWithConfig(conf2)
defer s.Shutdown()
// Next up, we will restart the server, this time with both the new key
// and algorithm and also the old key. At startup, the server will detect
// the change in encryption key and/or algorithm and re-encrypt the stream.
t.Run("reencrypt", func(t *testing.T) {
conf := createConfFile(t, []byte(fmt.Sprintf(`
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: %s,
old_key: %q,
store_dir: %q
}
`, "secondencryptionkey", algo.to, "firstencryptionkey", storeDir)))

nc, js := jsClientConnect(t, s)
defer nc.Close()
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

checkStream(js)
}()
nc, js := jsClientConnect(t, s)
defer nc.Close()

func() {
s, _ := RunServerWithConfig(conf3)
defer s.Shutdown()
checkStream(js)
})

nc, js := jsClientConnect(t, s)
defer nc.Close()
// Finally, we'll restart the server using only the new key and algorithm.
// At this point everything should have been re-encrypted, so we should still
// be able to access the stream.
t.Run("restart", func(t *testing.T) {
conf := createConfFile(t, []byte(fmt.Sprintf(`
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: %s,
store_dir: %q
}
`, "secondencryptionkey", algo.to, storeDir)))

checkStream(js)
}()
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

checkStream(js)
})
})
}
}

0 comments on commit 04c6919

Please sign in to comment.