Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve potential race in resume/reconnect #365

Merged
merged 2 commits into from Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 18 additions & 3 deletions client.go
Expand Up @@ -113,6 +113,7 @@ type client struct {
stop chan struct{}
persist Store
options ClientOptions
optionsMu sync.Mutex // Protects the options in a few limited cases where needed for testing
workers sync.WaitGroup
}

Expand Down Expand Up @@ -141,7 +142,6 @@ func NewClient(o *ClientOptions) Client {
c.messageIds = messageIds{index: make(map[uint16]tokenCompletor)}
c.msgRouter, c.stopRouter = newRouter()
c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)

return c
}

Expand Down Expand Up @@ -246,7 +246,11 @@ func (c *client) Connect() Token {
}

RETRYCONN:
for _, broker := range c.options.Servers {
c.optionsMu.Lock() // Protect c.options.Servers so that servers can be added in test cases
brokers := c.options.Servers
c.optionsMu.Unlock()

for _, broker := range brokers {
cm := newConnectMsgFromOptions(&c.options, broker)
c.options.ProtocolVersion = protocolVersion
CONN:
Expand Down Expand Up @@ -353,6 +357,7 @@ func (c *client) Connect() Token {

// Take care of any messages in the store
if !c.options.CleanSession {
c.workers.Add(1) // disconnect during resume can lead to reconnect being called before resume completes
c.resume(c.options.ResumeSubs)
} else {
c.persist.Reset()
Expand All @@ -378,7 +383,10 @@ func (c *client) reconnect() {
if nil != c.options.OnReconnecting {
c.options.OnReconnecting(c, &c.options)
}
for _, broker := range c.options.Servers {
c.optionsMu.Lock() // Protect c.options.Servers so that servers can be added in test cases
brokers := c.options.Servers
c.optionsMu.Unlock()
for _, broker := range brokers {
cm := newConnectMsgFromOptions(&c.options, broker)
DEBUG.Println(CLI, "about to write new connect msg")
c.Lock()
Expand Down Expand Up @@ -465,6 +473,7 @@ func (c *client) reconnect() {
go outgoing(c)
go incoming(c)

c.workers.Add(1) // disconnect during resume can lead to reconnect being called before resume completes
c.resume(false)
}

Expand Down Expand Up @@ -741,6 +750,7 @@ func (c *client) reserveStoredPublishIDs() {
// Load all stored messages and resend them
// Call this to ensure QOS > 1,2 even after an application crash
func (c *client) resume(subscription bool) {
defer c.workers.Done() // resume must complete before any attempt to reconnect is made

storedKeys := c.persist.All()
for _, key := range storedKeys {
Expand All @@ -758,6 +768,7 @@ func (c *client) resume(subscription bool) {
select {
case c.oboundP <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
return
}
}
case *packets.UnsubscribePacket:
Expand All @@ -767,13 +778,15 @@ func (c *client) resume(subscription bool) {
select {
case c.oboundP <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
return
}
}
case *packets.PubrelPacket:
DEBUG.Println(STR, fmt.Sprintf("loaded pending pubrel (%d)", details.MessageID))
select {
case c.oboundP <- &PacketAndToken{p: packet, t: nil}:
case <-c.stop:
return
}
case *packets.PublishPacket:
token := newToken(packets.Publish).(*PublishToken)
Expand All @@ -784,6 +797,7 @@ func (c *client) resume(subscription bool) {
select {
case c.obound <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
return
}
default:
ERROR.Println(STR, "invalid message type in store (discarded)")
Expand All @@ -796,6 +810,7 @@ func (c *client) resume(subscription bool) {
select {
case c.ibound <- packet:
case <-c.stop:
return
}
default:
ERROR.Println(STR, "invalid message type in store (discarded)")
Expand Down
62 changes: 32 additions & 30 deletions fvt_client_test.go
Expand Up @@ -38,18 +38,18 @@ func Test_Start(t *testing.T) {
}

/* uncomment this if you have connection policy disallowing FailClientID
func Test_InvalidConnRc(t *testing.T) {
ops := NewClientOptions().SetClientID("FailClientID").
AddBroker("tcp://" + FVT_IP + ":17003").
SetStore(NewFileStore("/tmp/fvt/InvalidConnRc"))

c := NewClient(ops)
_, err := c.Connect()
if err != ErrNotAuthorized {
t.Fatalf("Did not receive error as expected, got %v", err)
}
c.Disconnect(250)
}
func Test_InvalidConnRc(t *testing.T) {
ops := NewClientOptions().SetClientID("FailClientID").
AddBroker("tcp://" + FVT_IP + ":17003").
SetStore(NewFileStore("/tmp/fvt/InvalidConnRc"))

c := NewClient(ops)
_, err := c.Connect()
if err != ErrNotAuthorized {
t.Fatalf("Did not receive error as expected, got %v", err)
}
c.Disconnect(250)
}
*/

// Helper function for Test_Start_Ssl
Expand All @@ -75,22 +75,22 @@ func NewTLSConfig() *tls.Config {
}

/* uncomment this if you have ssl setup
func Test_Start_Ssl(t *testing.T) {
tlsconfig := NewTlsConfig()
ops := NewClientOptions().SetClientID("StartSsl").
AddBroker(FVT_SSL).
SetStore(NewFileStore("/tmp/fvt/Start_Ssl")).
SetTlsConfig(tlsconfig)

c := NewClient(ops)

_, err := c.Connect()
if err != nil {
t.Fatalf("Error on Client.Connect(): %v", err)
}

c.Disconnect(250)
}
func Test_Start_Ssl(t *testing.T) {
tlsconfig := NewTlsConfig()
ops := NewClientOptions().SetClientID("StartSsl").
AddBroker(FVT_SSL).
SetStore(NewFileStore("/tmp/fvt/Start_Ssl")).
SetTlsConfig(tlsconfig)

c := NewClient(ops)

_, err := c.Connect()
if err != nil {
t.Fatalf("Error on Client.Connect(): %v", err)
}

c.Disconnect(250)
}
*/

func Test_Publish_1(t *testing.T) {
Expand Down Expand Up @@ -1111,7 +1111,9 @@ func Test_ConnectRetry(t *testing.T) {
if connectToken.Error() != nil {
t.Fatalf("Connect returned error (should be retrying) (%v)", connectToken.Error())
}
c.options.AddBroker(FVTTCP) // note this is not threadsafe but should be OK for test
c.optionsMu.Lock() // Protect c.options.Servers so that servers can be added in test cases
c.options.AddBroker(FVTTCP)
c.optionsMu.Unlock()
if connectToken.Wait() && connectToken.Error() != nil {
t.Fatalf("Error connecting after valid broker added: %v", connectToken.Error())
}
Expand Down Expand Up @@ -1181,7 +1183,7 @@ func Test_ConnectRetryPublish(t *testing.T) {

// disconnect and then reconnect with correct server
p.Disconnect(250)

pops = NewClientOptions().AddBroker(FVTTCP).SetClientID("crp-pub").SetCleanSession(false).
SetStore(memStore2).SetConnectRetry(true).SetConnectRetryInterval(time.Second / 2)
p = NewClient(pops).(*client)
Expand Down