Skip to content

Commit

Permalink
Add test for MQTT retained message migration
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed May 31, 2023
1 parent d5139df commit cd23f51
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 0 deletions.
1 change: 1 addition & 0 deletions server/mqtt.go
Expand Up @@ -2330,6 +2330,7 @@ func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *
break
}
log.Warnf(" Unable to load retained message with sequence %d: %s", smsg.Sequence, err)
errors++
return
}
// Unmarshal the message so that we can obtain the subject name.
Expand Down
72 changes: 72 additions & 0 deletions server/mqtt_test.go
Expand Up @@ -2993,6 +2993,78 @@ func TestMQTTRetainedMsgNetworkUpdates(t *testing.T) {
}
}

func TestMQTTRetainedMsgMigration(t *testing.T) {
o := testMQTTDefaultOptions()
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)

nc, js := jsClientConnect(t, s)
defer nc.Close()

// Create the retained messages stream to listen on the old subject first.
// The server will correct this when the migration takes place.
_, err := js.AddStream(&nats.StreamConfig{
Name: mqttRetainedMsgsStreamName,
Subjects: []string{`$MQTT.rmsgs`},
Storage: nats.FileStorage,
Retention: nats.LimitsPolicy,
Replicas: 1,
})
require_NoError(t, err)

// Publish some retained messages on the old "$MQTT.rmsgs" subject.
for i := 0; i < 100; i++ {
msg := fmt.Sprintf(
`{"origin":"b5IQZNtG","subject":"test%d","topic":"test%d","msg":"YmFy","flags":1}`, i, i,
)
_, err := js.Publish(`$MQTT.rmsgs`, []byte(msg))
require_NoError(t, err)
}

// Check that the old subject looks right.
si, err := js.StreamInfo(mqttRetainedMsgsStreamName, &nats.StreamInfoRequest{
SubjectsFilter: `$MQTT.>`,
})
require_NoError(t, err)
if si.State.NumSubjects != 1 {
t.Fatalf("expected 1 subject, got %d", si.State.NumSubjects)
}
if n := si.State.Subjects[`$MQTT.rmsgs`]; n != 100 {
t.Fatalf("expected to find 100 messages on the original subject but found %d", n)
}

// Create an MQTT client, this will cause a migration to take place.
mc, rc := testMQTTConnect(t, &mqttConnInfo{clientID: "sub", cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mc.Close()
testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false)

// Now look at the stream, there should be 100 messages on the new
// divided subjects and none on the old undivided subject.
si, err = js.StreamInfo(mqttRetainedMsgsStreamName, &nats.StreamInfoRequest{
SubjectsFilter: `$MQTT.>`,
})
require_NoError(t, err)
if si.State.NumSubjects != 100 {
t.Fatalf("expected 100 subjects, got %d", si.State.NumSubjects)
}
if n := si.State.Subjects[`$MQTT.rmsgs`]; n > 0 {
t.Fatalf("expected to find no messages on the original subject but found %d", n)
}

// Check that the message counts look right. There should be one
// retained message per key.
for i := 0; i < 100; i++ {
expected := fmt.Sprintf(`$MQTT.rmsgs.test%d`, i)
n, ok := si.State.Subjects[expected]
if !ok {
t.Fatalf("expected to find %q but didn't", expected)
}
if n != 1 {
t.Fatalf("expected %q to have 1 message but had %d", expected, n)
}
}
}

func TestMQTTClusterReplicasCount(t *testing.T) {
for _, test := range []struct {
size int
Expand Down

0 comments on commit cd23f51

Please sign in to comment.