Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT: Cleanup code regarding retain flag and add test #4443

Merged
merged 1 commit into from Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 5 additions & 6 deletions server/mqtt.go
Expand Up @@ -2341,6 +2341,7 @@ func (as *mqttAccountSessionManager) serializeRetainedMsgsForSub(sess *mqttSessi

// Need to use the subject for the retained message, not the `sub` subject.
// We can find the published retained message in rm.sub.subject.
// Set the RETAIN flag: [MQTT-3.3.1-8].
flags := mqttSerializePublishMsg(prm, pi, qos, false, true, []byte(rm.Topic), rm.Msg)
if trace {
pp := mqttPublish{
Expand Down Expand Up @@ -4153,7 +4154,6 @@ func mqttDeliverMsgCbQoS0(sub *subscription, pc *client, _ *Account, subject, re
return
}

var retained bool
var topic []byte
if pc.isMqtt() {
// This is an MQTT publisher directly connected to this server.
Expand All @@ -4169,7 +4169,6 @@ func mqttDeliverMsgCbQoS0(sub *subscription, pc *client, _ *Account, subject, re
if len(pc.pa.mapped) > 0 && len(pc.pa.psi) > 0 {
topic = natsSubjectToMQTTTopic(subject)
}
retained = mqttIsRetained(pc.mqtt.pp.flags)

} else {
// Non MQTT client, could be NATS publisher, or ROUTER, etc..
Expand All @@ -4188,7 +4187,7 @@ func mqttDeliverMsgCbQoS0(sub *subscription, pc *client, _ *Account, subject, re
}

// Message never has a packet identifier nor is marked as duplicate.
pc.mqttEnqueuePublishMsgTo(cc, sub, 0, 0, false, retained, topic, msg)
pc.mqttEnqueuePublishMsgTo(cc, sub, 0, 0, false, topic, msg)
}

// This is the callback attached to a JS durable subscription for a MQTT QoS 1+
Expand Down Expand Up @@ -4264,7 +4263,7 @@ func mqttDeliverMsgCbQoS12(sub *subscription, pc *client, _ *Account, subject, r
}

originalTopic := natsSubjectToMQTTTopic(strippedSubj)
pc.mqttEnqueuePublishMsgTo(cc, sub, pi, qos, dup, false, originalTopic, msg)
pc.mqttEnqueuePublishMsgTo(cc, sub, pi, qos, dup, originalTopic, msg)
}

func mqttDeliverPubRelCb(sub *subscription, pc *client, _ *Account, subject, reply string, rmsg []byte) {
Expand Down Expand Up @@ -4323,11 +4322,11 @@ func isMQTTReservedSubscription(subject string) bool {

// Common function to mqtt delivery callbacks to serialize and send the message
// to the `cc` client.
func (c *client) mqttEnqueuePublishMsgTo(cc *client, sub *subscription, pi uint16, qos byte, dup, retained bool, topic, msg []byte) {
func (c *client) mqttEnqueuePublishMsgTo(cc *client, sub *subscription, pi uint16, qos byte, dup bool, topic, msg []byte) {
sw := mqttWriter{}
w := &sw

flags := mqttSerializePublishMsg(w, pi, qos, dup, retained, topic, msg)
flags := mqttSerializePublishMsg(w, pi, qos, dup, false, topic, msg)

cc.mu.Lock()
if sub.mqtt.prm != nil {
Expand Down
70 changes: 66 additions & 4 deletions server/mqtt_test.go
Expand Up @@ -3848,6 +3848,11 @@ func TestMQTTWillRetain(t *testing.T) {
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)

mces, res := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mces.Close()
testMQTTCheckConnAck(t, res, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, mces, res, []*mqttFilter{{filter: "will/#", qos: test.subQoS}}, []byte{test.subQoS})

willTopic := []byte("will/topic")
willMsg := []byte("bye")

Expand All @@ -3869,7 +3874,7 @@ func TestMQTTWillRetain(t *testing.T) {

// Wait for the server to process the connection close, which will
// cause the "will" message to be published (and retained).
checkClientsCount(t, s, 0)
checkClientsCount(t, s, 1)

// Create subscription on will topic and expect will message.
mcs, rs := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
Expand All @@ -3878,7 +3883,7 @@ func TestMQTTWillRetain(t *testing.T) {

testMQTTSub(t, 1, mcs, rs, []*mqttFilter{{filter: "will/#", qos: test.subQoS}}, []byte{test.subQoS})
pflags, _ := testMQTTGetPubMsg(t, mcs, rs, "will/topic", willMsg)
if pflags&mqttPubFlagRetain == 0 {
if !mqttIsRetained(pflags) {
t.Fatalf("expected retain flag to be set, it was not: %v", pflags)
}
// Expected QoS will be the lesser of the pub/sub QoS.
Expand All @@ -3889,6 +3894,17 @@ func TestMQTTWillRetain(t *testing.T) {
if qos := mqttGetQoS(pflags); qos != expectedQoS {
t.Fatalf("expected qos to be %v, got %v", expectedQoS, qos)
}

// The existing subscription (prior to sending the will) should receive
// the will but the retain flag should not be set.
pflags, _ = testMQTTGetPubMsg(t, mces, res, "will/topic", willMsg)
if mqttIsRetained(pflags) {
t.Fatalf("expected retain flag to not be set, it was: %v", pflags)
}
// Expected QoS will be the lesser of the pub/sub QoS.
if qos := mqttGetQoS(pflags); qos != expectedQoS {
t.Fatalf("expected qos to be %v, got %v", expectedQoS, qos)
}
})
}
}
Expand Down Expand Up @@ -3949,7 +3965,7 @@ func TestMQTTWillRetainPermViolation(t *testing.T) {

testMQTTSub(t, 1, mcs, rs, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1})
pflags, _ := testMQTTGetPubMsg(t, mcs, rs, "foo", []byte("bye"))
if pflags&mqttPubFlagRetain == 0 {
if !mqttIsRetained(pflags) {
t.Fatalf("expected retain flag to be set, it was not: %v", pflags)
}
if qos := mqttGetQoS(pflags); qos != 1 {
Expand Down Expand Up @@ -4032,7 +4048,7 @@ func TestMQTTPublishRetain(t *testing.T) {

if test.subGetsIt {
pflags, _ := testMQTTGetPubMsg(t, mc2, rs2, "foo", []byte(test.expectedValue))
if pflags&mqttPubFlagRetain == 0 {
if !mqttIsRetained(pflags) {
t.Fatalf("retain flag should have been set, it was not: flags=%v", pflags)
}
} else {
Expand All @@ -4045,6 +4061,52 @@ func TestMQTTPublishRetain(t *testing.T) {
}
}

func TestMQTTRetainFlag(t *testing.T) {
o := testMQTTDefaultOptions()
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)

mc1, rs1 := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mc1.Close()
testMQTTCheckConnAck(t, rs1, mqttConnAckRCConnectionAccepted, false)
testMQTTPublish(t, mc1, rs1, 0, false, true, "foo/0", 0, []byte("flag set"))
testMQTTPublish(t, mc1, rs1, 0, false, true, "foo/1", 0, []byte("flag set"))
testMQTTFlush(t, mc1, nil, rs1)

mc2, rs2 := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mc2.Close()
testMQTTCheckConnAck(t, rs2, mqttConnAckRCConnectionAccepted, false)

testMQTTSub(t, 1, mc2, rs2, []*mqttFilter{{filter: "foo/0", qos: 0}}, []byte{0})
pflags, _ := testMQTTGetPubMsg(t, mc2, rs2, "foo/0", []byte("flag set"))
if !mqttIsRetained(pflags) {
t.Fatalf("retain flag should have been set, it was not: flags=%v", pflags)
}

testMQTTSub(t, 1, mc2, rs2, []*mqttFilter{{filter: "foo/1", qos: 1}}, []byte{1})
pflags, _ = testMQTTGetPubMsg(t, mc2, rs2, "foo/1", []byte("flag set"))
if !mqttIsRetained(pflags) {
t.Fatalf("retain flag should have been set, it was not: flags=%v", pflags)
}

// For existing subscriptions, RETAIN flag should not be set: [MQTT-3.3.1-9].
testMQTTPublish(t, mc1, rs1, 0, false, true, "foo/0", 0, []byte("flag not set"))
testMQTTFlush(t, mc1, nil, rs1)

pflags, _ = testMQTTGetPubMsg(t, mc2, rs2, "foo/0", []byte("flag not set"))
if mqttIsRetained(pflags) {
t.Fatalf("retain flag should not have been set, it was: flags=%v", pflags)
}

testMQTTPublish(t, mc1, rs1, 0, false, true, "foo/1", 0, []byte("flag not set"))
testMQTTFlush(t, mc1, nil, rs1)

pflags, _ = testMQTTGetPubMsg(t, mc2, rs2, "foo/1", []byte("flag not set"))
if mqttIsRetained(pflags) {
t.Fatalf("retain flag should not have been set, it was: flags=%v", pflags)
}
}

func TestMQTTPublishRetainPermViolation(t *testing.T) {
o := testMQTTDefaultOptions()
o.Users = []*User{
Expand Down