-
Notifications
You must be signed in to change notification settings - Fork 539
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #497 from amir-khassaia/feat/http-connect-proxy-su…
…pport Add client option `SetConnectionAttemptHandler` that will be called prior to dialling a broker. This enables a connection specific tls.Config to be set (providing better support for proxies and tls with multiple brokers). In addition a new example has been added that demonstrates how to connect via a proxy (using SNI).
- Loading branch information
Showing
5 changed files
with
213 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
package main | ||
|
||
import ( | ||
"bufio" | ||
"fmt" | ||
"net" | ||
"net/http" | ||
"net/url" | ||
|
||
"golang.org/x/net/proxy" | ||
) | ||
|
||
// httpProxy is a HTTP/HTTPS connect capable proxy. | ||
type httpProxy struct { | ||
host string | ||
haveAuth bool | ||
username string | ||
password string | ||
forward proxy.Dialer | ||
} | ||
|
||
func (s httpProxy) String() string { | ||
return fmt.Sprintf("HTTP proxy dialer for %s", s.host) | ||
} | ||
|
||
func newHTTPProxy(uri *url.URL, forward proxy.Dialer) (proxy.Dialer, error) { | ||
s := new(httpProxy) | ||
s.host = uri.Host | ||
s.forward = forward | ||
if uri.User != nil { | ||
s.haveAuth = true | ||
s.username = uri.User.Username() | ||
s.password, _ = uri.User.Password() | ||
} | ||
|
||
return s, nil | ||
} | ||
|
||
func (s *httpProxy) Dial(_, addr string) (net.Conn, error) { | ||
reqURL := url.URL{ | ||
Scheme: "https", | ||
Host: addr, | ||
} | ||
|
||
req, err := http.NewRequest("CONNECT", reqURL.String(), nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
req.Close = false | ||
if s.haveAuth { | ||
req.SetBasicAuth(s.username, s.password) | ||
} | ||
req.Header.Set("User-Agent", "paho.mqtt") | ||
|
||
// Dial and create the client connection. | ||
c, err := s.forward.Dial("tcp", s.host) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
err = req.Write(c) | ||
if err != nil { | ||
_ = c.Close() | ||
return nil, err | ||
} | ||
|
||
resp, err := http.ReadResponse(bufio.NewReader(c), req) | ||
if err != nil { | ||
_ = c.Close() | ||
return nil, err | ||
} | ||
_ = resp.Body.Close() | ||
if resp.StatusCode != http.StatusOK { | ||
_ = c.Close() | ||
return nil, fmt.Errorf("proxied connection returned an error: %v", resp.Status) | ||
} | ||
|
||
return c, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
/* | ||
* Copyright (c) 2013 IBM Corp. | ||
* | ||
* All rights reserved. This program and the accompanying materials | ||
* are made available under the terms of the Eclipse Public License v1.0 | ||
* which accompanies this distribution, and is available at | ||
* http://www.eclipse.org/legal/epl-v10.html | ||
* | ||
* Contributors: | ||
* Seth Hoenig | ||
* Allan Stockdill-Mander | ||
* Mike Robertson | ||
*/ | ||
|
||
package main | ||
|
||
import ( | ||
"crypto/tls" | ||
"flag" | ||
"fmt" | ||
"golang.org/x/net/proxy" | ||
"log" | ||
"net/url" | ||
|
||
// "log" | ||
"os" | ||
"os/signal" | ||
"strconv" | ||
"syscall" | ||
"time" | ||
|
||
MQTT "github.com/eclipse/paho.mqtt.golang" | ||
) | ||
|
||
func onMessageReceived(_ MQTT.Client, message MQTT.Message) { | ||
fmt.Printf("Received message on topic: %s\nMessage: %s\n", message.Topic(), message.Payload()) | ||
} | ||
|
||
func init() { | ||
// Pre-register custom HTTP proxy dialers for use with proxy.FromEnvironment | ||
proxy.RegisterDialerType("http", newHTTPProxy) | ||
proxy.RegisterDialerType("https", newHTTPProxy) | ||
} | ||
|
||
/** | ||
* Illustrates how to make an MQTT connection with HTTP proxy CONNECT support. | ||
* Specify proxy via environment variable: eg: ALL_PROXY=https://proxy_host:port | ||
*/ | ||
func main() { | ||
MQTT.DEBUG = log.New(os.Stdout, "", 0) | ||
MQTT.ERROR = log.New(os.Stderr, "", 0) | ||
|
||
c := make(chan os.Signal, 1) | ||
signal.Notify(c, os.Interrupt, syscall.SIGTERM) | ||
|
||
hostname, _ := os.Hostname() | ||
|
||
server := flag.String("server", "tcp://127.0.0.1:1883", "The full URL of the MQTT server to "+ | ||
"connect to ex: tcp://127.0.0.1:1883") | ||
topic := flag.String("topic", "#", "Topic to subscribe to") | ||
qos := flag.Int("qos", 0, "The QoS to subscribe to messages at") | ||
clientid := flag.String("clientid", hostname+strconv.Itoa(time.Now().Second()), "A clientid for the connection") | ||
username := flag.String("username", "", "A username to authenticate to the MQTT server") | ||
password := flag.String("password", "", "Password to match username") | ||
token := flag.String("token", "", "An optional token credential to authenticate with") | ||
skipVerify := flag.Bool("skipVerify", false, "Controls whether TLS certificate is verified") | ||
flag.Parse() | ||
|
||
connOpts := MQTT.NewClientOptions().AddBroker(*server). | ||
SetClientID(*clientid). | ||
SetCleanSession(true). | ||
SetProtocolVersion(4) | ||
|
||
if *username != "" { | ||
connOpts.SetUsername(*username) | ||
if *password != "" { | ||
connOpts.SetPassword(*password) | ||
} | ||
} else if *token != "" { | ||
connOpts.SetCredentialsProvider(func() (string, string) { | ||
return "unused", *token | ||
}) | ||
} | ||
|
||
connOpts.SetTLSConfig(&tls.Config{InsecureSkipVerify: *skipVerify, ClientAuth: tls.NoClientCert}) | ||
|
||
connOpts.OnConnect = func(c MQTT.Client) { | ||
if token := c.Subscribe(*topic, byte(*qos), onMessageReceived); token.Wait() && token.Error() != nil { | ||
panic(token.Error()) | ||
} | ||
} | ||
|
||
// Illustrates customized TLS configuration prior to connection attempt | ||
connOpts.OnConnectAttempt = func(broker *url.URL, tlsCfg *tls.Config) *tls.Config { | ||
cfg := tlsCfg.Clone() | ||
cfg.ServerName = broker.Hostname() | ||
return cfg | ||
} | ||
|
||
client := MQTT.NewClient(connOpts) | ||
if token := client.Connect(); token.Wait() && token.Error() != nil { | ||
panic(token.Error()) | ||
} else { | ||
fmt.Printf("Connected to %s\n", *server) | ||
} | ||
|
||
<-c | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters