Skip to content

Commit

Permalink
[FIXED] MQTT: more generic names for outgoing stream, etc. (#4484)
Browse files Browse the repository at this point in the history
- [x] Branch rebased on top of current main (`git pull --rebase origin
dev`)
- [x] Changes squashed to a single commit (described
[here](http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html))
 - [ ] Build is green in Travis CI

### Changes proposed in this pull request:

Removed `qos2` reference from the MQTT outgoing stream name and
subjects.

Rationale: As I was reviewing the documentation and thinking how to move
forward with MQTT, I am inclined to use the `$MQTT_out` stream and
session-specific consumers (as opposed to subscription-specific) for all
QOS-related packet delivery and PI persistence. I think I will be able
to repurpose it "as is", so proposing the generic name change.

Since these (stream, consumer, subscriptions) are newly introduced in
#4349 and have not yet been
released, this is a safe change.
  • Loading branch information
derekcollison committed Sep 4, 2023
2 parents 72dede6 + b24941e commit 60e41aa
Showing 1 changed file with 15 additions and 14 deletions.
29 changes: 15 additions & 14 deletions server/mqtt.go
Expand Up @@ -124,11 +124,12 @@ const (
mqttQoS2IncomingMsgsStreamName = "$MQTT_qos2in"
mqttQoS2IncomingMsgsStreamSubjectPrefix = "$MQTT.qos2.in."

// Stream name and subjects for outgoing MQTT QoS2 PUBREL messages
mqttQoS2PubRelStreamName = "$MQTT_qos2out"
mqttQoS2PubRelConsumerDurablePrefix = "$MQTT_PUBREL_"
mqttQoS2PubRelStoredSubjectPrefix = "$MQTT.qos2.out."
mqttQoS2PubRelDeliverySubjectPrefix = "$MQTT.qos2.delivery."
// Stream name and subjects for outgoing MQTT QoS (PUBREL) messages
mqttOutStreamName = "$MQTT_out"
mqttOutSubjectPrefix = "$MQTT.out."
mqttPubRelSubjectPrefix = "$MQTT.out.pubrel."
mqttPubRelDeliverySubjectPrefix = "$MQTT.deliver.pubrel."
mqttPubRelConsumerDurablePrefix = "$MQTT_PUBREL_"

// As per spec, MQTT server may not redeliver QoS 1 and 2 messages to
// clients, except after client reconnects. However, NATS Server will
Expand Down Expand Up @@ -1274,15 +1275,15 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
}
}

if si, err := lookupStream(mqttQoS2PubRelStreamName, "QoS2 outgoing PUBREL"); err != nil {
if si, err := lookupStream(mqttOutStreamName, "QoS2 outgoing PUBREL"); err != nil {
return nil, err
} else if si == nil {
// Create the stream for the incoming QoS2 messages that have not been
// PUBREL-ed by the sender. NATS messages are submitted as
// "$MQTT.pubrel.<session hash>"
cfg := &StreamConfig{
Name: mqttQoS2PubRelStreamName,
Subjects: []string{mqttQoS2PubRelStoredSubjectPrefix + ">"},
Name: mqttOutStreamName,
Subjects: []string{mqttOutSubjectPrefix + ">"},
Storage: FileStorage,
Retention: InterestPolicy,
Replicas: replicas,
Expand Down Expand Up @@ -2577,9 +2578,9 @@ func mqttSessionCreate(jsa *mqttJSA, id, idHash string, seq uint64, opts *Option
idHash: idHash,
seq: seq,
maxp: maxp,
pubRelSubject: mqttQoS2PubRelStoredSubjectPrefix + idHash,
pubRelDeliverySubject: mqttQoS2PubRelDeliverySubjectPrefix + idHash,
pubRelDeliverySubjectB: []byte(mqttQoS2PubRelDeliverySubjectPrefix + idHash),
pubRelSubject: mqttPubRelSubjectPrefix + idHash,
pubRelDeliverySubject: mqttPubRelDeliverySubjectPrefix + idHash,
pubRelDeliverySubjectB: []byte(mqttPubRelDeliverySubjectPrefix + idHash),
}
}

Expand Down Expand Up @@ -2664,7 +2665,7 @@ func (sess *mqttSession) clear() error {
}
}
if pubRelDur != "" {
_, err := sess.jsa.deleteConsumer(mqttQoS2PubRelStreamName, pubRelDur)
_, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur)
if isErrorOtherThan(err, JSConsumerNotFoundErr) {
return fmt.Errorf("unable to delete consumer %q for session %q: %v", pubRelDur, sess.id, err)
}
Expand Down Expand Up @@ -4463,10 +4464,10 @@ func (sess *mqttSession) ensurePubRelConsumerSubscription(c *client) error {
}

ccr := &CreateConsumerRequest{
Stream: mqttQoS2PubRelStreamName,
Stream: mqttOutStreamName,
Config: ConsumerConfig{
DeliverSubject: sess.pubRelDeliverySubject,
Durable: mqttQoS2PubRelConsumerDurablePrefix + sess.idHash,
Durable: mqttPubRelConsumerDurablePrefix + sess.idHash,
AckPolicy: AckExplicit,
DeliverPolicy: DeliverNew,
FilterSubject: sess.pubRelSubject,
Expand Down

0 comments on commit 60e41aa

Please sign in to comment.