Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for http/https proxy when using MQTT over websockets #322

Merged
merged 4 commits into from Jun 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 6 additions & 2 deletions README.md
Expand Up @@ -22,11 +22,11 @@ This client is designed to work with the standard Go tools, so installation is a
go get github.com/eclipse/paho.mqtt.golang
```

The client depends on Google's [websockets](https://godoc.org/golang.org/x/net/websocket) and [proxy](https://godoc.org/golang.org/x/net/proxy) package,
The client depends on Google's [proxy](https://godoc.org/golang.org/x/net/proxy) package and the [websockets](https://godoc.org/github.com/gorilla/websocket) package,
also easily installed with the commands:

```
go get golang.org/x/net/websocket
go get github.com/gorilla/websocket
go get golang.org/x/net/proxy
```

Expand All @@ -44,6 +44,10 @@ import "github.com/eclipse/paho.mqtt.golang"

Samples are available in the `cmd` directory for reference.

Note:

The library also supports using MQTT over websockets by using the `ws://` (unsecure) or `wss://` (secure) prefix in the URI. If the client is running behind a corporate http/https proxy then the following environment variables `HTTP_PROXY`, `HTTPS_PROXY` and `NO_PROXY` are taken into account when establishing the connection.


Runtime tracing
---------------
Expand Down
23 changes: 2 additions & 21 deletions net.go
Expand Up @@ -17,7 +17,6 @@ package mqtt
import (
"crypto/tls"
"errors"
"fmt"
"net"
"net/http"
"net/url"
Expand All @@ -28,7 +27,6 @@ import (

"github.com/eclipse/paho.mqtt.golang/packets"
"golang.org/x/net/proxy"
"golang.org/x/net/websocket"
)

func signalError(c chan<- error, err error) {
Expand All @@ -41,27 +39,10 @@ func signalError(c chan<- error, err error) {
func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header) (net.Conn, error) {
switch uri.Scheme {
case "ws":
config, _ := websocket.NewConfig(uri.String(), fmt.Sprintf("http://%s", uri.Host))
config.Protocol = []string{"mqtt"}
config.Header = headers
config.Dialer = &net.Dialer{Timeout: timeout}
conn, err := websocket.DialConfig(config)
if err != nil {
return nil, err
}
conn.PayloadType = websocket.BinaryFrame
conn, err := NewWebsocket(uri.String(), nil, timeout, headers)
return conn, err
case "wss":
config, _ := websocket.NewConfig(uri.String(), fmt.Sprintf("https://%s", uri.Host))
config.Protocol = []string{"mqtt"}
config.TlsConfig = tlsc
config.Header = headers
config.Dialer = &net.Dialer{Timeout: timeout}
conn, err := websocket.DialConfig(config)
if err != nil {
return nil, err
}
conn.PayloadType = websocket.BinaryFrame
conn, err := NewWebsocket(uri.String(), tlsc, timeout, headers)
return conn, err
case "tcp":
allProxy := os.Getenv("all_proxy")
Expand Down
95 changes: 95 additions & 0 deletions websocket.go
@@ -0,0 +1,95 @@
package mqtt

import (
"crypto/tls"
"io"
"net"
"net/http"
"sync"
"time"

"github.com/gorilla/websocket"
)

// NewWebsocket returns a new websocket and returns a net.Conn compatiable interface using the gorilla/websocket package
func NewWebsocket(host string, tlsc *tls.Config, timeout time.Duration, requestHeader http.Header) (net.Conn, error) {
if timeout == 0 {
timeout = 10 * time.Second
}

dialer := &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: timeout,
EnableCompression: false,
TLSClientConfig: tlsc,
Subprotocols: []string{"mqtt"},
}
ws, _, err := dialer.Dial(host, requestHeader)

if err != nil {
panic(err)
}

wrapper := &websocketConnector{
Conn: ws,
}
return wrapper, err
}

// websocketConnector is a websocket wrapper so it satisfies the net.Conn interface so it is a
// drop in replacement of the golang.org/x/net/websocket package.
// Implementation guide taken from https://github.com/gorilla/websocket/issues/282
type websocketConnector struct {
*websocket.Conn
r io.Reader
rio sync.Mutex
wio sync.Mutex
}

// SetDeadline sets both the read and write deadlines
func (c *websocketConnector) SetDeadline(t time.Time) error {
if err := c.SetReadDeadline(t); err != nil {
return err
}
err := c.SetWriteDeadline(t)
return err
}

// Write writes data to the websocket
func (c *websocketConnector) Write(p []byte) (int, error) {
c.wio.Lock()
defer c.wio.Unlock()

err := c.WriteMessage(websocket.BinaryMessage, p)
if err != nil {
return 0, err
}
return len(p), nil
}

// Read reads the current websocket frame
func (c *websocketConnector) Read(p []byte) (int, error) {
c.rio.Lock()
defer c.rio.Unlock()
for {
if c.r == nil {
// Advance to next message.
var err error
_, c.r, err = c.NextReader()
if err != nil {
return 0, err
}
}
n, err := c.r.Read(p)
if err == io.EOF {
// At end of message.
c.r = nil
if n > 0 {
return n, nil
}
// No data read, continue to next message.
continue
}
return n, err
}
}