Skip to content

Commit

Permalink
Merge pull request #365 from ChIoT-Tech/Issue362-Race
Browse files (browse the repository at this point in the history)
Resolve potential race in resume/reconnect
  • Loading branch information
Al S-M committed Oct 24, 2019
2 parents 79454df + ed4e938 commit 962bb61
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 4 deletions.
21 changes: 18 additions & 3 deletions client.go
Expand Up @@ -113,6 +113,7 @@ type client struct {
stop chan struct{}
persist Store
options ClientOptions
optionsMu sync.Mutex // Protects the options in a few limited cases where needed for testing
workers sync.WaitGroup
}

Expand Down Expand Up @@ -141,7 +142,6 @@ func NewClient(o *ClientOptions) Client {
c.messageIds = messageIds{index: make(map[uint16]tokenCompletor)}
c.msgRouter, c.stopRouter = newRouter()
c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)

return c
}

Expand Down Expand Up @@ -246,7 +246,11 @@ func (c *client) Connect() Token {
}

RETRYCONN:
for _, broker := range c.options.Servers {
c.optionsMu.Lock() // Protect c.options.Servers so that servers can be added in test cases
brokers := c.options.Servers
c.optionsMu.Unlock()

for _, broker := range brokers {
cm := newConnectMsgFromOptions(&c.options, broker)
c.options.ProtocolVersion = protocolVersion
CONN:
Expand Down Expand Up @@ -353,6 +357,7 @@ func (c *client) Connect() Token {

// Take care of any messages in the store
if !c.options.CleanSession {
c.workers.Add(1) // disconnect during resume can lead to reconnect being called before resume completes
c.resume(c.options.ResumeSubs)
} else {
c.persist.Reset()
Expand All @@ -378,7 +383,10 @@ func (c *client) reconnect() {
if nil != c.options.OnReconnecting {
c.options.OnReconnecting(c, &c.options)
}
for _, broker := range c.options.Servers {
c.optionsMu.Lock() // Protect c.options.Servers so that servers can be added in test cases
brokers := c.options.Servers
c.optionsMu.Unlock()
for _, broker := range brokers {
cm := newConnectMsgFromOptions(&c.options, broker)
DEBUG.Println(CLI, "about to write new connect msg")
c.Lock()
Expand Down Expand Up @@ -465,6 +473,7 @@ func (c *client) reconnect() {
go outgoing(c)
go incoming(c)

c.workers.Add(1) // disconnect during resume can lead to reconnect being called before resume completes
c.resume(false)
}

Expand Down Expand Up @@ -786,6 +795,7 @@ func (c *client) reserveStoredPublishIDs() {
// Load all stored messages and resend them.
// Call this to ensure QoS 1 and QoS 2 delivery is honoured even after an application crash.
func (c *client) resume(subscription bool) {
defer c.workers.Done() // resume must complete before any attempt to reconnect is made

storedKeys := c.persist.All()
for _, key := range storedKeys {
Expand All @@ -807,6 +817,7 @@ func (c *client) resume(subscription bool) {
select {
case c.oboundP <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
return
}
}
case *packets.UnsubscribePacket:
Expand All @@ -816,13 +827,15 @@ func (c *client) resume(subscription bool) {
select {
case c.oboundP <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
return
}
}
case *packets.PubrelPacket:
DEBUG.Println(STR, fmt.Sprintf("loaded pending pubrel (%d)", details.MessageID))
select {
case c.oboundP <- &PacketAndToken{p: packet, t: nil}:
case <-c.stop:
return
}
case *packets.PublishPacket:
token := newToken(packets.Publish).(*PublishToken)
Expand All @@ -833,6 +846,7 @@ func (c *client) resume(subscription bool) {
select {
case c.obound <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
return
}
default:
ERROR.Println(STR, "invalid message type in store (discarded)")
Expand All @@ -845,6 +859,7 @@ func (c *client) resume(subscription bool) {
select {
case c.ibound <- packet:
case <-c.stop:
return
}
default:
ERROR.Println(STR, "invalid message type in store (discarded)")
Expand Down
4 changes: 3 additions & 1 deletion fvt_client_test.go
Expand Up @@ -1118,7 +1118,9 @@ func Test_ConnectRetry(t *testing.T) {
if connectToken.Error() != nil {
t.Fatalf("Connect returned error (should be retrying) (%v)", connectToken.Error())
}
c.options.AddBroker(FVTTCP) // note this is not threadsafe but should be OK for test
c.optionsMu.Lock() // Protect c.options.Servers so that servers can be added in test cases
c.options.AddBroker(FVTTCP)
c.optionsMu.Unlock()
if connectToken.Wait() && connectToken.Error() != nil {
t.Fatalf("Error connecting after valid broker added: %v", connectToken.Error())
}
Expand Down

0 comments on commit 962bb61

Please sign in to comment.