Skip to content

Commit

Permalink
Update TestJetStreamServerReencryption to also test converting ciph…
Browse files Browse the repository at this point in the history
…ers at the same time as changing keys
  • Loading branch information
neilalexander committed Jul 14, 2023
1 parent 6f1f430 commit 46321fd
Showing 1 changed file with 108 additions and 84 deletions.
192 changes: 108 additions & 84 deletions server/jetstream_test.go
Expand Up @@ -21124,104 +21124,128 @@ func TestJetStreamMaxBytesIgnored(t *testing.T) {

func TestJetStreamServerReencryption(t *testing.T) {
storeDir := t.TempDir()
tmpl1, tmpl2, tmpl3 := `
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: aes,
store_dir: %q
}
`, `
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: aes,
old_key: %q,
store_dir: %q
}
`, `
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: aes,
store_dir: %q
}
`

conf1 := createConfFile(t, []byte(fmt.Sprintf(tmpl1, "firstencryptionkey", storeDir)))
conf2 := createConfFile(t, []byte(fmt.Sprintf(tmpl2, "secondencryptionkey", "firstencryptionkey", storeDir)))
conf3 := createConfFile(t, []byte(fmt.Sprintf(tmpl3, "secondencryptionkey", storeDir)))
expected := 30
for i, algo := range []struct {
from string
to string
}{
{"aes", "aes"},
{"aes", "chacha"},
{"chacha", "chacha"},
{"chacha", "aes"},
} {
t.Run(fmt.Sprintf("%s_to_%s", algo.from, algo.to), func(t *testing.T) {
streamName := fmt.Sprintf("TEST_%d", i)
subjectName := fmt.Sprintf("foo_%d", i)
expected := 30

checkStream := func(js nats.JetStreamContext) {
si, err := js.StreamInfo(streamName)
if err != nil {
t.Fatal(err)
}

checkStream := func(js nats.JetStreamContext) {
si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatal(err)
}
if si.State.Msgs != uint64(expected) {
t.Fatalf("Should be %d messages but got %d messages", expected, si.State.Msgs)
}

if si.State.Msgs != uint64(expected) {
t.Fatalf("Should be %d messages but got %d messages", expected, si.State.Msgs)
}
sub, err := js.PullSubscribe(subjectName, "")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

sub, err := js.PullSubscribe("foo", "")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c := 0
for _, m := range fetchMsgs(t, sub, expected, 5*time.Second) {
m.AckSync()
c++
}
if c != expected {
t.Fatalf("Should have read back %d messages but got %d messages", expected, c)
}
}

c := 0
for _, m := range fetchMsgs(t, sub, expected, 5*time.Second) {
m.AckSync()
c++
}
if c != expected {
t.Fatalf("Should have read back %d messages but got %d messages", expected, c)
}
}
// First off, we start up using the original encryption key and algorithm.
// We'll create a stream and populate it with some messages.
t.Run("setup", func(t *testing.T) {
conf := createConfFile(t, []byte(fmt.Sprintf(`
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: %s,
store_dir: %q
}
`, "firstencryptionkey", algo.from, storeDir)))

func() {
s, _ := RunServerWithConfig(conf1)
defer s.Shutdown()
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()
nc, js := jsClientConnect(t, s)
defer nc.Close()

cfg := &nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
}
if _, err := js.AddStream(cfg); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
cfg := &nats.StreamConfig{
Name: streamName,
Subjects: []string{subjectName},
}
if _, err := js.AddStream(cfg); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for i := 0; i < expected; i++ {
if _, err := js.Publish("foo", []byte("ENCRYPTED PAYLOAD!!")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
for i := 0; i < expected; i++ {
if _, err := js.Publish(subjectName, []byte("ENCRYPTED PAYLOAD!!")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}

checkStream(js)
}()
checkStream(js)
})

func() {
s, _ := RunServerWithConfig(conf2)
defer s.Shutdown()
// Next up, we will restart the server, this time with both the new key
// and algorithm and also the old key. At startup, the server will detect
// the change in encryption key and/or algorithm and re-encrypt the stream.
t.Run("reencrypt", func(t *testing.T) {
conf := createConfFile(t, []byte(fmt.Sprintf(`
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: %s,
old_key: %q,
store_dir: %q
}
`, "secondencryptionkey", algo.to, "firstencryptionkey", storeDir)))

nc, js := jsClientConnect(t, s)
defer nc.Close()
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

checkStream(js)
}()
nc, js := jsClientConnect(t, s)
defer nc.Close()

func() {
s, _ := RunServerWithConfig(conf3)
defer s.Shutdown()
checkStream(js)
})

nc, js := jsClientConnect(t, s)
defer nc.Close()
// Finally, we'll restart the server using only the new key and algorithm.
// At this point everything should have been re-encrypted, so we should still
// be able to access the stream.
t.Run("restart", func(t *testing.T) {
conf := createConfFile(t, []byte(fmt.Sprintf(`
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: %s,
store_dir: %q
}
`, "secondencryptionkey", algo.to, storeDir)))

checkStream(js)
}()
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

checkStream(js)
})
})
}
}

0 comments on commit 46321fd

Please sign in to comment.