Skip to content

Commit

Permalink
MQTT: Cleanup code regarding retain flag and add test (#4443)
Browse files Browse the repository at this point in the history
As per specification MQTT-3.3.1-8, we are now setting the RETAIN flag
when delivering to new subscriptions and clear the flag in all other
conditions.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
derekcollison committed Aug 29, 2023
2 parents d61466c + 8bd68b5 commit a64f7a0
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 10 deletions.
11 changes: 5 additions & 6 deletions server/mqtt.go
Expand Up @@ -2341,6 +2341,7 @@ func (as *mqttAccountSessionManager) serializeRetainedMsgsForSub(sess *mqttSessi

// Need to use the subject for the retained message, not the `sub` subject.
// We can find the published retained message in rm.sub.subject.
// Set the RETAIN flag: [MQTT-3.3.1-8].
flags := mqttSerializePublishMsg(prm, pi, qos, false, true, []byte(rm.Topic), rm.Msg)
if trace {
pp := mqttPublish{
Expand Down Expand Up @@ -4153,7 +4154,6 @@ func mqttDeliverMsgCbQoS0(sub *subscription, pc *client, _ *Account, subject, re
return
}

var retained bool
var topic []byte
if pc.isMqtt() {
// This is an MQTT publisher directly connected to this server.
Expand All @@ -4169,7 +4169,6 @@ func mqttDeliverMsgCbQoS0(sub *subscription, pc *client, _ *Account, subject, re
if len(pc.pa.mapped) > 0 && len(pc.pa.psi) > 0 {
topic = natsSubjectToMQTTTopic(subject)
}
retained = mqttIsRetained(pc.mqtt.pp.flags)

} else {
// Non MQTT client, could be NATS publisher, or ROUTER, etc..
Expand All @@ -4188,7 +4187,7 @@ func mqttDeliverMsgCbQoS0(sub *subscription, pc *client, _ *Account, subject, re
}

// Message never has a packet identifier nor is marked as duplicate.
pc.mqttEnqueuePublishMsgTo(cc, sub, 0, 0, false, retained, topic, msg)
pc.mqttEnqueuePublishMsgTo(cc, sub, 0, 0, false, topic, msg)
}

// This is the callback attached to a JS durable subscription for a MQTT QoS 1+
Expand Down Expand Up @@ -4264,7 +4263,7 @@ func mqttDeliverMsgCbQoS12(sub *subscription, pc *client, _ *Account, subject, r
}

originalTopic := natsSubjectToMQTTTopic(strippedSubj)
pc.mqttEnqueuePublishMsgTo(cc, sub, pi, qos, dup, false, originalTopic, msg)
pc.mqttEnqueuePublishMsgTo(cc, sub, pi, qos, dup, originalTopic, msg)
}

func mqttDeliverPubRelCb(sub *subscription, pc *client, _ *Account, subject, reply string, rmsg []byte) {
Expand Down Expand Up @@ -4323,11 +4322,11 @@ func isMQTTReservedSubscription(subject string) bool {

// Common function to mqtt delivery callbacks to serialize and send the message
// to the `cc` client.
func (c *client) mqttEnqueuePublishMsgTo(cc *client, sub *subscription, pi uint16, qos byte, dup, retained bool, topic, msg []byte) {
func (c *client) mqttEnqueuePublishMsgTo(cc *client, sub *subscription, pi uint16, qos byte, dup bool, topic, msg []byte) {
sw := mqttWriter{}
w := &sw

flags := mqttSerializePublishMsg(w, pi, qos, dup, retained, topic, msg)
flags := mqttSerializePublishMsg(w, pi, qos, dup, false, topic, msg)

cc.mu.Lock()
if sub.mqtt.prm != nil {
Expand Down
70 changes: 66 additions & 4 deletions server/mqtt_test.go
Expand Up @@ -3848,6 +3848,11 @@ func TestMQTTWillRetain(t *testing.T) {
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)

mces, res := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mces.Close()
testMQTTCheckConnAck(t, res, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, mces, res, []*mqttFilter{{filter: "will/#", qos: test.subQoS}}, []byte{test.subQoS})

willTopic := []byte("will/topic")
willMsg := []byte("bye")

Expand All @@ -3869,7 +3874,7 @@ func TestMQTTWillRetain(t *testing.T) {

// Wait for the server to process the connection close, which will
// cause the "will" message to be published (and retained).
checkClientsCount(t, s, 0)
checkClientsCount(t, s, 1)

// Create subscription on will topic and expect will message.
mcs, rs := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
Expand All @@ -3878,7 +3883,7 @@ func TestMQTTWillRetain(t *testing.T) {

testMQTTSub(t, 1, mcs, rs, []*mqttFilter{{filter: "will/#", qos: test.subQoS}}, []byte{test.subQoS})
pflags, _ := testMQTTGetPubMsg(t, mcs, rs, "will/topic", willMsg)
if pflags&mqttPubFlagRetain == 0 {
if !mqttIsRetained(pflags) {
t.Fatalf("expected retain flag to be set, it was not: %v", pflags)
}
// Expected QoS will be the lesser of the pub/sub QoS.
Expand All @@ -3889,6 +3894,17 @@ func TestMQTTWillRetain(t *testing.T) {
if qos := mqttGetQoS(pflags); qos != expectedQoS {
t.Fatalf("expected qos to be %v, got %v", expectedQoS, qos)
}

// The existing subscription (prior to sending the will) should receive
// the will but the retain flag should not be set.
pflags, _ = testMQTTGetPubMsg(t, mces, res, "will/topic", willMsg)
if mqttIsRetained(pflags) {
t.Fatalf("expected retain flag to not be set, it was: %v", pflags)
}
// Expected QoS will be the lesser of the pub/sub QoS.
if qos := mqttGetQoS(pflags); qos != expectedQoS {
t.Fatalf("expected qos to be %v, got %v", expectedQoS, qos)
}
})
}
}
Expand Down Expand Up @@ -3949,7 +3965,7 @@ func TestMQTTWillRetainPermViolation(t *testing.T) {

testMQTTSub(t, 1, mcs, rs, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1})
pflags, _ := testMQTTGetPubMsg(t, mcs, rs, "foo", []byte("bye"))
if pflags&mqttPubFlagRetain == 0 {
if !mqttIsRetained(pflags) {
t.Fatalf("expected retain flag to be set, it was not: %v", pflags)
}
if qos := mqttGetQoS(pflags); qos != 1 {
Expand Down Expand Up @@ -4032,7 +4048,7 @@ func TestMQTTPublishRetain(t *testing.T) {

if test.subGetsIt {
pflags, _ := testMQTTGetPubMsg(t, mc2, rs2, "foo", []byte(test.expectedValue))
if pflags&mqttPubFlagRetain == 0 {
if !mqttIsRetained(pflags) {
t.Fatalf("retain flag should have been set, it was not: flags=%v", pflags)
}
} else {
Expand All @@ -4045,6 +4061,52 @@ func TestMQTTPublishRetain(t *testing.T) {
}
}

func TestMQTTRetainFlag(t *testing.T) {
o := testMQTTDefaultOptions()
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)

mc1, rs1 := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mc1.Close()
testMQTTCheckConnAck(t, rs1, mqttConnAckRCConnectionAccepted, false)
testMQTTPublish(t, mc1, rs1, 0, false, true, "foo/0", 0, []byte("flag set"))
testMQTTPublish(t, mc1, rs1, 0, false, true, "foo/1", 0, []byte("flag set"))
testMQTTFlush(t, mc1, nil, rs1)

mc2, rs2 := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mc2.Close()
testMQTTCheckConnAck(t, rs2, mqttConnAckRCConnectionAccepted, false)

testMQTTSub(t, 1, mc2, rs2, []*mqttFilter{{filter: "foo/0", qos: 0}}, []byte{0})
pflags, _ := testMQTTGetPubMsg(t, mc2, rs2, "foo/0", []byte("flag set"))
if !mqttIsRetained(pflags) {
t.Fatalf("retain flag should have been set, it was not: flags=%v", pflags)
}

testMQTTSub(t, 1, mc2, rs2, []*mqttFilter{{filter: "foo/1", qos: 1}}, []byte{1})
pflags, _ = testMQTTGetPubMsg(t, mc2, rs2, "foo/1", []byte("flag set"))
if !mqttIsRetained(pflags) {
t.Fatalf("retain flag should have been set, it was not: flags=%v", pflags)
}

// For existing subscriptions, RETAIN flag should not be set: [MQTT-3.3.1-9].
testMQTTPublish(t, mc1, rs1, 0, false, true, "foo/0", 0, []byte("flag not set"))
testMQTTFlush(t, mc1, nil, rs1)

pflags, _ = testMQTTGetPubMsg(t, mc2, rs2, "foo/0", []byte("flag not set"))
if mqttIsRetained(pflags) {
t.Fatalf("retain flag should not have been set, it was: flags=%v", pflags)
}

testMQTTPublish(t, mc1, rs1, 0, false, true, "foo/1", 0, []byte("flag not set"))
testMQTTFlush(t, mc1, nil, rs1)

pflags, _ = testMQTTGetPubMsg(t, mc2, rs2, "foo/1", []byte("flag not set"))
if mqttIsRetained(pflags) {
t.Fatalf("retain flag should not have been set, it was: flags=%v", pflags)
}
}

func TestMQTTPublishRetainPermViolation(t *testing.T) {
o := testMQTTDefaultOptions()
o.Users = []*User{
Expand Down

0 comments on commit a64f7a0

Please sign in to comment.