Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code consitency fixes #407

Merged
merged 8 commits into from Apr 30, 2020
10 changes: 5 additions & 5 deletions client.go
Expand Up @@ -198,12 +198,12 @@ func (c *client) connectionStatus() uint32 {
func (c *client) setConnected(status uint32) {
c.Lock()
defer c.Unlock()
atomic.StoreUint32(&c.status, uint32(status))
atomic.StoreUint32(&c.status, status)
}

//ErrNotConnected is the error returned from function calls that are
//made when the client is not connected to a broker
var ErrNotConnected = errors.New("Not Connected")
var ErrNotConnected = errors.New("not Connected")

// Connect will create a connection to the message broker, by default
// it will attempt to connect at v3.1.1 and auto retry at v3.1 if that
Expand Down Expand Up @@ -241,7 +241,7 @@ func (c *client) Connect() Token {
protocolVersion := c.options.ProtocolVersion

if len(c.options.Servers) == 0 {
t.setError(fmt.Errorf("No servers defined to connect to"))
t.setError(fmt.Errorf("no servers defined to connect to"))
return
}

Expand Down Expand Up @@ -376,7 +376,7 @@ func (c *client) reconnect() {
err error

rc = byte(1)
sleep = time.Duration(1 * time.Second)
sleep = 1 * time.Second
)

for rc != 0 && atomic.LoadUint32(&c.status) != disconnected {
Expand Down Expand Up @@ -632,7 +632,7 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
case bytes.Buffer:
pub.Payload = p.Bytes()
default:
token.setError(fmt.Errorf("Unknown payload type"))
token.setError(fmt.Errorf("unknown payload type"))
return token
}

Expand Down
4 changes: 2 additions & 2 deletions fvt_client_test.go
Expand Up @@ -1013,7 +1013,7 @@ func Test_autoreconnect(t *testing.T) {
time.Sleep(5 * time.Second)

fmt.Println("Breaking connection")
c.(*client).internalConnLost(fmt.Errorf("Autoreconnect test"))
c.(*client).internalConnLost(fmt.Errorf("autoreconnect test"))

time.Sleep(5 * time.Second)
if !c.IsConnected() {
Expand Down Expand Up @@ -1323,7 +1323,7 @@ func Test_ResumeSubsWithReconnect(t *testing.T) {

persistOutbound(c.(*client).persist, sub)
//subToken := c.Subscribe(topic, qos, nil)
c.(*client).internalConnLost(fmt.Errorf("Reconnection subscription test"))
c.(*client).internalConnLost(fmt.Errorf("reconnection subscription test"))

// As reconnect is enabled the client should automatically reconnect
subDone := make(chan bool)
Expand Down
2 changes: 1 addition & 1 deletion memstore.go
Expand Up @@ -88,7 +88,7 @@ func (store *MemoryStore) All() []string {
ERROR.Println(STR, "Trying to use memory store, but not open")
return nil
}
keys := []string{}
var keys []string
for k := range store.messages {
keys = append(keys, k)
}
Expand Down
8 changes: 4 additions & 4 deletions messageids.go
Expand Up @@ -40,11 +40,11 @@ func (mids *messageIds) cleanUp() {
for _, token := range mids.index {
switch token.(type) {
case *PublishToken:
token.setError(fmt.Errorf("Connection lost before Publish completed"))
token.setError(fmt.Errorf("connection lost before Publish completed"))
case *SubscribeToken:
token.setError(fmt.Errorf("Connection lost before Subscribe completed"))
token.setError(fmt.Errorf("connection lost before Subscribe completed"))
case *UnsubscribeToken:
token.setError(fmt.Errorf("Connection lost before Unsubscribe completed"))
token.setError(fmt.Errorf("connection lost before Unsubscribe completed"))
case nil:
continue
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func (d *DummyToken) Error() error {
return nil
}

func (p *DummyToken) setError(e error) {}
func (d *DummyToken) setError(e error) {}

// PlaceHolderToken does nothing and was implemented to allow a messageid to be reserved
// it differs from DummyToken in that calling flowComplete does not generate an error (it
Expand Down
2 changes: 1 addition & 1 deletion net.go
Expand Up @@ -96,7 +96,7 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade

return tlsConn, nil
}
return nil, errors.New("Unknown protocol")
return nil, errors.New("unknown protocol")
}

// actually read incoming messages off the wire
Expand Down
22 changes: 11 additions & 11 deletions packets/packets.go
Expand Up @@ -85,13 +85,13 @@ var ConnackReturnCodes = map[uint8]string{
//to a Go error
var ConnErrors = map[byte]error{
Accepted: nil,
ErrRefusedBadProtocolVersion: errors.New("Unnacceptable protocol version"),
ErrRefusedIDRejected: errors.New("Identifier rejected"),
ErrRefusedServerUnavailable: errors.New("Server Unavailable"),
ErrRefusedBadUsernameOrPassword: errors.New("Bad user name or password"),
ErrRefusedNotAuthorised: errors.New("Not Authorized"),
ErrNetworkError: errors.New("Network Error"),
ErrProtocolViolation: errors.New("Protocol Violation"),
ErrRefusedBadProtocolVersion: errors.New("unnacceptable protocol version"),
ErrRefusedIDRejected: errors.New("identifier rejected"),
ErrRefusedServerUnavailable: errors.New("server Unavailable"),
ErrRefusedBadUsernameOrPassword: errors.New("bad user name or password"),
ErrRefusedNotAuthorised: errors.New("not Authorized"),
ErrNetworkError: errors.New("network Error"),
ErrProtocolViolation: errors.New("protocol Violation"),
}

//ReadPacket takes an instance of an io.Reader (such as net.Conn) and attempts
Expand Down Expand Up @@ -123,7 +123,7 @@ func ReadPacket(r io.Reader) (ControlPacket, error) {
return nil, err
}
if n != fh.RemainingLength {
return nil, errors.New("Failed to read expected data")
return nil, errors.New("failed to read expected data")
}

err = cp.Unpack(bytes.NewBuffer(packetBytes))
Expand Down Expand Up @@ -274,9 +274,9 @@ func decodeUint16(b io.Reader) (uint16, error) {
}

func encodeUint16(num uint16) []byte {
bytes := make([]byte, 2)
binary.BigEndian.PutUint16(bytes, num)
return bytes
bytesResult := make([]byte, 2)
binary.BigEndian.PutUint16(bytesResult, num)
return bytesResult
}

func encodeString(field string) []byte {
Expand Down
22 changes: 11 additions & 11 deletions packets/packets_test.go
Expand Up @@ -203,9 +203,9 @@ func TestEncoding(t *testing.T) {
}

strings := map[string][]byte{
"foo": []byte{0x00, 0x03, 'f', 'o', 'o'},
"\U0000FEFF": []byte{0x00, 0x03, 0xEF, 0xBB, 0xBF},
"A\U0002A6D4": []byte{0x00, 0x05, 'A', 0xF0, 0xAA, 0x9B, 0x94},
"foo": {0x00, 0x03, 'f', 'o', 'o'},
"\U0000FEFF": {0x00, 0x03, 0xEF, 0xBB, 0xBF},
"A\U0002A6D4": {0x00, 0x05, 'A', 0xF0, 0xAA, 0x9B, 0x94},
}
for str, encoded := range strings {
if res, err := decodeString(bytes.NewBuffer(encoded)); res != str || err != nil {
Expand All @@ -217,14 +217,14 @@ func TestEncoding(t *testing.T) {
}

lengths := map[int][]byte{
0: []byte{0x00},
127: []byte{0x7F},
128: []byte{0x80, 0x01},
16383: []byte{0xFF, 0x7F},
16384: []byte{0x80, 0x80, 0x01},
2097151: []byte{0xFF, 0xFF, 0x7F},
2097152: []byte{0x80, 0x80, 0x80, 0x01},
268435455: []byte{0xFF, 0xFF, 0xFF, 0x7F},
0: {0x00},
127: {0x7F},
128: {0x80, 0x01},
16383: {0xFF, 0x7F},
16384: {0x80, 0x80, 0x01},
2097151: {0xFF, 0xFF, 0x7F},
2097152: {0x80, 0x80, 0x80, 0x01},
268435455: {0xFF, 0xFF, 0xFF, 0x7F},
}
for length, encoded := range lengths {
if res, err := decodeLength(bytes.NewBuffer(encoded)); res != length || err != nil {
Expand Down
2 changes: 1 addition & 1 deletion packets/publish.go
Expand Up @@ -56,7 +56,7 @@ func (p *PublishPacket) Unpack(b io.Reader) error {
payloadLength -= len(p.TopicName) + 2
}
if payloadLength < 0 {
return fmt.Errorf("Error unpacking publish, payload length < 0")
return fmt.Errorf("error unpacking publish, payload length < 0")
}
p.Payload = make([]byte, payloadLength)
_, err = b.Read(p.Payload)
Expand Down
2 changes: 1 addition & 1 deletion router.go
Expand Up @@ -141,7 +141,7 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
sent := false
r.RLock()
m := messageFromPublish(message, client.ackFunc(message))
handlers := []MessageHandler{}
var handlers []MessageHandler
for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(message.TopicName) {
if order {
Expand Down
8 changes: 4 additions & 4 deletions topic.go
Expand Up @@ -21,16 +21,16 @@ import (

//ErrInvalidQos is the error returned when an packet is to be sent
//with an invalid Qos value
var ErrInvalidQos = errors.New("Invalid QoS")
var ErrInvalidQos = errors.New("invalid QoS")

//ErrInvalidTopicEmptyString is the error returned when a topic string
//is passed in that is 0 length
var ErrInvalidTopicEmptyString = errors.New("Invalid Topic; empty string")
var ErrInvalidTopicEmptyString = errors.New("invalid Topic; empty string")

//ErrInvalidTopicMultilevel is the error returned when a topic string
//is passed in that has the multi level wildcard in any position but
//the last
var ErrInvalidTopicMultilevel = errors.New("Invalid Topic; multi-level wildcard must be last level")
var ErrInvalidTopicMultilevel = errors.New("invalid Topic; multi-level wildcard must be last level")

// Topic Names and Topic Filters
// The MQTT v3.1.1 spec clarifies a number of ambiguities with regard
Expand All @@ -51,7 +51,7 @@ var ErrInvalidTopicMultilevel = errors.New("Invalid Topic; multi-level wildcard

func validateSubscribeMap(subs map[string]byte) ([]string, []byte, error) {
if len(subs) == 0 {
return nil, nil, errors.New("Invalid subscription; subscribe map must not be empty")
return nil, nil, errors.New("invalid subscription; subscribe map must not be empty")
}

var topics []string
Expand Down
2 changes: 1 addition & 1 deletion unit_store_test.go
Expand Up @@ -46,7 +46,7 @@ func Test_exists_no(t *testing.T) {

func isemptydir(dir string) bool {
if !exists(dir) {
panic(fmt.Errorf("Directory %s does not exist", dir))
panic(fmt.Errorf("directory %s does not exist", dir))
}
files, err := ioutil.ReadDir(dir)
chkerr(err)
Expand Down