From a460a63d3de718a5d403b14c2357156611333f08 Mon Sep 17 00:00:00 2001 From: Amir Khassaia Date: Thu, 22 Apr 2021 12:24:50 +1000 Subject: [PATCH 1/4] feat: HTTP connect proxy support fix: Add SNI for TLS connections if one is not set Signed-off-by: amir-khassaia --- http_proxy.go | 85 +++++++++++++++++++++++++++++++++++++++++++++++++++ netconn.go | 13 +++++++- 2 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 http_proxy.go diff --git a/http_proxy.go b/http_proxy.go new file mode 100644 index 0000000..9a5448a --- /dev/null +++ b/http_proxy.go @@ -0,0 +1,85 @@ +package mqtt + +import ( + "bufio" + "fmt" + "net" + "net/http" + "net/url" + + "golang.org/x/net/proxy" +) + +// httpProxy is a HTTP/HTTPS connect capable proxy. +type httpProxy struct { + host string + haveAuth bool + username string + password string + forward proxy.Dialer +} + +func (s httpProxy) String() string { + return fmt.Sprintf("HTTP proxy dialer for %s", s.host) +} + +func newHTTPProxy(uri *url.URL, forward proxy.Dialer) (proxy.Dialer, error) { + s := new(httpProxy) + s.host = uri.Host + s.forward = forward + if uri.User != nil { + s.haveAuth = true + s.username = uri.User.Username() + s.password, _ = uri.User.Password() + } + + return s, nil +} + +func (s *httpProxy) Dial(_, addr string) (net.Conn, error) { + reqURL := url.URL{ + Scheme: "https", + Host: addr, + } + + req, err := http.NewRequest("CONNECT", reqURL.String(), nil) + if err != nil { + return nil, err + } + req.Close = false + if s.haveAuth { + req.SetBasicAuth(s.username, s.password) + } + req.Header.Set("User-Agent", "paho.mqtt") + + // Dial and create the client connection. + c, err := s.forward.Dial("tcp", s.host) + if err != nil { + return nil, err + } + + err = req.Write(c) + if err != nil { + _ = c.Close() + return nil, err + } + + resp, err := http.ReadResponse(bufio.NewReader(c), req) + if err != nil { + _ = c.Close() + return nil, err + } + _ = resp.Body.Close() + if resp.StatusCode != 200 { + _ = c.Close() + err = fmt.Errorf("connect server using proxy error, StatusCode [%d]", resp.StatusCode) + return nil, err + } + + return c, nil +} + +func init() { + proxy.RegisterDialerType("http", newHTTPProxy) + proxy.RegisterDialerType("https", newHTTPProxy) +} diff --git a/netconn.go b/netconn.go index 9f9f084..c602a39 100644 --- a/netconn.go +++ b/netconn.go @@ -77,7 +77,7 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade return nil, err } - tlsConn := tls.Client(conn, tlsc) + tlsConn := tls.Client(conn, tlsConfigWithSni(uri, tlsc)) err = tlsConn.Handshake() if err != nil { @@ -89,3 +89,14 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade } return nil, errors.New("unknown protocol") } + +func tlsConfigWithSni(uri *url.URL, conf *tls.Config) *tls.Config { + tlsConfig := conf + if tlsConfig.ServerName == "" { + // Ensure SNI is set appropriately - make a copy to avoid polluting argument or default. + c := tlsConfig.Clone() + c.ServerName = uri.Hostname() + tlsConfig = c + } + return tlsConfig +} From c39ca865033d60a4da73409fa8686e9343a5eb29 Mon Sep 17 00:00:00 2001 From: Amir Khassaia Date: Thu, 22 Apr 2021 12:31:33 +1000 Subject: [PATCH 2/4] refactor: status, error Signed-off-by: amir-khassaia --- http_proxy.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/http_proxy.go b/http_proxy.go index 9a5448a..a2304a0 100644 --- a/http_proxy.go +++ b/http_proxy.go @@ -70,10 +70,9 @@ func (s *httpProxy) Dial(_, addr string) (net.Conn, error) { return nil, err } _ = resp.Body.Close() - if resp.StatusCode != 200 { + if resp.StatusCode != http.StatusOK { _ = c.Close() - err = fmt.Errorf("connect server using proxy error, StatusCode [%d]", resp.StatusCode) - return nil, err + return nil, fmt.Errorf("proxied connection returned an error: %v", resp.Status) } return c, nil From 3eadf6cece581004eb3f20b5c99b22e5468a9a96 Mon Sep 17 00:00:00 2001 From: Amir Khassaia Date: Thu, 22 Apr 2021 15:16:44 +1000 Subject: [PATCH 3/4] refactor: post review discussion, move http proxy handling to a sample app to keep the library simple refactor: revert SNI handling and add a pre connect hook instead to keep the library clean and to allow the clients to customize the final tls config that will be in use for connecting Signed-off-by: amir-khassaia --- client.go | 13 ++- http_proxy.go => cmd/httpproxy/httpproxy.go | 7 +- cmd/httpproxy/main.go | 108 ++++++++++++++++++++ netconn.go | 18 +--- options.go | 12 +++ 5 files changed, 134 insertions(+), 24 deletions(-) rename http_proxy.go => cmd/httpproxy/httpproxy.go (92%) create mode 100644 cmd/httpproxy/main.go diff --git a/client.go b/client.go index 93b5dbe..5309747 100644 --- a/client.go +++ b/client.go @@ -379,8 +379,13 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) { cm := newConnectMsgFromOptions(&c.options, broker) DEBUG.Println(CLI, "about to write new connect msg") CONN: + tlsCfg := c.options.TLSConfig + if c.options.OnConnectAttempt != nil { + DEBUG.Println(CLI, "using custom onConnectAttempt handler...") + tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig) + } // Start by opening the network connection (tcp, tls, ws) etc - conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions) + conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions) if err != nil { ERROR.Println(CLI, err.Error()) WARN.Println(CLI, "failed to connect to broker, trying next") @@ -397,7 +402,7 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) { // We may be have to attempt the connection with MQTT 3.1 if conn != nil { - conn.Close() + _ = conn.Close() } if !c.options.protocolVersionExplicit && protocolVersion == 4 { // try falling back to 3.1? DEBUG.Println(CLI, "Trying reconnect using MQTT 3.1 protocol") @@ -504,8 +509,8 @@ func (c *client) internalConnLost(err error) { } } -// startCommsWorkers is called when the connection is up. It starts off all of the routines needed to process incoming and -// outgoing messages. +// startCommsWorkers is called when the connection is up. +// It starts off all of the routines needed to process incoming and outgoing messages. // Returns true if the comms workers were started (i.e. they were not already running) func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packets.ControlPacket) bool { DEBUG.Println(CLI, "startCommsWorkers called") diff --git a/http_proxy.go b/cmd/httpproxy/httpproxy.go similarity index 92% rename from http_proxy.go rename to cmd/httpproxy/httpproxy.go index a2304a0..55f4d56 100644 --- a/http_proxy.go +++ b/cmd/httpproxy/httpproxy.go @@ -1,4 +1,4 @@ -package mqtt +package main import ( "bufio" @@ -77,8 +77,3 @@ func (s *httpProxy) Dial(_, addr string) (net.Conn, error) { return c, nil } - -func init() { - proxy.RegisterDialerType("http", newHTTPProxy) - proxy.RegisterDialerType("https", newHTTPProxy) -} diff --git a/cmd/httpproxy/main.go b/cmd/httpproxy/main.go new file mode 100644 index 0000000..a18bb03 --- /dev/null +++ b/cmd/httpproxy/main.go @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2013 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Seth Hoenig + * Allan Stockdill-Mander + * Mike Robertson + */ + +package main + +import ( + "crypto/tls" + "flag" + "fmt" + "golang.org/x/net/proxy" + "log" + "net/url" + + // "log" + "os" + "os/signal" + "strconv" + "syscall" + "time" + + MQTT "github.com/eclipse/paho.mqtt.golang" +) + +func onMessageReceived(_ MQTT.Client, message MQTT.Message) { + fmt.Printf("Received message on topic: %s\nMessage: %s\n", message.Topic(), message.Payload()) +} + +func init() { + // Pre-register custom HTTP proxy dialers for use with proxy.FromEnvironment + proxy.RegisterDialerType("http", newHTTPProxy) + proxy.RegisterDialerType("https", newHTTPProxy) +} + +/** + * Illustrates how to make an MQTT connection with HTTP proxy CONNECT support. + * Specify proxy via environment variable: eg: ALL_PROXY=https://proxy_host:port + */ +func main() { + MQTT.DEBUG = log.New(os.Stdout, "", 0) + MQTT.ERROR = log.New(os.Stderr, "", 0) + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + hostname, _ := os.Hostname() + + server := flag.String("server", "tcp://127.0.0.1:1883", "The full URL of the MQTT server to "+ + "connect to ex: tcp://127.0.0.1:1883") + topic := flag.String("topic", "#", "Topic to subscribe to") + qos := flag.Int("qos", 0, "The QoS to subscribe to messages at") + clientid := flag.String("clientid", hostname+strconv.Itoa(time.Now().Second()), "A clientid for the connection") + username := flag.String("username", "", "A username to authenticate to the MQTT server") + password := flag.String("password", "", "Password to match username") + token := flag.String("token", "", "An optional token credential to authenticate with") + skipVerify := flag.Bool("skipVerify", false, "Controls whether TLS certificate is verified") + flag.Parse() + + connOpts := MQTT.NewClientOptions().AddBroker(*server). + SetClientID(*clientid). + SetCleanSession(true). + SetProtocolVersion(4) + + if *username != "" { + connOpts.SetUsername(*username) + if *password != "" { + connOpts.SetPassword(*password) + } + } else if *token != "" { + connOpts.SetCredentialsProvider(func() (string, string) { + return "unused", *token + }) + } + + connOpts.SetTLSConfig(&tls.Config{InsecureSkipVerify: *skipVerify, ClientAuth: tls.NoClientCert}) + + connOpts.OnConnect = func(c MQTT.Client) { + if token := c.Subscribe(*topic, byte(*qos), onMessageReceived); token.Wait() && token.Error() != nil { + panic(token.Error()) + } + } + + // Illustrates customized TLS configuration prior to connection attempt + connOpts.OnConnectAttempt = func(broker *url.URL, tlsCfg *tls.Config) *tls.Config { + cfg := tlsCfg.Clone() + cfg.ServerName = broker.Hostname() + return cfg + } + + client := MQTT.NewClient(connOpts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + panic(token.Error()) + } else { + fmt.Printf("Connected to %s\n", *server) + } + + <-c +} diff --git a/netconn.go b/netconn.go index c602a39..0cb6cd1 100644 --- a/netconn.go +++ b/netconn.go @@ -30,7 +30,8 @@ import ( // This just establishes the network connection; once established the type of connection should be irrelevant // -// openConnection opens a network connection using the protocol indicated in the URL. Does not carry out any MQTT specific handshakes +// openConnection opens a network connection using the protocol indicated in the URL. +// Does not carry out any MQTT specific handshakes. func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions) (net.Conn, error) { switch uri.Scheme { case "ws": @@ -77,11 +78,11 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade return nil, err } - tlsConn := tls.Client(conn, tlsConfigWithSni(uri, tlsc)) + tlsConn := tls.Client(conn, tlsc) err = tlsConn.Handshake() if err != nil { - conn.Close() + _ = conn.Close() return nil, err } @@ -89,14 +90,3 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade } return nil, errors.New("unknown protocol") } - -func tlsConfigWithSni(uri *url.URL, conf *tls.Config) *tls.Config { - tlsConfig := conf - if tlsConfig.ServerName == "" { - // Ensure SNI is set appropriately - make a copy to avoid polluting argument or default. - c := tlsConfig.Clone() - c.ServerName = uri.Hostname() - tlsConfig = c - } - return tlsConfig -} diff --git a/options.go b/options.go index 04f8ae6..d175805 100644 --- a/options.go +++ b/options.go @@ -49,6 +49,9 @@ type OnConnectHandler func(Client) // the initial connection is lost type ReconnectHandler func(Client, *ClientOptions) +// ConnectionAttemptHandler is invoked prior to making the initial connection. +type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Config + // ClientOptions contains configurable options for an Client. Note that these should be set using the // relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage. type ClientOptions struct { @@ -79,6 +82,7 @@ type ClientOptions struct { OnConnect OnConnectHandler OnConnectionLost ConnectionLostHandler OnReconnecting ReconnectHandler + OnConnectAttempt ConnectionAttemptHandler WriteTimeout time.Duration MessageChannelDepth uint ResumeSubs bool @@ -120,6 +124,7 @@ func NewClientOptions() *ClientOptions { Store: nil, OnConnect: nil, OnConnectionLost: DefaultConnectionLostHandler, + OnConnectAttempt: nil, WriteTimeout: 0, // 0 represents timeout disabled ResumeSubs: false, HTTPHeaders: make(map[string][]string), @@ -321,6 +326,13 @@ func (o *ClientOptions) SetReconnectingHandler(cb ReconnectHandler) *ClientOptio return o } +// SetConnectionAttemptHandler sets the ConnectionAttemptHandler callback to be executed prior +// to the client attempting initial connection to the MQTT broker. +func (o *ClientOptions) SetConnectionAttemptHandler(onConnectAttempt ConnectionAttemptHandler) *ClientOptions { + o.OnConnectAttempt = onConnectAttempt + return o +} + // SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a // timeout error. A duration of 0 never times out. Default never times out func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions { From 4c25813ac7def23ab36e80092483cf163017279f Mon Sep 17 00:00:00 2001 From: Amir Khassaia Date: Thu, 22 Apr 2021 16:06:36 +1000 Subject: [PATCH 4/4] fix: improve public func doc Signed-off-by: amir-khassaia --- options.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/options.go b/options.go index d175805..4a1420c 100644 --- a/options.go +++ b/options.go @@ -327,7 +327,9 @@ func (o *ClientOptions) SetReconnectingHandler(cb ReconnectHandler) *ClientOptio } // SetConnectionAttemptHandler sets the ConnectionAttemptHandler callback to be executed prior -// to the client attempting initial connection to the MQTT broker. +// to each attempt to connect to an MQTT broker. Returns the *tls.Config that will be used when establishing +// the connection (a copy of the tls.Config from ClientOptions will be passed in along with the broker URL). +// This allows connection specific changes to be made to the *tls.Config. func (o *ClientOptions) SetConnectionAttemptHandler(onConnectAttempt ConnectionAttemptHandler) *ClientOptions { o.OnConnectAttempt = onConnectAttempt return o