diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..47bb0de4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,36 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe + +*.msg +*.lok + +samples/trivial +samples/trivial2 +samples/sample +samples/reconnect +samples/ssl +samples/custom_store +samples/simple +samples/stdinpub +samples/stdoutsub +samples/routing \ No newline at end of file diff --git a/README.md b/README.md index f7759c8d..ec4f9bf6 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,63 @@ -go-mqtt -======= +Eclipse Paho MQTT Go client +=========================== -An mqtt v3.1 client written in Go + +This repository contains the source code for the [Eclipse Paho](http://eclipse.org/paho) MQTT Go client library. + +This code builds a library which enable applications to connect to an [MQTT](http://mqtt.org) broker to publish messages, and to subscribe to topics and receive published messages. + +This library supports a fully asynchronous mode of operation. + + +Installation and Build +---------------------- + +This client is designed to work with the standard Go tools, so installation is as easy as: + +``` +go get git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git +``` + +The client depends on Google's [websockets](http://godoc.org/code.google.com/p/go.net/websocket) package, +also easily installed with the command: + +``` +go get code.google.com/p/go.net/websocket +``` + + +Usage and API +------------- + +Detailed API documentation is available by using to godoc tool, or can be browsed online +using the [godoc.org](http://godoc.org/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git) service. + +Make use of the library by importing it in your Go client source code. For example, +``` +import MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" +``` + +Samples are available in the `/samples` directory for reference. + + +Runtime tracing +--------------- + +Tracing is enabled by using the `SetTraceLevel` option when creating a ClientOptions struct. See the ClientOptions +documentation for more details. + + +Reporting bugs +-------------- + +Please report bugs under the "MQTT-Go" Component in [Eclipse Bugzilla](http://bugs.eclipse.org/bugs/) for the Paho Technology project. This is a very new library as of Q1 2014, so there are sure to be bugs. + + +More information +---------------- + +Discussion of the Paho clients takes place on the [Eclipse paho-dev mailing list](https://dev.eclipse.org/mailman/listinfo/paho-dev). + +General questions about the MQTT protocol are discussed in the [MQTT Google Group](https://groups.google.com/forum/?hl=en-US&fromgroups#!forum/mqtt). + +There is much more information available via the [MQTT community site](http://mqtt.org). diff --git a/client.go b/client.go index f71bebea..5a641ff6 100644 --- a/client.go +++ b/client.go @@ -16,6 +16,7 @@ package mqtt import ( + "errors" "math/rand" "net" "sync" @@ -104,14 +105,17 @@ func (c *MqttClient) Start() ([]Receipt, error) { c1, err1 := openConnection(c.options.server, c.options.tlsconfig) if err1 != nil { + c.trace_w(CLI, "failed to connect to primary broker") if c.options.server2 != nil { c2, err2 := openConnection(c.options.server2, c.options.tlsconfig) if err2 != nil { + c.trace_w(CLI, "failed to connect to standby broker") return nil, err1 } c.conn = c2 c.trace_v(CLI, "connected to standby broker") } else { + c.trace_w(CLI, "standby broker is not configured") return nil, err1 } } else { @@ -119,7 +123,10 @@ func (c *MqttClient) Start() ([]Receipt, error) { c.trace_v(CLI, "connected to primary broker") } - chkcond(c.conn != nil) + if c.conn == nil { + c.trace_e(CLI, "Failed to connect to a broker") + return nil, errors.New("Failed to connect to a broker") + } c.persist.Open() c.receipts = newReceiptMap() @@ -164,10 +171,12 @@ func (c *MqttClient) Start() ([]Receipt, error) { rc := <-c.begin // wait for connack if rc != CONN_ACCEPTED { + c.trace_c(CLI, "CONNACK was not CONN_ACCEPTED, but rather %s", rc2str(rc)) return nil, chkrc(rc) } c.connected = true + c.trace_v(CLI, "client is connected") if c.options.timeout != 0 { go keepalive(c) @@ -192,10 +201,11 @@ func (c *MqttClient) Start() ([]Receipt, error) { // the specified number of milliseconds to wait for existing work to be // completed. func (c *MqttClient) Disconnect(quiesce uint) { - c.trace_v(CLI, "disconnecting") if !c.IsConnected() { + c.trace_w(CLI, "already disconnected") return } + c.trace_v(CLI, "disconnecting") c.connected = false // wait for work to finish, or quiesce time consumed @@ -213,10 +223,11 @@ func (c *MqttClient) Disconnect(quiesce uint) { // ForceDisconnect will end the connection with the mqtt broker immediately. func (c *MqttClient) ForceDisconnect() { - c.trace_w(CLI, "force disconnecting") if !c.IsConnected() { + c.trace_w(CLI, "already disconnected") return } + c.trace_v(CLI, "forcefully disconnecting") c.disconnect() } diff --git a/oops.go b/oops.go index 5d04c1c0..e45f4d66 100644 --- a/oops.go +++ b/oops.go @@ -64,3 +64,22 @@ func chkrc(rc ConnRC) error { } return nil } + +func rc2str(rc ConnRC) string { + switch rc { + case CONN_ACCEPTED: + return "CONN_ACCEPTED" + case CONN_REF_BAD_PROTO_VER: + return "CONN_REF_BAD_PROTO_VER" + case CONN_REF_ID_REJ: + return "CONN_REF_ID_REJ" + case CONN_REF_SERV_UNAVAIL: + return "CONN_REF_SERV_UNAVAIL" + case CONN_REF_BAD_USER_PASS: + return "CONN_REF_BAD_USER_PASS" + case CONN_REF_NOT_AUTH: + return "CONN_REF_NOT_AUTH" + default: + return "UNKNOWN" + } +} diff --git a/router.go b/router.go index 96786164..f881878f 100644 --- a/router.go +++ b/router.go @@ -134,7 +134,9 @@ func (r *router) matchAndDispatch(messages <-chan *Message, order bool) { for e := r.routes.Front(); e != nil; e = e.Next() { if e.Value.(*route).match(message.Topic()) { if order { + r.RUnlock() e.Value.(*route).callback(*message) + r.RLock() } else { go e.Value.(*route).callback(*message) } @@ -144,7 +146,9 @@ func (r *router) matchAndDispatch(messages <-chan *Message, order bool) { r.RUnlock() if !sent { if order { + r.RUnlock() r.defaultHandler(*message) + r.RLock() } else { go r.defaultHandler(*message) } diff --git a/samples/routing.go b/samples/routing.go index 77081409..e900a216 100644 --- a/samples/routing.go +++ b/samples/routing.go @@ -12,8 +12,7 @@ * Mike Robertson */ -/* - +/*---------------------------------------------------------------------- This sample is designed to demonstrate the ability to set individual callbacks on a per-subscription basis. There are three handlers in use: brokerLoadHandler - $SYS/broker/load/# @@ -23,12 +22,15 @@ The client will receive 100 messages total from those subscriptions, and then print the total number of messages received from each. It may take a few moments for the sample to complete running, as it must wait for messages to be published. -*/ +-----------------------------------------------------------------------*/ package main -import "fmt" -import MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" +import ( + "fmt" + MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" + "os" +) var broker_load = make(chan bool) var broker_connection = make(chan bool) @@ -65,12 +67,27 @@ func main() { if err != nil { panic(err) } - ss1 := c.StartSubscription(brokerLoadHandler, "$SYS/broker/load/#", MQTT.QOS_ZERO) - <-ss1 - ss2 := c.StartSubscription(brokerConnectionHandler, "$SYS/broker/connection/#", MQTT.QOS_ZERO) - <-ss2 - ss3 := c.StartSubscription(brokerClientsHandler, "$SYS/broker/clients/#", MQTT.QOS_ZERO) - <-ss3 + + if receipt, err := c.StartSubscription(brokerLoadHandler, "$SYS/broker/load/#", MQTT.QOS_ZERO); err != nil { + fmt.Println(err) + os.Exit(1) + } else { + <-receipt + } + + if receipt, err := c.StartSubscription(brokerConnectionHandler, "$SYS/broker/connection/#", MQTT.QOS_ZERO); err != nil { + fmt.Println(err) + os.Exit(1) + } else { + <-receipt + } + + if receipt, err := c.StartSubscription(brokerClientsHandler, "$SYS/broker/clients/#", MQTT.QOS_ZERO); err != nil { + fmt.Println(err) + os.Exit(1) + } else { + <-receipt + } num_bload := 0 num_bconns := 0 @@ -87,9 +104,9 @@ func main() { } } - fmt.Printf("Received %d Broker Load messages\n", num_bload) - fmt.Printf("Received %d Broker Connection messages\n", num_bconns) - fmt.Printf("Received %d Broker Clients messages\n", num_bclients) + fmt.Printf("Received %3d Broker Load messages\n", num_bload) + fmt.Printf("Received %3d Broker Connection messages\n", num_bconns) + fmt.Printf("Received %3d Broker Clients messages\n", num_bclients) c.Disconnect(250) } diff --git a/samples/simple.go b/samples/simple.go index 13fc8c9a..54419135 100644 --- a/samples/simple.go +++ b/samples/simple.go @@ -14,9 +14,12 @@ package main -import "fmt" -import "time" -import MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" +import ( + "fmt" + MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" + "os" + "time" +) var f MQTT.MessageHandler = func(msg MQTT.Message) { fmt.Printf("TOPIC: %s\n", msg.Topic()) @@ -33,19 +36,28 @@ func main() { if err != nil { panic(err) } - receipt := c.StartSubscription(nil, "/go-mqtt/sample", MQTT.QOS_ZERO) - <-receipt + + if receipt, err := c.StartSubscription(nil, "/go-mqtt/sample", MQTT.QOS_ZERO); err != nil { + fmt.Println(err) + os.Exit(1) + } else { + <-receipt + } for i := 0; i < 5; i++ { text := fmt.Sprintf("this is msg #%d!", i) - receipt = c.Publish(MQTT.QOS_ONE, "/go-mqtt/sample", text) + receipt := c.Publish(MQTT.QOS_ONE, "/go-mqtt/sample", text) <-receipt } time.Sleep(3 * time.Second) - receipt = c.EndSubscription("/go-mqtt/sample") - <-receipt + if receipt, err := c.EndSubscription("/go-mqtt/sample"); err != nil { + fmt.Println(err) + os.Exit(1) + } else { + <-receipt + } c.Disconnect(250) } diff --git a/trace.go b/trace.go index ef46820b..a3a5fc27 100644 --- a/trace.go +++ b/trace.go @@ -75,6 +75,18 @@ func (c *MqttClient) trace_w(cm component, f string, v ...interface{}) { c.t.Trace_W(cm, f, v...) } +func (t *Tracer) Trace_C(cm component, f string, v ...interface{}) { + if t.level >= Critical && t.output != nil { + x := fmt.Sprintf(f, v...) + m := fmt.Sprintf(frmt, cm, timestamp(), t.clientid, x) + t.output.WriteString(m) + } +} + +func (c *MqttClient) trace_c(cm component, f string, v ...interface{}) { + c.t.Trace_C(cm, f, v...) +} + func (t *Tracer) Trace_E(cm component, f string, v ...interface{}) { if t.level >= Critical && t.output != nil { x := fmt.Sprintf(f, v...)