Skip to content

Commit

Permalink
Merge pull request #322 from reubenmiller/gorilla-websockets
Browse files Browse the repository at this point in the history
Adding support for http/https proxy when using MQTT over websockets
  • Loading branch information
alsm committed Jun 28, 2019
2 parents adca289 + 4f2b526 commit 0d81c29
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 23 deletions.
8 changes: 6 additions & 2 deletions README.md
Expand Up @@ -22,11 +22,11 @@ This client is designed to work with the standard Go tools, so installation is a
go get github.com/eclipse/paho.mqtt.golang
```

The client depends on Google's [websockets](https://godoc.org/golang.org/x/net/websocket) and [proxy](https://godoc.org/golang.org/x/net/proxy) package,
The client depends on Google's [proxy](https://godoc.org/golang.org/x/net/proxy) package and the [websockets](https://godoc.org/github.com/gorilla/websocket) package,
also easily installed with the commands:

```
go get golang.org/x/net/websocket
go get github.com/gorilla/websocket
go get golang.org/x/net/proxy
```

Expand All @@ -44,6 +44,10 @@ import "github.com/eclipse/paho.mqtt.golang"

Samples are available in the `cmd` directory for reference.

Note:

The library also supports using MQTT over websockets by using the `ws://` (unsecure) or `wss://` (secure) prefix in the URI. If the client is running behind a corporate http/https proxy then the following environment variables `HTTP_PROXY`, `HTTPS_PROXY` and `NO_PROXY` are taken into account when establishing the connection.


Runtime tracing
---------------
Expand Down
23 changes: 2 additions & 21 deletions net.go
Expand Up @@ -17,7 +17,6 @@ package mqtt
import (
"crypto/tls"
"errors"
"fmt"
"net"
"net/http"
"net/url"
Expand All @@ -28,7 +27,6 @@ import (

"github.com/eclipse/paho.mqtt.golang/packets"
"golang.org/x/net/proxy"
"golang.org/x/net/websocket"
)

func signalError(c chan<- error, err error) {
Expand All @@ -41,27 +39,10 @@ func signalError(c chan<- error, err error) {
func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header) (net.Conn, error) {
switch uri.Scheme {
case "ws":
config, _ := websocket.NewConfig(uri.String(), fmt.Sprintf("http://%s", uri.Host))
config.Protocol = []string{"mqtt"}
config.Header = headers
config.Dialer = &net.Dialer{Timeout: timeout}
conn, err := websocket.DialConfig(config)
if err != nil {
return nil, err
}
conn.PayloadType = websocket.BinaryFrame
conn, err := NewWebsocket(uri.String(), nil, timeout, headers)
return conn, err
case "wss":
config, _ := websocket.NewConfig(uri.String(), fmt.Sprintf("https://%s", uri.Host))
config.Protocol = []string{"mqtt"}
config.TlsConfig = tlsc
config.Header = headers
config.Dialer = &net.Dialer{Timeout: timeout}
conn, err := websocket.DialConfig(config)
if err != nil {
return nil, err
}
conn.PayloadType = websocket.BinaryFrame
conn, err := NewWebsocket(uri.String(), tlsc, timeout, headers)
return conn, err
case "tcp":
allProxy := os.Getenv("all_proxy")
Expand Down
95 changes: 95 additions & 0 deletions websocket.go
@@ -0,0 +1,95 @@
package mqtt

import (
"crypto/tls"
"io"
"net"
"net/http"
"sync"
"time"

"github.com/gorilla/websocket"
)

// NewWebsocket returns a new websocket and returns a net.Conn compatiable interface using the gorilla/websocket package
func NewWebsocket(host string, tlsc *tls.Config, timeout time.Duration, requestHeader http.Header) (net.Conn, error) {
if timeout == 0 {
timeout = 10 * time.Second
}

dialer := &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: timeout,
EnableCompression: false,
TLSClientConfig: tlsc,
Subprotocols: []string{"mqtt"},
}
ws, _, err := dialer.Dial(host, requestHeader)

if err != nil {
panic(err)
}

wrapper := &websocketConnector{
Conn: ws,
}
return wrapper, err
}

// websocketConnector is a websocket wrapper so it satisfies the net.Conn interface so it is a
// drop in replacement of the golang.org/x/net/websocket package.
// Implementation guide taken from https://github.com/gorilla/websocket/issues/282
type websocketConnector struct {
*websocket.Conn
r io.Reader
rio sync.Mutex
wio sync.Mutex
}

// SetDeadline sets both the read and write deadlines
func (c *websocketConnector) SetDeadline(t time.Time) error {
if err := c.SetReadDeadline(t); err != nil {
return err
}
err := c.SetWriteDeadline(t)
return err
}

// Write writes data to the websocket
func (c *websocketConnector) Write(p []byte) (int, error) {
c.wio.Lock()
defer c.wio.Unlock()

err := c.WriteMessage(websocket.BinaryMessage, p)
if err != nil {
return 0, err
}
return len(p), nil
}

// Read reads the current websocket frame
func (c *websocketConnector) Read(p []byte) (int, error) {
c.rio.Lock()
defer c.rio.Unlock()
for {
if c.r == nil {
// Advance to next message.
var err error
_, c.r, err = c.NextReader()
if err != nil {
return 0, err
}
}
n, err := c.r.Read(p)
if err == io.EOF {
// At end of message.
c.r = nil
if n > 0 {
return n, nil
}
// No data read, continue to next message.
continue
}
return n, err
}
}

0 comments on commit 0d81c29

Please sign in to comment.