Can not support shared subscription #1034

Open
cdmikechen opened this issue Jul 26, 2023 · 0 comments
cdmikechen commented Jul 26, 2023

Describe the bug
Shared subscriptions do not appear to be supported by the current MoP (MQTT on Pulsar) release. For example, subscribing with the topic filter $share/group1/persistent://public/default/a/# fails.
Background on MQTT 5 shared subscriptions: https://www.emqx.com/zh/blog/introduction-to-mqtt5-protocol-shared-subscription

To Reproduce
Publish a message to topic a/b/c and subscribe with the topic filter $share/group1/persistent://public/default/a/#, using basic auth. The broker logs the following exception and closes the connection:

2023-07-26T10:55:28,122+0800 [pulsar-ph-mqtt-54-4] ERROR io.streamnative.pulsar.handlers.mqtt.MQTTCommonInboundHandler - Exception was caught while processing MQTT message,
java.lang.IllegalArgumentException: Invalid topic domain: '$share/group1/persistent'
	at org.apache.pulsar.common.naming.TopicDomain.getEnum(TopicDomain.java:43) ~[org.apache.pulsar-pulsar-client-admin-api-2.11.2.jar:2.11.2]
	at org.apache.pulsar.common.naming.TopicName.<init>(TopicName.java:130) ~[org.apache.pulsar-pulsar-common-2.11.2.jar:2.11.2]
	at org.apache.pulsar.common.naming.TopicName.<init>(TopicName.java:36) ~[org.apache.pulsar-pulsar-common-2.11.2.jar:2.11.2]
	at org.apache.pulsar.common.naming.TopicName$1.load(TopicName.java:59) ~[org.apache.pulsar-pulsar-common-2.11.2.jar:2.11.2]
	at org.apache.pulsar.common.naming.TopicName$1.load(TopicName.java:56) ~[org.apache.pulsar-pulsar-common-2.11.2.jar:2.11.2]
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571) ~[com.google.guava-guava-32.1.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313) ~[com.google.guava-guava-32.1.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190) ~[com.google.guava-guava-32.1.1-jre.jar:?]
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080) ~[com.google.guava-guava-32.1.1-jre.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:4012) ~[com.google.guava-guava-32.1.1-jre.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035) ~[com.google.guava-guava-32.1.1-jre.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5011) ~[com.google.guava-guava-32.1.1-jre.jar:?]
	at org.apache.pulsar.common.naming.TopicName.get(TopicName.java:81) ~[org.apache.pulsar-pulsar-common-2.11.2.jar:2.11.2]
	at io.streamnative.pulsar.handlers.mqtt.support.MQTTBrokerProtocolMethodProcessor.processSubscribe(MQTTBrokerProtocolMethodProcessor.java:342) ~[?:?]
	at io.streamnative.pulsar.handlers.mqtt.MQTTCommonInboundHandler.channelRead(MQTTCommonInboundHandler.java:73) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.93.Final.jar:4.1.93.Final]
	at io.streamnative.pulsar.handlers.mqtt.adapter.CombineHandler.channelRead(CombineHandler.java:31) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[io.netty-netty-codec-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[io.netty-netty-handler-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.netty-netty-transport-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[io.netty-netty-transport-classes-epoll-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499) ~[io.netty-netty-transport-classes-epoll-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397) ~[io.netty-netty-transport-classes-epoll-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[io.netty-netty-common-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.93.Final.jar:4.1.93.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.93.Final.jar:4.1.93.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
2023-07-26T10:55:28,124+0800 [pulsar-ph-mqtt-54-4] INFO  io.streamnative.pulsar.handlers.mqtt.Connection - Closing connection clientId = mqttx_5290a7f8
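
The trace shows TopicName.get being called from processSubscribe with the full filter, so everything before :// ($share/group1/persistent) is treated as the topic domain. One possible approach is to split off the $share/{ShareName}/ prefix before the name reaches TopicName. The following is only a minimal sketch of that split, not MoP's actual code: the class SharedSubscriptionFilter and its methods are hypothetical names, and it does not touch any Pulsar API.

```java
/**
 * Hypothetical helper (not part of MoP): splits an MQTT 5 shared-subscription
 * filter of the form $share/{ShareName}/{TopicFilter} into its two parts.
 */
final class SharedSubscriptionFilter {
    private static final String SHARE_PREFIX = "$share/";

    private final String shareName;   // null when the filter is not a shared subscription
    private final String topicFilter; // the filter with any $share/{ShareName}/ prefix removed

    private SharedSubscriptionFilter(String shareName, String topicFilter) {
        this.shareName = shareName;
        this.topicFilter = topicFilter;
    }

    static SharedSubscriptionFilter parse(String rawFilter) {
        if (!rawFilter.startsWith(SHARE_PREFIX)) {
            // Ordinary subscription: pass the filter through unchanged.
            return new SharedSubscriptionFilter(null, rawFilter);
        }
        int nameEnd = rawFilter.indexOf('/', SHARE_PREFIX.length());
        // The share name must be non-empty and followed by a non-empty topic filter.
        if (nameEnd <= SHARE_PREFIX.length() || nameEnd == rawFilter.length() - 1) {
            throw new IllegalArgumentException("Malformed shared subscription filter: " + rawFilter);
        }
        String name = rawFilter.substring(SHARE_PREFIX.length(), nameEnd);
        // MQTT 5 forbids wildcard characters in the share name.
        if (name.indexOf('+') >= 0 || name.indexOf('#') >= 0) {
            throw new IllegalArgumentException("Share name must not contain wildcards: " + name);
        }
        return new SharedSubscriptionFilter(name, rawFilter.substring(nameEnd + 1));
    }

    boolean isShared() {
        return shareName != null;
    }

    String shareName() {
        return shareName;
    }

    String topicFilter() {
        return topicFilter;
    }
}
```

With the filter from this report, parse would yield the share name group1 and the topic filter persistent://public/default/a/#. Only the latter would then be handed to topic-name resolution, while the share name could, as an assumption, be mapped to a Pulsar subscription name.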

Expected behavior
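The broker should accept the $share/group1/... filter as an MQTT 5 shared subscription and deliver matching messages to the group, instead of throwing IllegalArgumentException and closing the connection.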

Screenshots
Issue: [MIP-3] Support MQTT protocol version 5 #369

Desktop (please complete the following information):
NA

Additional context
NA
