Skip to content

Commit

Permalink
Fixed delivery of retained messages after transfer.
Browse files Browse the repository at this point in the history
I was running a manual test moving from dev to this branch and
noticed that the consumer would receive only 1 message of the 10
messages sent as retained. So I modified the test to verify that
we receive them all and we did not.

The reason was that after the transfer we need to refresh the state
of the stream (stream info) since we attempt to load all messages
based on the state's sequences.

I have also modified a bit the code to update the MaxMsgsPer once
all messages have been transferred.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic authored and neilalexander committed Jun 1, 2023
1 parent 4f797a5 commit a744cb8
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 22 deletions.
61 changes: 41 additions & 20 deletions server/mqtt.go
Expand Up @@ -1170,17 +1170,40 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
if err != nil {
return nil, err
}
as.transferRetainedToPerKeySubjectStream(s)
}
} else {
wantedSubj := mqttRetainedMsgsStreamSubject + as.domainTk + ">"
if len(si.Config.Subjects) != 1 || si.Config.Subjects[0] != wantedSubj {
si.Config.Subjects = []string{wantedSubj}
if _, err := jsa.updateStream(&si.Config); err != nil {
}
// Doing this check outside of above if/else due to possible race when
// creating the stream.
wantedSubj := mqttRetainedMsgsStreamSubject + as.domainTk + ">"
if len(si.Config.Subjects) != 1 || si.Config.Subjects[0] != wantedSubj {
// Update only the Subjects at this stage, not MaxMsgsPer yet.
si.Config.Subjects = []string{wantedSubj}
if si, err = jsa.updateStream(&si.Config); err != nil {
return nil, fmt.Errorf("failed to update stream config: %w", err)
}
}
// Try to transfer regardless if we have already updated the stream or not
// in case not all messages were transferred and the server was restarted.
if as.transferRetainedToPerKeySubjectStream(s) {
// We need another lookup to have up-to-date si.State values in order
// to load all retained messages.
si, err = lookupStream(mqttRetainedMsgsStreamName, "retained messages")
if err != nil {
return nil, err
}
}
// Now, if the stream does not have MaxMsgsPer set to 1, and there are no
// more messages on the single $MQTT.rmsgs subject, update the stream again.
if si.Config.MaxMsgsPer != 1 {
_, err := jsa.loadNextMsgFor(mqttRetainedMsgsStreamName, "$MQTT.rmsgs")
// Looking for an error indicated that there is no such message.
if err != nil && IsNatsErr(err, JSNoMessageFoundErr) {
si.Config.MaxMsgsPer = 1
// We will need an up-to-date si, so don't use local variable here.
if si, err = jsa.updateStream(&si.Config); err != nil {
return nil, fmt.Errorf("failed to update stream config: %w", err)
}
}
as.transferRetainedToPerKeySubjectStream(s)
}

var lastSeq uint64
Expand Down Expand Up @@ -2305,21 +2328,10 @@ func (as *mqttAccountSessionManager) transferUniqueSessStreamsToMuxed(log *Serve
retry = false
}

func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *Server) {
func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *Server) bool {
jsa := &as.jsa
var count, errors int

// Set retry to true, will be set to false on success.
defer func() {
if errors > 0 {
next := mqttDefaultTransferRetry
log.Warnf("Failed to transfer %d MQTT retained messages, will try again in %v", errors, next)
time.AfterFunc(next, func() { as.transferRetainedToPerKeySubjectStream(log) })
} else if count > 0 {
log.Noticef("Transfer of %d MQTT retained messages done!", count)
}
}()

for {
// Try and look up messages on the original undivided "$MQTT.rmsgs" subject.
// If nothing is returned here, we assume to have migrated all old messages.
Expand All @@ -2331,7 +2343,7 @@ func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *
}
log.Warnf(" Unable to load retained message with sequence %d: %s", smsg.Sequence, err)
errors++
return
break
}
// Unmarshal the message so that we can obtain the subject name.
var rmsg mqttRetainedMsg
Expand All @@ -2355,6 +2367,15 @@ func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *
}
count++
}
if errors > 0 {
next := mqttDefaultTransferRetry
log.Warnf("Failed to transfer %d MQTT retained messages, will try again in %v", errors, next)
time.AfterFunc(next, func() { as.transferRetainedToPerKeySubjectStream(log) })
} else if count > 0 {
log.Noticef("Transfer of %d MQTT retained messages done!", count)
}
// Signal if there was any activity (either some transferred or some errors)
return errors > 0 || count > 0
}

//////////////////////////////////////////////////////////////////////////////
Expand Down
19 changes: 17 additions & 2 deletions server/mqtt_test.go
Expand Up @@ -1979,6 +1979,11 @@ func testMQTTCheckPubMsgNoAck(t testing.TB, c net.Conn, r *mqttReader, topic str
}

func testMQTTGetPubMsg(t testing.TB, c net.Conn, r *mqttReader, topic string, payload []byte) (byte, uint16) {
flags, pi, _ := testMQTTGetPubMsgEx(t, c, r, topic, payload)
return flags, pi
}

func testMQTTGetPubMsgEx(t testing.TB, c net.Conn, r *mqttReader, topic string, payload []byte) (byte, uint16, string) {
t.Helper()
b, pl := testMQTTReadPacket(t, r)
if pt := b & mqttPacketMask; pt != mqttPacketPub {
Expand All @@ -1991,7 +1996,7 @@ func testMQTTGetPubMsg(t testing.TB, c net.Conn, r *mqttReader, topic string, pa
if err != nil {
t.Fatal(err)
}
if ptopic != topic {
if topic != _EMPTY_ && ptopic != topic {
t.Fatalf("Expected topic %q, got %q", topic, ptopic)
}
var pi uint16
Expand All @@ -2011,7 +2016,7 @@ func testMQTTGetPubMsg(t testing.TB, c net.Conn, r *mqttReader, topic string, pa
t.Fatalf("Expected payload %q, got %q", payload, ppayload)
}
r.pos += msgLen
return pflags, pi
return pflags, pi, ptopic
}

func testMQTTSendPubAck(t testing.TB, c net.Conn, pi uint16) {
Expand Down Expand Up @@ -3038,6 +3043,16 @@ func TestMQTTRetainedMsgMigration(t *testing.T) {
defer mc.Close()
testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false)

testMQTTSub(t, 1, mc, rc, []*mqttFilter{{filter: "+", qos: 0}}, []byte{0})
topics := map[string]struct{}{}
for i := 0; i < 100; i++ {
_, _, topic := testMQTTGetPubMsgEx(t, mc, rc, _EMPTY_, []byte("bar"))
topics[topic] = struct{}{}
}
if len(topics) != 100 {
t.Fatalf("Unexpected topics: %v", topics)
}

// Now look at the stream, there should be 100 messages on the new
// divided subjects and none on the old undivided subject.
si, err = js.StreamInfo(mqttRetainedMsgsStreamName, &nats.StreamInfoRequest{
Expand Down

0 comments on commit a744cb8

Please sign in to comment.