Skip to content

Commit

Permalink
Update TestJetStreamServerReencryption to also test converting ciph…
Browse files Browse the repository at this point in the history
…ers at the same time as changing keys

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Jul 26, 2023
1 parent 3df08c3 commit bc78e86
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 87 deletions.
4 changes: 2 additions & 2 deletions server/filestore.go
Expand Up @@ -994,9 +994,9 @@ func (mb *msgBlock) convertCipher() error {
// Reset the cache since we just read everything in.
mb.cache = nil

// Generate new keys based on our
// Generate new keys. If we error for some reason then we will put
// the old keyfile back.
if err := fs.genEncryptionKeysForBlock(mb); err != nil {
// Put the old keyfile back.
keyFile := filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index))
os.WriteFile(keyFile, ekey, defaultFilePerms)
return err
Expand Down
192 changes: 108 additions & 84 deletions server/jetstream_test.go
Expand Up @@ -21211,104 +21211,128 @@ func TestJetStreamLastSequenceBySubjectConcurrent(t *testing.T) {

func TestJetStreamServerReencryption(t *testing.T) {
storeDir := t.TempDir()
tmpl1, tmpl2, tmpl3 := `
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: aes,
store_dir: %q
}
`, `
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: aes,
old_key: %q,
store_dir: %q
}
`, `
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: aes,
store_dir: %q
}
`

conf1 := createConfFile(t, []byte(fmt.Sprintf(tmpl1, "firstencryptionkey", storeDir)))
conf2 := createConfFile(t, []byte(fmt.Sprintf(tmpl2, "secondencryptionkey", "firstencryptionkey", storeDir)))
conf3 := createConfFile(t, []byte(fmt.Sprintf(tmpl3, "secondencryptionkey", storeDir)))
expected := 30
for i, algo := range []struct {
from string
to string
}{
{"aes", "aes"},
{"aes", "chacha"},
{"chacha", "chacha"},
{"chacha", "aes"},
} {
t.Run(fmt.Sprintf("%s_to_%s", algo.from, algo.to), func(t *testing.T) {
streamName := fmt.Sprintf("TEST_%d", i)
subjectName := fmt.Sprintf("foo_%d", i)
expected := 30

checkStream := func(js nats.JetStreamContext) {
si, err := js.StreamInfo(streamName)
if err != nil {
t.Fatal(err)
}

checkStream := func(js nats.JetStreamContext) {
si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatal(err)
}
if si.State.Msgs != uint64(expected) {
t.Fatalf("Should be %d messages but got %d messages", expected, si.State.Msgs)
}

if si.State.Msgs != uint64(expected) {
t.Fatalf("Should be %d messages but got %d messages", expected, si.State.Msgs)
}
sub, err := js.PullSubscribe(subjectName, "")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

sub, err := js.PullSubscribe("foo", "")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c := 0
for _, m := range fetchMsgs(t, sub, expected, 5*time.Second) {
m.AckSync()
c++
}
if c != expected {
t.Fatalf("Should have read back %d messages but got %d messages", expected, c)
}
}

c := 0
for _, m := range fetchMsgs(t, sub, expected, 5*time.Second) {
m.AckSync()
c++
}
if c != expected {
t.Fatalf("Should have read back %d messages but got %d messages", expected, c)
}
}
// First off, we start up using the original encryption key and algorithm.
// We'll create a stream and populate it with some messages.
t.Run("setup", func(t *testing.T) {
conf := createConfFile(t, []byte(fmt.Sprintf(`
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: %s,
store_dir: %q
}
`, "firstencryptionkey", algo.from, storeDir)))

func() {
s, _ := RunServerWithConfig(conf1)
defer s.Shutdown()
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()
nc, js := jsClientConnect(t, s)
defer nc.Close()

cfg := &nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
}
if _, err := js.AddStream(cfg); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
cfg := &nats.StreamConfig{
Name: streamName,
Subjects: []string{subjectName},
}
if _, err := js.AddStream(cfg); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for i := 0; i < expected; i++ {
if _, err := js.Publish("foo", []byte("ENCRYPTED PAYLOAD!!")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
for i := 0; i < expected; i++ {
if _, err := js.Publish(subjectName, []byte("ENCRYPTED PAYLOAD!!")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}

checkStream(js)
}()
checkStream(js)
})

func() {
s, _ := RunServerWithConfig(conf2)
defer s.Shutdown()
// Next up, we will restart the server, this time with both the new key
// and algorithm and also the old key. At startup, the server will detect
// the change in encryption key and/or algorithm and re-encrypt the stream.
t.Run("reencrypt", func(t *testing.T) {
conf := createConfFile(t, []byte(fmt.Sprintf(`
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: %s,
prev_key: %q,
store_dir: %q
}
`, "secondencryptionkey", algo.to, "firstencryptionkey", storeDir)))

nc, js := jsClientConnect(t, s)
defer nc.Close()
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

checkStream(js)
}()
nc, js := jsClientConnect(t, s)
defer nc.Close()

func() {
s, _ := RunServerWithConfig(conf3)
defer s.Shutdown()
checkStream(js)
})

nc, js := jsClientConnect(t, s)
defer nc.Close()
// Finally, we'll restart the server using only the new key and algorithm.
// At this point everything should have been re-encrypted, so we should still
// be able to access the stream.
t.Run("restart", func(t *testing.T) {
conf := createConfFile(t, []byte(fmt.Sprintf(`
server_name: S22
listen: 127.0.0.1:-1
jetstream: {
key: %q,
cipher: %s,
store_dir: %q
}
`, "secondencryptionkey", algo.to, storeDir)))

checkStream(js)
}()
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

checkStream(js)
})
})
}
}
2 changes: 1 addition & 1 deletion server/opts.go
Expand Up @@ -2078,7 +2078,7 @@ func parseJetStream(v interface{}, opts *Options, errors *[]error, warnings *[]e
doEnable = mv.(bool)
case "key", "ek", "encryption_key":
opts.JetStreamKey = mv.(string)
case "old_key", "old_ek", "old_encryption_key":
case "prev_key", "prev_ek", "prev_encryption_key":
opts.JetStreamOldKey = mv.(string)
case "cipher":
switch strings.ToLower(mv.(string)) {
Expand Down

0 comments on commit bc78e86

Please sign in to comment.