Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add websocket options #418

Merged
merged 1 commit into from May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion client.go
Expand Up @@ -352,7 +352,7 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
DEBUG.Println(CLI, "about to write new connect msg")
CONN:
// Start by opening the network connection (tcp, tls, ws) etc
conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders)
conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions)
if err != nil {
ERROR.Println(CLI, err.Error())
WARN.Println(CLI, "failed to connect to broker, trying next")
Expand Down
6 changes: 3 additions & 3 deletions netconn.go
Expand Up @@ -31,13 +31,13 @@ import (
//

// openConnection opens a network connection using the protocol indicated in the URL. Does not carry out any MQTT specific handshakes
func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header) (net.Conn, error) {
func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions) (net.Conn, error) {
switch uri.Scheme {
case "ws":
conn, err := NewWebsocket(uri.String(), nil, timeout, headers)
conn, err := NewWebsocket(uri.String(), nil, timeout, headers, websocketOptions)
return conn, err
case "wss":
conn, err := NewWebsocket(uri.String(), tlsc, timeout, headers)
conn, err := NewWebsocket(uri.String(), tlsc, timeout, headers, websocketOptions)
return conn, err
case "mqtt", "tcp":
allProxy := os.Getenv("all_proxy")
Expand Down
8 changes: 8 additions & 0 deletions options.go
Expand Up @@ -83,6 +83,7 @@ type ClientOptions struct {
MessageChannelDepth uint
ResumeSubs bool
HTTPHeaders http.Header
WebsocketOptions *WebsocketOptions
}

// NewClientOptions will create a new ClientClientOptions type with some
Expand Down Expand Up @@ -122,6 +123,7 @@ func NewClientOptions() *ClientOptions {
WriteTimeout: 0, // 0 represents timeout disabled
ResumeSubs: false,
HTTPHeaders: make(map[string][]string),
WebsocketOptions: &WebsocketOptions{},
}
return o
}
Expand Down Expand Up @@ -372,3 +374,9 @@ func (o *ClientOptions) SetHTTPHeaders(h http.Header) *ClientOptions {
o.HTTPHeaders = h
return o
}

// SetWebsocketOptions sets the additional websocket options used in a WebSocket connection
func (o *ClientOptions) SetWebsocketOptions(w *WebsocketOptions) *ClientOptions {
o.WebsocketOptions = w
return o
}
6 changes: 6 additions & 0 deletions options_reader.go
Expand Up @@ -159,3 +159,9 @@ func (r *ClientOptionsReader) HTTPHeaders() http.Header {
h := r.options.HTTPHeaders
return h
}

// WebsocketOptions returns the currently configured WebSocket options
func (r *ClientOptionsReader) WebsocketOptions() *WebsocketOptions {
s := r.options.WebsocketOptions
return s
}
16 changes: 15 additions & 1 deletion websocket.go
Expand Up @@ -11,19 +11,33 @@ import (
"github.com/gorilla/websocket"
)

// WebsocketOptions are config options for a websocket dialer
type WebsocketOptions struct {
ReadBufferSize int
WriteBufferSize int
}

// NewWebsocket returns a new websocket and returns a net.Conn compatible interface using the gorilla/websocket package
func NewWebsocket(host string, tlsc *tls.Config, timeout time.Duration, requestHeader http.Header) (net.Conn, error) {
func NewWebsocket(host string, tlsc *tls.Config, timeout time.Duration, requestHeader http.Header, options *WebsocketOptions) (net.Conn, error) {
if timeout == 0 {
timeout = 10 * time.Second
}

if options == nil {
// Apply default options
options = &WebsocketOptions{}
}

dialer := &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: timeout,
EnableCompression: false,
TLSClientConfig: tlsc,
Subprotocols: []string{"mqtt"},
ReadBufferSize: options.ReadBufferSize,
WriteBufferSize: options.WriteBufferSize,
}

ws, _, err := dialer.Dial(host, requestHeader)

if err != nil {
Expand Down