Skip to content

Commit

Permalink
Merge pull request #471 from ChIoT-Tech/master
Browse files Browse the repository at this point in the history
Fix typos and lint issues (including reformatting).
  • Loading branch information
MattBrittan committed Dec 21, 2020
2 parents 5695029 + 58086e1 commit fbe4d12
Show file tree
Hide file tree
Showing 35 changed files with 191 additions and 192 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -147,7 +147,7 @@ seeing may be due to:
* Bugs in the broker.
* Issues with whatever you are communicating with.

When submitting an issue please ensure that you provide sufficient details to enable us to eliminate causes outside of
When submitting an issue, please ensure that you provide sufficient details to enable us to eliminate causes outside of
this library.

Contributing
Expand Down
26 changes: 13 additions & 13 deletions client.go
Expand Up @@ -100,13 +100,13 @@ type client struct {
lastReceived atomic.Value // time.Time - the last time a packet was successfully received from network
pingOutstanding int32 // set to 1 if a ping has been sent but response not ret received

status uint32 // see consts at top of file for possible values
status uint32 // see const definitions at top of file for possible values
sync.RWMutex // Protects the above two variables (note: atomic writes are also used somewhat inconsistently)

messageIds // effectively a map from message id to token completor

obound chan *PacketAndToken // outgoing publish packet
oboundP chan *PacketAndToken // outgoing 'priotity' packet (anything other than publish)
oboundP chan *PacketAndToken // outgoing 'priority' packet (anything other than publish)
msgRouter *router // routes topics to handlers
persist Store
options ClientOptions
Expand Down Expand Up @@ -215,7 +215,7 @@ var ErrNotConnected = errors.New("not Connected")
// fails
// Note: If using QOS1+ and CleanSession=false it is advisable to add
// routes (or a DefaultPublishHandler) prior to calling Connect()
// because queued messages may be delivered immediatly post connection
// because queued messages may be delivered immediately post connection
func (c *client) Connect() Token {
t := newToken(packets.Connect).(*ConnectToken)
DEBUG.Println(CLI, "Connect()")
Expand Down Expand Up @@ -263,7 +263,7 @@ func (c *client) Connect() Token {
t.setError(err)
return
}
inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaitring processing
inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing
if c.startCommsWorkers(conn, inboundFromStore) {
// Take care of any messages in the store
if !c.options.CleanSession {
Expand All @@ -286,7 +286,7 @@ func (c *client) Connect() Token {
func (c *client) reconnect() {
DEBUG.Println(CLI, "enter reconnect")
var (
sleep = time.Duration(1 * time.Second)
sleep = 1 * time.Second
conn net.Conn
)

Expand Down Expand Up @@ -323,7 +323,7 @@ func (c *client) reconnect() {
return
}

inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaitring processing
inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing
if c.startCommsWorkers(conn, inboundFromStore) {
c.resume(c.options.ResumeSubs, inboundFromStore)
}
Expand Down Expand Up @@ -391,7 +391,7 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
// Maintain same error format as used previously
if rc != packets.ErrNetworkError { // mqtt error
err = packets.ConnErrors[rc]
} else { // network error (if this occured in ConnectMQTT then err will be nil)
} else { // network error (if this occurred in ConnectMQTT then err will be nil)
err = fmt.Errorf("%s : %s", packets.ConnErrors[rc], err)
}
}
Expand Down Expand Up @@ -476,7 +476,7 @@ func (c *client) internalConnLost(err error) {
}
}

// startCommsWorkers is called when the connection is up. It starts off all of the routines needed to process incomming and
// startCommsWorkers is called when the connection is up. It starts off all of the routines needed to process incoming and
// outgoing messages.
// Returns true if the comms workers were started (i.e. they were not already running)
func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packets.ControlPacket) bool {
Expand Down Expand Up @@ -514,9 +514,9 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
// c.oboundP and c.obound need to stay active for the life of the client because, depending upon the options,
// messages may be published while the client is disconnected (they will block unless in a goroutine). However
// to keep the comms routines clean we want to shutdown the input messages it uses so create out own channels
// and copy data accross.
// and copy data across.
commsobound := make(chan *PacketAndToken) // outgoing publish packets
commsoboundP := make(chan *PacketAndToken) // outgoing 'priotity' packet
commsoboundP := make(chan *PacketAndToken) // outgoing 'priority' packet
c.workers.Add(1)
go func() {
defer c.workers.Done()
Expand All @@ -542,7 +542,7 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
}
close(commsoboundP) // Nothing sending to these channels anymore so close them and allow comms routines to exit
close(commsobound)
DEBUG.Println(CLI, "startCommsWorkers output redirector finnished")
DEBUG.Println(CLI, "startCommsWorkers output redirector finished")
return
}
}
Expand Down Expand Up @@ -595,7 +595,7 @@ func (c *client) stopCommsWorkers() chan struct{} {
}

// It is important that everything is stopped in the correct order to avoid deadlocks. The main issue here is
// the router because it both receives incomming publish messages and also sends outgoing acknowledgements. To
// the router because it both receives incoming publish messages and also sends outgoing acknowledgements. To
// avoid issues we signal the workers to stop and close the connection (it is probably already closed but
// there is no harm in being sure). We can then wait for the workers to finnish before closing outbound comms
// channels which will allow the comms routines to exit.
Expand All @@ -604,7 +604,7 @@ func (c *client) stopCommsWorkers() chan struct{} {
close(c.stop) // Signal for workers to stop
c.conn.Close() // Possible that this is already closed but no harm in closing again
c.conn = nil // Important that this is the only place that this is set to nil
c.connMu.Unlock() // As the conection is now nil we can unlock the mu (allowing subsequent calls to exit immediately)
c.connMu.Unlock() // As the connection is now nil we can unlock the mu (allowing subsequent calls to exit immediately)

doneChan := make(chan struct{})

Expand Down
2 changes: 1 addition & 1 deletion cmd/custom_store/main.go
Expand Up @@ -28,7 +28,7 @@ import (
// This NoOpStore type implements the go-mqtt/Store interface, which
// allows it to be used by the go-mqtt client library. However, it is
// highly recommended that you do not use this NoOpStore in production,
// because it will NOT provide any sort of guaruntee of message delivery.
// because it will NOT provide any sort of guarantee of message delivery.
type NoOpStore struct {
// Contain nothing
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/docker/binds/readme.md
@@ -1,7 +1,7 @@
Binds
=====

Folders within this folder will be bound to folders within the containers; this includes the mosquitto
configuration and persistence files.
Folders within this folder will be bound to folders within the containers; this includes the mosquitto configuration and
persistence files.

Note: While testing some files within these folders may become large and should be deleted when appropriate.
10 changes: 5 additions & 5 deletions cmd/docker/readme.md
Expand Up @@ -6,12 +6,12 @@ using docker (ideally `docker-compose`). While it provides an end-to-end example
starting point for producing reproducible examples (when logging an issue with the library).

Because the publisher (`pub`), broker (`mosquitto`) and subscriber (`sub`) run in separate containers this setup closely
simulates a real deployment. One thing to bear in mind is that the network between the containers is very fast and
simulates a real deployment. One thing to bear in mind is that the network between the containers is very fast and
reliable (but there are some techniques that can be used to simulate failures etc).

# Usage

Ensure that you have [docker](https://docs.docker.com/get-docker/) and
Ensure that you have [docker](https://docs.docker.com/get-docker/) and
[docker-compose](https://docs.docker.com/compose/install/) installed.

To start everything up change into the `cmd/docker` folder and run:
Expand Down Expand Up @@ -43,9 +43,9 @@ docker-compose down

Feel free to copy the folder and modify the publisher/subscriber to work as you want them to!

Note: The `pub` and `sub` containers connect to mosquitto via the internal network (`test-net`) but mosquitto
should also be available on the host port `8883` if you wish to connect to it. This will not work if you have
mosquitto installed locally (edit the `docker-compose.yml` and change the `published` port).
Note: The `pub` and `sub` containers connect to mosquitto via the internal network (`test-net`) but mosquitto should
also be available on the host port `8883` if you wish to connect to it. This will not work if you have mosquitto
installed locally (edit the `docker-compose.yml` and change the `published` port).

# Simulating Network Connection Loss

Expand Down
6 changes: 3 additions & 3 deletions cmd/stdinpub/main.go
Expand Up @@ -20,7 +20,7 @@ import (
"flag"
"fmt"
"io"
//"log"
// "log"
"os"
"strconv"
"time"
Expand All @@ -29,8 +29,8 @@ import (
)

func main() {
//MQTT.DEBUG = log.New(os.Stdout, "", 0)
//MQTT.ERROR = log.New(os.Stdout, "", 0)
// MQTT.DEBUG = log.New(os.Stdout, "", 0)
// MQTT.ERROR = log.New(os.Stdout, "", 0)
stdin := bufio.NewReader(os.Stdin)
hostname, _ := os.Hostname()

Expand Down
6 changes: 3 additions & 3 deletions cmd/stdoutsub/main.go
Expand Up @@ -18,7 +18,7 @@ import (
"crypto/tls"
"flag"
"fmt"
//"log"
// "log"
"os"
"os/signal"
"strconv"
Expand All @@ -33,8 +33,8 @@ func onMessageReceived(client MQTT.Client, message MQTT.Message) {
}

func main() {
//MQTT.DEBUG = log.New(os.Stdout, "", 0)
//MQTT.ERROR = log.New(os.Stdout, "", 0)
// MQTT.DEBUG = log.New(os.Stdout, "", 0)
// MQTT.ERROR = log.New(os.Stdout, "", 0)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

Expand Down
8 changes: 4 additions & 4 deletions fvt_client_test.go
Expand Up @@ -1061,9 +1061,9 @@ func Test_cleanUpMids(t *testing.T) {
// - If the transmit succeeds (regardless of whether the handshake completes then no error is generated)
// If the intention is that an error should always be returned if the publish is incomplete upon disconnedt then
// internalConnLost needs to be altered (if c.options.CleanSession && !c.options.AutoReconnect)
//if token.Error() == nil {
//t.Fatal("token should have received an error on connection loss")
//}
// if token.Error() == nil {
// t.Fatal("token should have received an error on connection loss")
// }
fmt.Println(token.Error())

c.Disconnect(250)
Expand Down Expand Up @@ -1319,7 +1319,7 @@ func Test_ResumeSubsWithReconnect(t *testing.T) {
DEBUG.Println(CLI, sub.String())

persistOutbound(c.(*client).persist, sub)
//subToken := c.Subscribe(topic, qos, nil)
// subToken := c.Subscribe(topic, qos, nil)
c.(*client).internalConnLost(fmt.Errorf("reconnection subscription test"))

// As reconnect is enabled the client should automatically reconnect
Expand Down
4 changes: 2 additions & 2 deletions message.go
Expand Up @@ -16,9 +16,9 @@ package mqtt

import (
"net/url"
"sync"

"github.com/eclipse/paho.mqtt.golang/packets"
"sync"
)

// Message defines the externals that a message implementation must support
Expand Down Expand Up @@ -114,7 +114,7 @@ func newConnectMsgFromOptions(options *ClientOptions, broker *url.URL) *packets.
if username != "" {
m.UsernameFlag = true
m.Username = username
//mustn't have password without user as well
// mustn't have password without user as well
if password != "" {
m.PasswordFlag = true
m.Password = []byte(password)
Expand Down
2 changes: 1 addition & 1 deletion messageids.go
Expand Up @@ -29,7 +29,7 @@ type messageIds struct {
sync.RWMutex
index map[uint16]tokenCompletor

lastIssuedID uint16 // The most recently issued ID. Used so we cycle through ids rather than immediatly reusing them (can make debugging easier)
lastIssuedID uint16 // The most recently issued ID. Used so we cycle through ids rather than immediately reusing them (can make debugging easier)
}

const (
Expand Down
12 changes: 6 additions & 6 deletions net.go
Expand Up @@ -96,7 +96,7 @@ func verifyCONNACK(conn io.Reader) (byte, bool, error) {
return msg.ReturnCode, msg.SessionPresent, nil
}

// inbound encapuslates the output from startIncoming.
// inbound encapsulates the output from startIncoming.
// err - If != nil then an error has occurred
// cp - A control packet received over the network link
type inbound struct {
Expand All @@ -105,7 +105,7 @@ type inbound struct {
}

// startIncoming initiates a goroutine that reads incoming messages off the wire and sends them to the channel (returned).
// If there are any issues with the network connection then the returned cahnnel will be closed and the goroutine will exit
// If there are any issues with the network connection then the returned channel will be closed and the goroutine will exit
// (so closing the connection will terminate the goroutine)
func startIncoming(conn io.Reader) <-chan inbound {
var err error
Expand Down Expand Up @@ -135,7 +135,7 @@ func startIncoming(conn io.Reader) <-chan inbound {
return ibound
}

// incomingComms encapuslates the possible output of the incomingComms routine. If err != nil then an error has occurred and
// incomingComms encapsulates the possible output of the incomingComms routine. If err != nil then an error has occurred and
// the routine will have terminated; otherwise one of the other members should be non-nil
type incomingComms struct {
err error // If non-nil then there has been an error (ignore everything else)
Expand All @@ -145,7 +145,7 @@ type incomingComms struct {

// startIncomingComms initiates incoming communications; this includes starting a goroutine to process incoming
// messages.
// Accepts a channel of inbound messages from the store (persistanced messages); note this must be closed as soon as the
// Accepts a channel of inbound messages from the store (persisted messages); note this must be closed as soon as the
// everything in the store has been sent.
// Returns a channel that will be passed any received packets; this will be closed on a network error (and inboundFromStore closed)
func startIncomingComms(conn io.Reader,
Expand Down Expand Up @@ -242,7 +242,7 @@ func startIncomingComms(conn io.Reader,
return output
}

// startOutgoingComms initiates a go routint to transmit outgoing packets.
// startOutgoingComms initiates a go routine to transmit outgoing packets.
// Pass in an open network connection and channels for outbound messages (including those triggered
// directly from incoming comms).
// Returns a channel that will receive details of any errors (closed when the goroutine exits)
Expand Down Expand Up @@ -388,7 +388,7 @@ func startComms(conn net.Conn, // Network connection (must be active)
oboundErr := startOutgoingComms(conn, c, oboundp, obound, outboundFromIncoming)
DEBUG.Println(NET, "startComms started")

// Run up go routines to handle the output from the above comms functions - these are handled in seperate
// Run up go routines to handle the output from the above comms functions - these are handled in separate
// go routines because they can interact (e.g. ibound triggers an ACK to obound which triggers an error)
var wg sync.WaitGroup
wg.Add(2)
Expand Down
2 changes: 1 addition & 1 deletion netconn.go
Expand Up @@ -87,5 +87,5 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade

return tlsConn, nil
}
return nil, errors.New("Unknown protocol")
return nil, errors.New("unknown protocol")
}
6 changes: 3 additions & 3 deletions options.go
Expand Up @@ -198,7 +198,7 @@ func (o *ClientOptions) SetCredentialsProvider(p CredentialsProvider) *ClientOpt
// when this client connects to an MQTT broker. By setting this flag, you are
// indicating that no messages saved by the broker for this client should be
// delivered. Any messages that were going to be sent by this client before
// diconnecting previously but didn't will not be sent upon connecting to the
// disconnecting previously but didn't will not be sent upon connecting to the
// broker.
func (o *ClientOptions) SetCleanSession(clean bool) *ClientOptions {
o.CleanSession = clean
Expand Down Expand Up @@ -323,7 +323,7 @@ func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions {
}

// SetConnectTimeout limits how long the client will wait when trying to open a connection
// to an MQTT server before timing out and erroring the attempt. A duration of 0 never times out.
// to an MQTT server before timing out. A duration of 0 never times out.
// Default 30 seconds. Currently only operational on TCP/TLS connections.
func (o *ClientOptions) SetConnectTimeout(t time.Duration) *ClientOptions {
o.ConnectTimeout = t
Expand Down Expand Up @@ -356,7 +356,7 @@ func (o *ClientOptions) SetConnectRetryInterval(t time.Duration) *ClientOptions
// in the event of a failure (when true the token returned by the Connect function will
// not complete until the connection is up or it is cancelled)
// If ConnectRetry is true then subscriptions should be requested in OnConnect handler
// Setting this to TRUE permits mesages to be published before the connection is established
// Setting this to TRUE permits messages to be published before the connection is established
func (o *ClientOptions) SetConnectRetry(a bool) *ClientOptions {
o.ConnectRetry = a
return o
Expand Down

0 comments on commit fbe4d12

Please sign in to comment.