Skip to content

Commit

Permalink
Merge pull request #333 from ChIoT-Tech/master
Browse files Browse the repository at this point in the history
Persistence whilst awaiting initial connection
  • Loading branch information
Al S-M committed Aug 29, 2019
2 parents 08f8223 + 660697f commit c49ea09
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 7 deletions.
68 changes: 62 additions & 6 deletions client.go
Expand Up @@ -156,6 +156,8 @@ func (c *client) AddRoute(topic string, callback MessageHandler) {

// IsConnected returns a bool signifying whether
// the client is connected or not.
// connected means that the connection is up now OR it will
// be established/reestablished automatically when possible
func (c *client) IsConnected() bool {
c.RLock()
defer c.RUnlock()
Expand All @@ -165,6 +167,8 @@ func (c *client) IsConnected() bool {
return true
case c.options.AutoReconnect && status > connecting:
return true
case c.options.ConnectRetry && status == connecting:
return true
default:
return false
}
Expand Down Expand Up @@ -209,14 +213,27 @@ func (c *client) Connect() Token {
t := newToken(packets.Connect).(*ConnectToken)
DEBUG.Println(CLI, "Connect()")

if c.options.ConnectRetry && atomic.LoadUint32(&c.status) != disconnected {
// if in any state other than disconnected and ConnectRetry is
// enabled then the connection will come up automatically
// client can assume connection is up
WARN.Println(CLI, "Connect() called but not disconnected")
t.returnCode = packets.Accepted
t.flowComplete()
return t
}

c.obound = make(chan *PacketAndToken)
c.oboundP = make(chan *PacketAndToken)
c.ibound = make(chan packets.ControlPacket)

go func() {
c.persist.Open()
c.persist.Open()
if c.options.ConnectRetry {
c.reserveStoredPublishIDs() // Reserve IDs to allow publish before connect complete
}
c.setConnected(connecting)

c.setConnected(connecting)
go func() {
c.errors = make(chan error, 1)
c.stop = make(chan struct{})

Expand All @@ -228,12 +245,16 @@ func (c *client) Connect() Token {
return
}

RETRYCONN:
for _, broker := range c.options.Servers {
cm := newConnectMsgFromOptions(&c.options, broker)
c.options.ProtocolVersion = protocolVersion
CONN:
DEBUG.Println(CLI, "about to write new connect msg")
c.conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders)
c.Lock()
c.conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout,
c.options.HTTPHeaders)
c.Unlock()
if err == nil {
DEBUG.Println(CLI, "socket connected to broker")
switch c.options.ProtocolVersion {
Expand All @@ -259,10 +280,12 @@ func (c *client) Connect() Token {

rc, t.sessionPresent = c.connect()
if rc != packets.Accepted {
c.Lock()
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
c.Unlock()
//if the protocol version was explicitly set don't do any fallback
if c.options.protocolVersionExplicit {
ERROR.Println(CLI, "Connecting to", broker, "CONNACK was not CONN_ACCEPTED, but rather", packets.ConnackReturnCodes[rc])
Expand All @@ -283,6 +306,14 @@ func (c *client) Connect() Token {
}

if c.conn == nil {
if c.options.ConnectRetry {
DEBUG.Println(CLI, "Connect failed, sleeping for", int(c.options.ConnectRetryInterval.Seconds()), "seconds and will then retry")
time.Sleep(c.options.ConnectRetryInterval)

if atomic.LoadUint32(&c.status) == connecting {
goto RETRYCONN
}
}
ERROR.Println(CLI, "Failed to connect to a broker")
c.setConnected(disconnected)
c.persist.Close()
Expand Down Expand Up @@ -596,9 +627,12 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
token.messageID = pub.MessageID
}
persistOutbound(c.persist, pub)
if c.connectionStatus() == reconnecting {
switch c.connectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing publish message (connecting), topic:", topic)
case reconnecting:
DEBUG.Println(CLI, "storing publish message (reconnecting), topic:", topic)
} else {
default:
DEBUG.Println(CLI, "sending publish message, topic:", topic)
publishWaitTimeout := c.options.WriteTimeout
if publishWaitTimeout == 0 {
Expand Down Expand Up @@ -673,6 +707,28 @@ func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHand
return token
}

// reserveStoredPublishIDs reserves the ids for publish packets in the persistant store to ensure these are not duplicated
func (c *client) reserveStoredPublishIDs() {
// The resume function sets the stored id for publish packets only (some other packets
// will get new ids in net code). This means that the only keys we need to ensure are
// unique are the publish ones (and these will completed/replaced in resume() )
if c.options.CleanSession == false {
storedKeys := c.persist.All()
for _, key := range storedKeys {
packet := c.persist.Get(key)
if packet == nil {
continue
}
switch packet.(type) {
case *packets.PublishPacket:
details := packet.Details()
token := &PlaceHolderToken{id: details.MessageID}
c.claimID(token, details.MessageID)
}
}
}
}

// Load all stored messages and resend them
// Call this to ensure QOS > 1,2 even after an application crash
func (c *client) resume(subscription bool) {
Expand Down
101 changes: 101 additions & 0 deletions fvt_client_test.go
Expand Up @@ -22,6 +22,8 @@ import (
"io/ioutil"
"testing"
"time"

"github.com/eclipse/paho.mqtt.golang/packets"
)

func Test_Start(t *testing.T) {
Expand Down Expand Up @@ -1097,3 +1099,102 @@ func Test_cleanUpMids_2(t *testing.T) {
}
fmt.Println(token.Error())
}

func Test_ConnectRetry(t *testing.T) {
// Connect for publish - initially use invalid server
cops := NewClientOptions().AddBroker("256.256.256.256").SetClientID("cr-pub").
SetConnectRetry(true).SetConnectRetryInterval(time.Second / 2)
c := NewClient(cops).(*client)
connectToken := c.Connect()

time.Sleep(time.Second) // Wait a second to ensure we are past SetConnectRetryInterval
if connectToken.Error() != nil {
t.Fatalf("Connect returned error (should be retrying) (%v)", connectToken.Error())
}
c.options.AddBroker(FVTTCP) // note this is not threadsafe but should be OK for test
if connectToken.Wait() && connectToken.Error() != nil {
t.Fatalf("Error connecting after valid broker added: %v", connectToken.Error())
}
c.Disconnect(250)
}

func Test_ConnectRetryPublish(t *testing.T) {
topic := "/test/connectRetry"
payload := "sample Payload"
choke := make(chan bool)

// subscribe to topic and wait for expected message (only received after connection successful)
sops := NewClientOptions().AddBroker(FVTTCP).SetClientID("crp-sub")
var f MessageHandler = func(client Client, msg Message) {
if msg.Topic() != topic || string(msg.Payload()) != payload {
t.Fatalf("Received unexpected message: %v, %v", msg.Topic(), msg.Payload())
}
choke <- true
}
sops.SetDefaultPublishHandler(f)

s := NewClient(sops)
if token := s.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}

if token := s.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", token.Error())
}

// Connect for publish - initially use invalid server
memStore := NewMemoryStore()
memStore.Open()
pops := NewClientOptions().AddBroker("256.256.256.256").SetClientID("crp-pub").
SetStore(memStore).SetConnectRetry(true).SetConnectRetryInterval(time.Second / 2)
p := NewClient(pops).(*client)
connectToken := p.Connect()
p.Publish(topic, 1, false, payload)
// Check publish packet in the memorystore
ids := memStore.All()
if len(ids) == 0 {
t.Fatalf("Expected published message to be in store")
} else if len(ids) != 1 {
t.Fatalf("Expected 1 message to be in store")
}
packet := memStore.Get(ids[0])
if packet == nil {
t.Fatal("Failed to retrieve packet from store")
}
pp, ok := packet.(*packets.PublishPacket)
if !ok {
t.Fatalf("Message in store not of the expected type (%T)", packet)
}
if pp.TopicName != topic || string(pp.Payload) != payload {
t.Fatalf("Stored message Packet contents not as expected (%v, %v)", pp.TopicName, pp.Payload)
}
time.Sleep(time.Second) // Wait a second to ensure we are past SetConnectRetryInterval
if connectToken.Error() != nil {
t.Fatalf("Connect returned error (should be retrying) (%v)", connectToken.Error())
}

// disconnecting closes the store (both in disconnect and in Connect which runs as a goRoutine).
// As such we duplicate the store
memStore2 := NewMemoryStore()
memStore2.Open()
memStore2.Put(ids[0], packet)

// disconnect and then reconnect with correct server
p.Disconnect(250)

pops = NewClientOptions().AddBroker(FVTTCP).SetClientID("crp-pub").SetCleanSession(false).
SetStore(memStore2).SetConnectRetry(true).SetConnectRetryInterval(time.Second / 2)
p = NewClient(pops).(*client)
if token := p.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on valid Publish.Connect(): %v", token.Error())
}

if connectToken.Wait() && connectToken.Error() == nil {
t.Fatalf("Expected connection error - got nil")
}
wait(choke)

p.Disconnect(250)
s.Disconnect(250)
memStore.Close()
}
26 changes: 25 additions & 1 deletion messageids.go
Expand Up @@ -114,4 +114,28 @@ func (d *DummyToken) Error() error {
return nil
}

func (d *DummyToken) setError(e error) {}
func (p *DummyToken) setError(e error) {}

// PlaceHolderToken does nothing and was implemented to allow a messageid to be reserved
// it differs from DummyToken in that calling flowComplete does not generate an error (it
// is expected that flowComplete will be called when the token is overwritten with a real token)
type PlaceHolderToken struct {
id uint16
}

func (p *PlaceHolderToken) Wait() bool {
return true
}

func (p *PlaceHolderToken) WaitTimeout(t time.Duration) bool {
return true
}

func (p *PlaceHolderToken) flowComplete() {
}

func (p *PlaceHolderToken) Error() error {
return nil
}

func (p *PlaceHolderToken) setError(e error) {}
21 changes: 21 additions & 0 deletions options.go
Expand Up @@ -68,6 +68,8 @@ type ClientOptions struct {
ConnectTimeout time.Duration
MaxReconnectInterval time.Duration
AutoReconnect bool
ConnectRetryInterval time.Duration
ConnectRetry bool
Store Store
DefaultPublishHandler MessageHandler
OnConnect OnConnectHandler
Expand Down Expand Up @@ -107,6 +109,8 @@ func NewClientOptions() *ClientOptions {
ConnectTimeout: 30 * time.Second,
MaxReconnectInterval: 10 * time.Minute,
AutoReconnect: true,
ConnectRetryInterval: 30 * time.Second,
ConnectRetry: false,
Store: nil,
OnConnect: nil,
OnConnectionLost: DefaultConnectionLostHandler,
Expand Down Expand Up @@ -326,6 +330,23 @@ func (o *ClientOptions) SetAutoReconnect(a bool) *ClientOptions {
return o
}

// SetConnectRetryInterval sets the time that will be waited between connection attempts
// when initially connecting if ConnectRetry is TRUE
func (o *ClientOptions) SetConnectRetryInterval(t time.Duration) *ClientOptions {
o.ConnectRetryInterval = t
return o
}

// SetConnectRetry sets whether the connect function will automatically retry the connection
// in the event of a failure (when true the token returned by the Connect function will
// not complete until the connection is up or it is cancelled)
// If ConnectRetry is true then subscriptions should be requested in OnConnect handler
// Setting this to TRUE permits mesages to be published before the connection is established
func (o *ClientOptions) SetConnectRetry(a bool) *ClientOptions {
o.ConnectRetry = a
return o
}

// SetMessageChannelDepth DEPRECATED The value set here no longer has any effect, this function
// remains so the API is not altered.
func (o *ClientOptions) SetMessageChannelDepth(s uint) *ClientOptions {
Expand Down
12 changes: 12 additions & 0 deletions options_reader.go
Expand Up @@ -133,6 +133,18 @@ func (r *ClientOptionsReader) AutoReconnect() bool {
return s
}

//ConnectRetryInterval returns the delay between retries on the initial connection (if ConnectRetry true)
func (r *ClientOptionsReader) ConnectRetryInterval() time.Duration {
s := r.options.ConnectRetryInterval
return s
}

//ConnectRetry returns whether the initial connection request will be retried until connection established
func (r *ClientOptionsReader) ConnectRetry() bool {
s := r.options.ConnectRetry
return s
}

func (r *ClientOptionsReader) WriteTimeout() time.Duration {
s := r.options.WriteTimeout
return s
Expand Down

0 comments on commit c49ea09

Please sign in to comment.