Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistence whilst awaiting initial connection #333

Merged
merged 2 commits into from Aug 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
68 changes: 62 additions & 6 deletions client.go
Expand Up @@ -156,6 +156,8 @@ func (c *client) AddRoute(topic string, callback MessageHandler) {

// IsConnected returns a bool signifying whether
// the client is connected or not.
// connected means that the connection is up now OR it will
// be established/reestablished automatically when possible
func (c *client) IsConnected() bool {
c.RLock()
defer c.RUnlock()
Expand All @@ -165,6 +167,8 @@ func (c *client) IsConnected() bool {
return true
case c.options.AutoReconnect && status > connecting:
return true
case c.options.ConnectRetry && status == connecting:
return true
default:
return false
}
Expand Down Expand Up @@ -209,14 +213,27 @@ func (c *client) Connect() Token {
t := newToken(packets.Connect).(*ConnectToken)
DEBUG.Println(CLI, "Connect()")

if c.options.ConnectRetry && atomic.LoadUint32(&c.status) != disconnected {
// if in any state other than disconnected and ConnectRetry is
// enabled then the connection will come up automatically
// client can assume connection is up
WARN.Println(CLI, "Connect() called but not disconnected")
t.returnCode = packets.Accepted
t.flowComplete()
return t
}

c.obound = make(chan *PacketAndToken)
c.oboundP = make(chan *PacketAndToken)
c.ibound = make(chan packets.ControlPacket)

go func() {
c.persist.Open()
c.persist.Open()
if c.options.ConnectRetry {
c.reserveStoredPublishIDs() // Reserve IDs to allow publish before connect complete
}
c.setConnected(connecting)

c.setConnected(connecting)
go func() {
c.errors = make(chan error, 1)
c.stop = make(chan struct{})

Expand All @@ -228,12 +245,16 @@ func (c *client) Connect() Token {
return
}

RETRYCONN:
for _, broker := range c.options.Servers {
cm := newConnectMsgFromOptions(&c.options, broker)
c.options.ProtocolVersion = protocolVersion
CONN:
DEBUG.Println(CLI, "about to write new connect msg")
c.conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders)
c.Lock()
c.conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout,
c.options.HTTPHeaders)
c.Unlock()
if err == nil {
DEBUG.Println(CLI, "socket connected to broker")
switch c.options.ProtocolVersion {
Expand All @@ -259,10 +280,12 @@ func (c *client) Connect() Token {

rc, t.sessionPresent = c.connect()
if rc != packets.Accepted {
c.Lock()
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
c.Unlock()
//if the protocol version was explicitly set don't do any fallback
if c.options.protocolVersionExplicit {
ERROR.Println(CLI, "Connecting to", broker, "CONNACK was not CONN_ACCEPTED, but rather", packets.ConnackReturnCodes[rc])
Expand All @@ -283,6 +306,14 @@ func (c *client) Connect() Token {
}

if c.conn == nil {
if c.options.ConnectRetry {
DEBUG.Println(CLI, "Connect failed, sleeping for", int(c.options.ConnectRetryInterval.Seconds()), "seconds and will then retry")
time.Sleep(c.options.ConnectRetryInterval)

if atomic.LoadUint32(&c.status) == connecting {
goto RETRYCONN
}
}
ERROR.Println(CLI, "Failed to connect to a broker")
c.setConnected(disconnected)
c.persist.Close()
Expand Down Expand Up @@ -596,9 +627,12 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
token.messageID = pub.MessageID
}
persistOutbound(c.persist, pub)
if c.connectionStatus() == reconnecting {
switch c.connectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing publish message (connecting), topic:", topic)
case reconnecting:
DEBUG.Println(CLI, "storing publish message (reconnecting), topic:", topic)
} else {
default:
DEBUG.Println(CLI, "sending publish message, topic:", topic)
publishWaitTimeout := c.options.WriteTimeout
if publishWaitTimeout == 0 {
Expand Down Expand Up @@ -673,6 +707,28 @@ func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHand
return token
}

// reserveStoredPublishIDs reserves the ids for publish packets in the persistant store to ensure these are not duplicated
func (c *client) reserveStoredPublishIDs() {
// The resume function sets the stored id for publish packets only (some other packets
// will get new ids in net code). This means that the only keys we need to ensure are
// unique are the publish ones (and these will completed/replaced in resume() )
if c.options.CleanSession == false {
storedKeys := c.persist.All()
for _, key := range storedKeys {
packet := c.persist.Get(key)
if packet == nil {
continue
}
switch packet.(type) {
case *packets.PublishPacket:
details := packet.Details()
token := &PlaceHolderToken{id: details.MessageID}
c.claimID(token, details.MessageID)
}
}
}
}

// Load all stored messages and resend them
// Call this to ensure QOS > 1,2 even after an application crash
func (c *client) resume(subscription bool) {
Expand Down
101 changes: 101 additions & 0 deletions fvt_client_test.go
Expand Up @@ -22,6 +22,8 @@ import (
"io/ioutil"
"testing"
"time"

"github.com/eclipse/paho.mqtt.golang/packets"
)

func Test_Start(t *testing.T) {
Expand Down Expand Up @@ -1097,3 +1099,102 @@ func Test_cleanUpMids_2(t *testing.T) {
}
fmt.Println(token.Error())
}

func Test_ConnectRetry(t *testing.T) {
// Connect for publish - initially use invalid server
cops := NewClientOptions().AddBroker("256.256.256.256").SetClientID("cr-pub").
SetConnectRetry(true).SetConnectRetryInterval(time.Second / 2)
c := NewClient(cops).(*client)
connectToken := c.Connect()

time.Sleep(time.Second) // Wait a second to ensure we are past SetConnectRetryInterval
if connectToken.Error() != nil {
t.Fatalf("Connect returned error (should be retrying) (%v)", connectToken.Error())
}
c.options.AddBroker(FVTTCP) // note this is not threadsafe but should be OK for test
if connectToken.Wait() && connectToken.Error() != nil {
t.Fatalf("Error connecting after valid broker added: %v", connectToken.Error())
}
c.Disconnect(250)
}

func Test_ConnectRetryPublish(t *testing.T) {
topic := "/test/connectRetry"
payload := "sample Payload"
choke := make(chan bool)

// subscribe to topic and wait for expected message (only received after connection successful)
sops := NewClientOptions().AddBroker(FVTTCP).SetClientID("crp-sub")
var f MessageHandler = func(client Client, msg Message) {
if msg.Topic() != topic || string(msg.Payload()) != payload {
t.Fatalf("Received unexpected message: %v, %v", msg.Topic(), msg.Payload())
}
choke <- true
}
sops.SetDefaultPublishHandler(f)

s := NewClient(sops)
if token := s.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}

if token := s.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", token.Error())
}

// Connect for publish - initially use invalid server
memStore := NewMemoryStore()
memStore.Open()
pops := NewClientOptions().AddBroker("256.256.256.256").SetClientID("crp-pub").
SetStore(memStore).SetConnectRetry(true).SetConnectRetryInterval(time.Second / 2)
p := NewClient(pops).(*client)
connectToken := p.Connect()
p.Publish(topic, 1, false, payload)
// Check publish packet in the memorystore
ids := memStore.All()
if len(ids) == 0 {
t.Fatalf("Expected published message to be in store")
} else if len(ids) != 1 {
t.Fatalf("Expected 1 message to be in store")
}
packet := memStore.Get(ids[0])
if packet == nil {
t.Fatal("Failed to retrieve packet from store")
}
pp, ok := packet.(*packets.PublishPacket)
if !ok {
t.Fatalf("Message in store not of the expected type (%T)", packet)
}
if pp.TopicName != topic || string(pp.Payload) != payload {
t.Fatalf("Stored message Packet contents not as expected (%v, %v)", pp.TopicName, pp.Payload)
}
time.Sleep(time.Second) // Wait a second to ensure we are past SetConnectRetryInterval
if connectToken.Error() != nil {
t.Fatalf("Connect returned error (should be retrying) (%v)", connectToken.Error())
}

// disconnecting closes the store (both in disconnect and in Connect which runs as a goRoutine).
// As such we duplicate the store
memStore2 := NewMemoryStore()
memStore2.Open()
memStore2.Put(ids[0], packet)

// disconnect and then reconnect with correct server
p.Disconnect(250)

pops = NewClientOptions().AddBroker(FVTTCP).SetClientID("crp-pub").SetCleanSession(false).
SetStore(memStore2).SetConnectRetry(true).SetConnectRetryInterval(time.Second / 2)
p = NewClient(pops).(*client)
if token := p.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on valid Publish.Connect(): %v", token.Error())
}

if connectToken.Wait() && connectToken.Error() == nil {
t.Fatalf("Expected connection error - got nil")
}
wait(choke)

p.Disconnect(250)
s.Disconnect(250)
memStore.Close()
}
26 changes: 25 additions & 1 deletion messageids.go
Expand Up @@ -114,4 +114,28 @@ func (d *DummyToken) Error() error {
return nil
}

func (d *DummyToken) setError(e error) {}
func (p *DummyToken) setError(e error) {}

// PlaceHolderToken does nothing and was implemented to allow a messageid to be reserved
// it differs from DummyToken in that calling flowComplete does not generate an error (it
// is expected that flowComplete will be called when the token is overwritten with a real token)
type PlaceHolderToken struct {
id uint16
}

func (p *PlaceHolderToken) Wait() bool {
return true
}

func (p *PlaceHolderToken) WaitTimeout(t time.Duration) bool {
return true
}

func (p *PlaceHolderToken) flowComplete() {
}

func (p *PlaceHolderToken) Error() error {
return nil
}

func (p *PlaceHolderToken) setError(e error) {}
21 changes: 21 additions & 0 deletions options.go
Expand Up @@ -68,6 +68,8 @@ type ClientOptions struct {
ConnectTimeout time.Duration
MaxReconnectInterval time.Duration
AutoReconnect bool
ConnectRetryInterval time.Duration
ConnectRetry bool
Store Store
DefaultPublishHandler MessageHandler
OnConnect OnConnectHandler
Expand Down Expand Up @@ -107,6 +109,8 @@ func NewClientOptions() *ClientOptions {
ConnectTimeout: 30 * time.Second,
MaxReconnectInterval: 10 * time.Minute,
AutoReconnect: true,
ConnectRetryInterval: 30 * time.Second,
ConnectRetry: false,
Store: nil,
OnConnect: nil,
OnConnectionLost: DefaultConnectionLostHandler,
Expand Down Expand Up @@ -326,6 +330,23 @@ func (o *ClientOptions) SetAutoReconnect(a bool) *ClientOptions {
return o
}

// SetConnectRetryInterval sets the time that will be waited between connection attempts
// when initially connecting if ConnectRetry is TRUE
func (o *ClientOptions) SetConnectRetryInterval(t time.Duration) *ClientOptions {
o.ConnectRetryInterval = t
return o
}

// SetConnectRetry sets whether the connect function will automatically retry the connection
// in the event of a failure (when true the token returned by the Connect function will
// not complete until the connection is up or it is cancelled)
// If ConnectRetry is true then subscriptions should be requested in OnConnect handler
// Setting this to TRUE permits mesages to be published before the connection is established
func (o *ClientOptions) SetConnectRetry(a bool) *ClientOptions {
o.ConnectRetry = a
return o
}

// SetMessageChannelDepth DEPRECATED The value set here no longer has any effect, this function
// remains so the API is not altered.
func (o *ClientOptions) SetMessageChannelDepth(s uint) *ClientOptions {
Expand Down
12 changes: 12 additions & 0 deletions options_reader.go
Expand Up @@ -133,6 +133,18 @@ func (r *ClientOptionsReader) AutoReconnect() bool {
return s
}

//ConnectRetryInterval returns the delay between retries on the initial connection (if ConnectRetry true)
func (r *ClientOptionsReader) ConnectRetryInterval() time.Duration {
s := r.options.ConnectRetryInterval
return s
}

//ConnectRetry returns whether the initial connection request will be retried until connection established
func (r *ClientOptionsReader) ConnectRetry() bool {
s := r.options.ConnectRetry
return s
}

func (r *ClientOptionsReader) WriteTimeout() time.Duration {
s := r.options.WriteTimeout
return s
Expand Down