Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHANGED] MQTT: Support for topics with . character. #4243

Merged
merged 2 commits into from Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 22 additions & 9 deletions server/mqtt.go
Expand Up @@ -4115,7 +4115,8 @@ func mqttFilterToNATSSubject(filter []byte) ([]byte, error) {
// - '/' is replaced with './' if the last or next character in mt is '/'
// For instance, foo//bar would become foo./.bar
// - '/' is replaced with '.' for all other conditions (foo/bar -> foo.bar)
// - '.' and ' ' cause an error to be returned.
// - '.' is replaced with '//'.
// - ' ' cause an error to be returned.
//
// If there is no need to convert anything (say "foo" remains "foo"), then
// the no memory is allocated and the returned slice is the original `mt`.
Expand Down Expand Up @@ -4154,9 +4155,15 @@ func mqttToNATSSubjectConversion(mt []byte, wcOk bool) ([]byte, error) {
}
res = append(res, btsep)
}
case btsep, ' ':
// As of now, we cannot support '.' or ' ' in the MQTT topic/filter.
case ' ':
// As of now, we cannot support ' ' in the MQTT topic/filter.
return nil, errMQTTUnsupportedCharacters
case btsep:
if !cp {
makeCopy(i)
}
res = append(res, mqttTopicLevelSep, mqttTopicLevelSep)
j++
case mqttSingleLevelWC, mqttMultiLevelWC:
if !wcOk {
// Spec [MQTT-3.3.2-2] and [MQTT-4.7.1-1]
Expand Down Expand Up @@ -4195,16 +4202,22 @@ func natsSubjectToMQTTTopic(subject string) []byte {
for i := 0; i < len(subject); i++ {
switch subject[i] {
case mqttTopicLevelSep:
if !(i == 0 && i < end && subject[i+1] == btsep) {
topic[j] = mqttTopicLevelSep
j++
if i < end {
switch c := subject[i+1]; c {
case btsep, mqttTopicLevelSep:
if c == btsep {
topic[j] = mqttTopicLevelSep
} else {
topic[j] = btsep
}
j++
i++
default:
}
}
case btsep:
topic[j] = mqttTopicLevelSep
j++
if i < end && subject[i+1] == mqttTopicLevelSep {
i++
}
default:
topic[j] = subject[i]
j++
Expand Down
62 changes: 60 additions & 2 deletions server/mqtt_test.go
Expand Up @@ -1750,6 +1750,7 @@ func TestMQTTTopicAndSubjectConversion(t *testing.T) {
{"///foo/", "///foo/", "/././.foo./", ""},
{"///foo//", "///foo//", "/././.foo././", ""},
{"///foo///", "///foo///", "/././.foo./././", ""},
{"//.foo.//", "//.foo.//", "/././/foo//././", ""},
{"foo/bar", "foo/bar", "foo.bar", ""},
{"/foo/bar", "/foo/bar", "/.foo.bar", ""},
{"/foo/bar/", "/foo/bar/", "/.foo.bar./", ""},
Expand All @@ -1762,17 +1763,31 @@ func TestMQTTTopicAndSubjectConversion(t *testing.T) {
{"foo//bar", "foo//bar", "foo./.bar", ""},
{"foo///bar", "foo///bar", "foo././.bar", ""},
{"foo////bar", "foo////bar", "foo./././.bar", ""},
{".", ".", "//", ""},
{"..", "..", "////", ""},
{"...", "...", "//////", ""},
{"./", "./", "//./", ""},
{".//.", ".//.", "//././/", ""},
{"././.", "././.", "//.//.//", ""},
{"././/.", "././/.", "//.//././/", ""},
{".foo", ".foo", "//foo", ""},
{"foo.", "foo.", "foo//", ""},
{".foo.", ".foo.", "//foo//", ""},
{"foo../bar/", "foo../bar/", "foo////.bar./", ""},
{"foo../bar/.", "foo../bar/.", "foo////.bar.//", ""},
{"/foo/", "/foo/", "/.foo./", ""},
{"./foo/.", "./foo/.", "//.foo.//", ""},
{"foo.bar/baz", "foo.bar/baz", "foo//bar.baz", ""},
// These should produce errors
{"foo/+", "foo/+", "", "wildcards not allowed in publish"},
{"foo/#", "foo/#", "", "wildcards not allowed in publish"},
{"foo bar", "foo bar", "", "not supported"},
{"foo.bar", "foo.bar", "", "not supported"},
} {
t.Run(test.name, func(t *testing.T) {
res, err := mqttTopicToNATSPubSubject([]byte(test.mqttTopic))
if test.err != _EMPTY_ {
if err == nil || !strings.Contains(err.Error(), test.err) {
t.Fatalf("Expected error %q, got %q", test.err, err.Error())
t.Fatalf("Expected error %q, got %v", test.err, err)
}
return
}
Expand Down Expand Up @@ -1813,6 +1828,7 @@ func TestMQTTFilterConversion(t *testing.T) {
{"single level wildcard", "foo//+//", "foo./.*././"},
{"single level wildcard", "foo//+//bar", "foo./.*./.bar"},
{"single level wildcard", "foo///+///bar", "foo././.*././.bar"},
{"single level wildcard", "foo.bar///+///baz", "foo//bar././.*././.baz"},

{"multi level wildcard", "#", ">"},
{"multi level wildcard", "/#", "/.>"},
Expand All @@ -1821,6 +1837,7 @@ func TestMQTTFilterConversion(t *testing.T) {
{"multi level wildcard", "foo//#", "foo./.>"},
{"multi level wildcard", "foo///#", "foo././.>"},
{"multi level wildcard", "foo/bar/#", "foo.bar.>"},
{"multi level wildcard", "foo/bar.baz/#", "foo.bar//baz.>"},
} {
t.Run(test.name, func(t *testing.T) {
res, err := mqttFilterToNATSSubject([]byte(test.mqttTopic))
Expand Down Expand Up @@ -6454,6 +6471,47 @@ func TestMQTTSubjectWildcardStart(t *testing.T) {
require_True(t, si.State.Msgs == 0)
}

func TestMQTTTopicWithDot(t *testing.T) {
o := testMQTTDefaultOptions()
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)

nc := natsConnect(t, s.ClientURL(), nats.UserInfo("mqtt", "pwd"))
defer nc.Close()

sub := natsSubSync(t, nc, "*.*")

c1, r1 := testMQTTConnect(t, &mqttConnInfo{user: "mqtt", pass: "pwd", clientID: "conn1", cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer c1.Close()
testMQTTCheckConnAck(t, r1, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, c1, r1, []*mqttFilter{{filter: "spBv1.0/plant1", qos: 0}}, []byte{0})
testMQTTSub(t, 1, c1, r1, []*mqttFilter{{filter: "spBv1.0/plant2", qos: 1}}, []byte{1})

c2, r2 := testMQTTConnect(t, &mqttConnInfo{user: "mqtt", pass: "pwd", clientID: "conn2", cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer c2.Close()
testMQTTCheckConnAck(t, r2, mqttConnAckRCConnectionAccepted, false)

testMQTTPublish(t, c2, r2, 0, false, false, "spBv1.0/plant1", 0, []byte("msg1"))
testMQTTCheckPubMsg(t, c1, r1, "spBv1.0/plant1", 0, []byte("msg1"))
msg := natsNexMsg(t, sub, time.Second)
require_Equal(t, msg.Subject, "spBv1//0.plant1")

testMQTTPublish(t, c2, r2, 1, false, false, "spBv1.0/plant2", 1, []byte("msg2"))
testMQTTCheckPubMsg(t, c1, r1, "spBv1.0/plant2", mqttPubQos1, []byte("msg2"))
msg = natsNexMsg(t, sub, time.Second)
require_Equal(t, msg.Subject, "spBv1//0.plant2")

natsPub(t, nc, "spBv1//0.plant1", []byte("msg3"))
testMQTTCheckPubMsg(t, c1, r1, "spBv1.0/plant1", 0, []byte("msg3"))
msg = natsNexMsg(t, sub, time.Second)
require_Equal(t, msg.Subject, "spBv1//0.plant1")

natsPub(t, nc, "spBv1//0.plant2", []byte("msg4"))
testMQTTCheckPubMsg(t, c1, r1, "spBv1.0/plant2", 0, []byte("msg4"))
msg = natsNexMsg(t, sub, time.Second)
require_Equal(t, msg.Subject, "spBv1//0.plant2")
}

//////////////////////////////////////////////////////////////////////////
//
// Benchmarks
Expand Down