Skip to content

Commit

Permalink
[CHANGED] MQTT: Support for topics with . character. (#4243)
Browse files Browse the repository at this point in the history
The `.` character will be transformed to `//` in NATS subject. For
instance an MQTT message published on `spBv1.0/plant1` would be received
by a NATS subscriber as `spBv1//0.plant1`.

Conversely, a NATS message published on `spBv1//0.plant1` would be
received by an MQTT subscriber as `spBv1.0/plant1`.

Resolves #1879
Resolves #3482

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
derekcollison committed Jun 14, 2023
2 parents 694cc7d + f2d009b commit 91d0b6a
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 11 deletions.
31 changes: 22 additions & 9 deletions server/mqtt.go
Expand Up @@ -4115,7 +4115,8 @@ func mqttFilterToNATSSubject(filter []byte) ([]byte, error) {
// - '/' is replaced with './' if the last or next character in mt is '/'
// For instance, foo//bar would become foo./.bar
// - '/' is replaced with '.' for all other conditions (foo/bar -> foo.bar)
// - '.' and ' ' cause an error to be returned.
// - '.' is replaced with '//'.
// - ' ' cause an error to be returned.
//
// If there is no need to convert anything (say "foo" remains "foo"), then
// the no memory is allocated and the returned slice is the original `mt`.
Expand Down Expand Up @@ -4154,9 +4155,15 @@ func mqttToNATSSubjectConversion(mt []byte, wcOk bool) ([]byte, error) {
}
res = append(res, btsep)
}
case btsep, ' ':
// As of now, we cannot support '.' or ' ' in the MQTT topic/filter.
case ' ':
// As of now, we cannot support ' ' in the MQTT topic/filter.
return nil, errMQTTUnsupportedCharacters
case btsep:
if !cp {
makeCopy(i)
}
res = append(res, mqttTopicLevelSep, mqttTopicLevelSep)
j++
case mqttSingleLevelWC, mqttMultiLevelWC:
if !wcOk {
// Spec [MQTT-3.3.2-2] and [MQTT-4.7.1-1]
Expand Down Expand Up @@ -4195,16 +4202,22 @@ func natsSubjectToMQTTTopic(subject string) []byte {
for i := 0; i < len(subject); i++ {
switch subject[i] {
case mqttTopicLevelSep:
if !(i == 0 && i < end && subject[i+1] == btsep) {
topic[j] = mqttTopicLevelSep
j++
if i < end {
switch c := subject[i+1]; c {
case btsep, mqttTopicLevelSep:
if c == btsep {
topic[j] = mqttTopicLevelSep
} else {
topic[j] = btsep
}
j++
i++
default:
}
}
case btsep:
topic[j] = mqttTopicLevelSep
j++
if i < end && subject[i+1] == mqttTopicLevelSep {
i++
}
default:
topic[j] = subject[i]
j++
Expand Down
62 changes: 60 additions & 2 deletions server/mqtt_test.go
Expand Up @@ -1750,6 +1750,7 @@ func TestMQTTTopicAndSubjectConversion(t *testing.T) {
{"///foo/", "///foo/", "/././.foo./", ""},
{"///foo//", "///foo//", "/././.foo././", ""},
{"///foo///", "///foo///", "/././.foo./././", ""},
{"//.foo.//", "//.foo.//", "/././/foo//././", ""},
{"foo/bar", "foo/bar", "foo.bar", ""},
{"/foo/bar", "/foo/bar", "/.foo.bar", ""},
{"/foo/bar/", "/foo/bar/", "/.foo.bar./", ""},
Expand All @@ -1762,17 +1763,31 @@ func TestMQTTTopicAndSubjectConversion(t *testing.T) {
{"foo//bar", "foo//bar", "foo./.bar", ""},
{"foo///bar", "foo///bar", "foo././.bar", ""},
{"foo////bar", "foo////bar", "foo./././.bar", ""},
{".", ".", "//", ""},
{"..", "..", "////", ""},
{"...", "...", "//////", ""},
{"./", "./", "//./", ""},
{".//.", ".//.", "//././/", ""},
{"././.", "././.", "//.//.//", ""},
{"././/.", "././/.", "//.//././/", ""},
{".foo", ".foo", "//foo", ""},
{"foo.", "foo.", "foo//", ""},
{".foo.", ".foo.", "//foo//", ""},
{"foo../bar/", "foo../bar/", "foo////.bar./", ""},
{"foo../bar/.", "foo../bar/.", "foo////.bar.//", ""},
{"/foo/", "/foo/", "/.foo./", ""},
{"./foo/.", "./foo/.", "//.foo.//", ""},
{"foo.bar/baz", "foo.bar/baz", "foo//bar.baz", ""},
// These should produce errors
{"foo/+", "foo/+", "", "wildcards not allowed in publish"},
{"foo/#", "foo/#", "", "wildcards not allowed in publish"},
{"foo bar", "foo bar", "", "not supported"},
{"foo.bar", "foo.bar", "", "not supported"},
} {
t.Run(test.name, func(t *testing.T) {
res, err := mqttTopicToNATSPubSubject([]byte(test.mqttTopic))
if test.err != _EMPTY_ {
if err == nil || !strings.Contains(err.Error(), test.err) {
t.Fatalf("Expected error %q, got %q", test.err, err.Error())
t.Fatalf("Expected error %q, got %v", test.err, err)
}
return
}
Expand Down Expand Up @@ -1813,6 +1828,7 @@ func TestMQTTFilterConversion(t *testing.T) {
{"single level wildcard", "foo//+//", "foo./.*././"},
{"single level wildcard", "foo//+//bar", "foo./.*./.bar"},
{"single level wildcard", "foo///+///bar", "foo././.*././.bar"},
{"single level wildcard", "foo.bar///+///baz", "foo//bar././.*././.baz"},

{"multi level wildcard", "#", ">"},
{"multi level wildcard", "/#", "/.>"},
Expand All @@ -1821,6 +1837,7 @@ func TestMQTTFilterConversion(t *testing.T) {
{"multi level wildcard", "foo//#", "foo./.>"},
{"multi level wildcard", "foo///#", "foo././.>"},
{"multi level wildcard", "foo/bar/#", "foo.bar.>"},
{"multi level wildcard", "foo/bar.baz/#", "foo.bar//baz.>"},
} {
t.Run(test.name, func(t *testing.T) {
res, err := mqttFilterToNATSSubject([]byte(test.mqttTopic))
Expand Down Expand Up @@ -6454,6 +6471,47 @@ func TestMQTTSubjectWildcardStart(t *testing.T) {
require_True(t, si.State.Msgs == 0)
}

func TestMQTTTopicWithDot(t *testing.T) {
o := testMQTTDefaultOptions()
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)

nc := natsConnect(t, s.ClientURL(), nats.UserInfo("mqtt", "pwd"))
defer nc.Close()

sub := natsSubSync(t, nc, "*.*")

c1, r1 := testMQTTConnect(t, &mqttConnInfo{user: "mqtt", pass: "pwd", clientID: "conn1", cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer c1.Close()
testMQTTCheckConnAck(t, r1, mqttConnAckRCConnectionAccepted, false)
testMQTTSub(t, 1, c1, r1, []*mqttFilter{{filter: "spBv1.0/plant1", qos: 0}}, []byte{0})
testMQTTSub(t, 1, c1, r1, []*mqttFilter{{filter: "spBv1.0/plant2", qos: 1}}, []byte{1})

c2, r2 := testMQTTConnect(t, &mqttConnInfo{user: "mqtt", pass: "pwd", clientID: "conn2", cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer c2.Close()
testMQTTCheckConnAck(t, r2, mqttConnAckRCConnectionAccepted, false)

testMQTTPublish(t, c2, r2, 0, false, false, "spBv1.0/plant1", 0, []byte("msg1"))
testMQTTCheckPubMsg(t, c1, r1, "spBv1.0/plant1", 0, []byte("msg1"))
msg := natsNexMsg(t, sub, time.Second)
require_Equal(t, msg.Subject, "spBv1//0.plant1")

testMQTTPublish(t, c2, r2, 1, false, false, "spBv1.0/plant2", 1, []byte("msg2"))
testMQTTCheckPubMsg(t, c1, r1, "spBv1.0/plant2", mqttPubQos1, []byte("msg2"))
msg = natsNexMsg(t, sub, time.Second)
require_Equal(t, msg.Subject, "spBv1//0.plant2")

natsPub(t, nc, "spBv1//0.plant1", []byte("msg3"))
testMQTTCheckPubMsg(t, c1, r1, "spBv1.0/plant1", 0, []byte("msg3"))
msg = natsNexMsg(t, sub, time.Second)
require_Equal(t, msg.Subject, "spBv1//0.plant1")

natsPub(t, nc, "spBv1//0.plant2", []byte("msg4"))
testMQTTCheckPubMsg(t, c1, r1, "spBv1.0/plant2", 0, []byte("msg4"))
msg = natsNexMsg(t, sub, time.Second)
require_Equal(t, msg.Subject, "spBv1//0.plant2")
}

//////////////////////////////////////////////////////////////////////////
//
// Benchmarks
Expand Down

0 comments on commit 91d0b6a

Please sign in to comment.