Skip to content

Commit

Permalink
fix some things
Browse files Browse the repository at this point in the history
Change-Id: Id535436ee9da264fba604974f2ce2cb365435f86
Signed-off-by: Seth Hoenig <[email protected]>
  • Loading branch information
shoenig committed Jan 27, 2014
1 parent a334970 commit 43f4b39
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 28 deletions.
36 changes: 36 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so

# Folders
_obj
_test

# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out

*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*

_testmain.go

*.exe

*.msg
*.lok

samples/trivial
samples/trivial2
samples/sample
samples/reconnect
samples/ssl
samples/custom_store
samples/simple
samples/stdinpub
samples/stdoutsub
samples/routing
65 changes: 62 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,63 @@
go-mqtt
=======
Eclipse Paho MQTT Go client
===========================

An mqtt v3.1 client written in Go

This repository contains the source code for the [Eclipse Paho](http://eclipse.org/paho) MQTT Go client library.

This code builds a library which enable applications to connect to an [MQTT](http://mqtt.org) broker to publish messages, and to subscribe to topics and receive published messages.

This library supports a fully asynchronous mode of operation.


Installation and Build
----------------------

This client is designed to work with the standard Go tools, so installation is as easy as:

```
go get git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git
```

The client depends on Google's [websockets](http://godoc.org/code.google.com/p/go.net/websocket) package,
also easily installed with the command:

```
go get code.google.com/p/go.net/websocket
```


Usage and API
-------------

Detailed API documentation is available by using to godoc tool, or can be browsed online
using the [godoc.org](http://godoc.org/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git) service.

Make use of the library by importing it in your Go client source code. For example,
```
import MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
```

Samples are available in the `/samples` directory for reference.


Runtime tracing
---------------

Tracing is enabled by using the `SetTraceLevel` option when creating a ClientOptions struct. See the ClientOptions
documentation for more details.


Reporting bugs
--------------

Please report bugs under the "MQTT-Go" Component in [Eclipse Bugzilla](http://bugs.eclipse.org/bugs/) for the Paho Technology project. This is a very new library as of Q1 2014, so there are sure to be bugs.


More information
----------------

Discussion of the Paho clients takes place on the [Eclipse paho-dev mailing list](https://dev.eclipse.org/mailman/listinfo/paho-dev).

General questions about the MQTT protocol are discussed in the [MQTT Google Group](https://groups.google.com/forum/?hl=en-US&fromgroups#!forum/mqtt).

There is much more information available via the [MQTT community site](http://mqtt.org).
17 changes: 14 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package mqtt

import (
"errors"
"math/rand"
"net"
"sync"
Expand Down Expand Up @@ -104,22 +105,28 @@ func (c *MqttClient) Start() ([]Receipt, error) {

c1, err1 := openConnection(c.options.server, c.options.tlsconfig)
if err1 != nil {
c.trace_w(CLI, "failed to connect to primary broker")
if c.options.server2 != nil {
c2, err2 := openConnection(c.options.server2, c.options.tlsconfig)
if err2 != nil {
c.trace_w(CLI, "failed to connect to standby broker")
return nil, err1
}
c.conn = c2
c.trace_v(CLI, "connected to standby broker")
} else {
c.trace_w(CLI, "standby broker is not configured")
return nil, err1
}
} else {
c.conn = c1
c.trace_v(CLI, "connected to primary broker")
}

chkcond(c.conn != nil)
if c.conn == nil {
c.trace_e(CLI, "Failed to connect to a broker")
return nil, errors.New("Failed to connect to a broker")
}

c.persist.Open()
c.receipts = newReceiptMap()
Expand Down Expand Up @@ -164,10 +171,12 @@ func (c *MqttClient) Start() ([]Receipt, error) {

rc := <-c.begin // wait for connack
if rc != CONN_ACCEPTED {
c.trace_c(CLI, "CONNACK was not CONN_ACCEPTED, but rather %s", rc2str(rc))
return nil, chkrc(rc)
}

c.connected = true
c.trace_v(CLI, "client is connected")

if c.options.timeout != 0 {
go keepalive(c)
Expand All @@ -192,10 +201,11 @@ func (c *MqttClient) Start() ([]Receipt, error) {
// the specified number of milliseconds to wait for existing work to be
// completed.
func (c *MqttClient) Disconnect(quiesce uint) {
c.trace_v(CLI, "disconnecting")
if !c.IsConnected() {
c.trace_w(CLI, "already disconnected")
return
}
c.trace_v(CLI, "disconnecting")
c.connected = false

// wait for work to finish, or quiesce time consumed
Expand All @@ -213,10 +223,11 @@ func (c *MqttClient) Disconnect(quiesce uint) {

// ForceDisconnect will end the connection with the mqtt broker immediately.
func (c *MqttClient) ForceDisconnect() {
c.trace_w(CLI, "force disconnecting")
if !c.IsConnected() {
c.trace_w(CLI, "already disconnected")
return
}
c.trace_v(CLI, "forcefully disconnecting")
c.disconnect()
}

Expand Down
19 changes: 19 additions & 0 deletions oops.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,22 @@ func chkrc(rc ConnRC) error {
}
return nil
}

func rc2str(rc ConnRC) string {
switch rc {
case CONN_ACCEPTED:
return "CONN_ACCEPTED"
case CONN_REF_BAD_PROTO_VER:
return "CONN_REF_BAD_PROTO_VER"
case CONN_REF_ID_REJ:
return "CONN_REF_ID_REJ"
case CONN_REF_SERV_UNAVAIL:
return "CONN_REF_SERV_UNAVAIL"
case CONN_REF_BAD_USER_PASS:
return "CONN_REF_BAD_USER_PASS"
case CONN_REF_NOT_AUTH:
return "CONN_REF_NOT_AUTH"
default:
return "UNKNOWN"
}
}
4 changes: 4 additions & 0 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ func (r *router) matchAndDispatch(messages <-chan *Message, order bool) {
for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(message.Topic()) {
if order {
r.RUnlock()
e.Value.(*route).callback(*message)
r.RLock()
} else {
go e.Value.(*route).callback(*message)
}
Expand All @@ -144,7 +146,9 @@ func (r *router) matchAndDispatch(messages <-chan *Message, order bool) {
r.RUnlock()
if !sent {
if order {
r.RUnlock()
r.defaultHandler(*message)
r.RLock()
} else {
go r.defaultHandler(*message)
}
Expand Down
45 changes: 31 additions & 14 deletions samples/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
* Mike Robertson
*/

/*
/*----------------------------------------------------------------------
This sample is designed to demonstrate the ability to set individual
callbacks on a per-subscription basis. There are three handlers in use:
brokerLoadHandler - $SYS/broker/load/#
Expand All @@ -23,12 +22,15 @@ The client will receive 100 messages total from those subscriptions,
and then print the total number of messages received from each.
It may take a few moments for the sample to complete running, as it
must wait for messages to be published.
*/
-----------------------------------------------------------------------*/

package main

import "fmt"
import MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
import (
"fmt"
MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
"os"
)

var broker_load = make(chan bool)
var broker_connection = make(chan bool)
Expand Down Expand Up @@ -65,12 +67,27 @@ func main() {
if err != nil {
panic(err)
}
ss1 := c.StartSubscription(brokerLoadHandler, "$SYS/broker/load/#", MQTT.QOS_ZERO)
<-ss1
ss2 := c.StartSubscription(brokerConnectionHandler, "$SYS/broker/connection/#", MQTT.QOS_ZERO)
<-ss2
ss3 := c.StartSubscription(brokerClientsHandler, "$SYS/broker/clients/#", MQTT.QOS_ZERO)
<-ss3

if receipt, err := c.StartSubscription(brokerLoadHandler, "$SYS/broker/load/#", MQTT.QOS_ZERO); err != nil {
fmt.Println(err)
os.Exit(1)
} else {
<-receipt
}

if receipt, err := c.StartSubscription(brokerConnectionHandler, "$SYS/broker/connection/#", MQTT.QOS_ZERO); err != nil {
fmt.Println(err)
os.Exit(1)
} else {
<-receipt
}

if receipt, err := c.StartSubscription(brokerClientsHandler, "$SYS/broker/clients/#", MQTT.QOS_ZERO); err != nil {
fmt.Println(err)
os.Exit(1)
} else {
<-receipt
}

num_bload := 0
num_bconns := 0
Expand All @@ -87,9 +104,9 @@ func main() {
}
}

fmt.Printf("Received %d Broker Load messages\n", num_bload)
fmt.Printf("Received %d Broker Connection messages\n", num_bconns)
fmt.Printf("Received %d Broker Clients messages\n", num_bclients)
fmt.Printf("Received %3d Broker Load messages\n", num_bload)
fmt.Printf("Received %3d Broker Connection messages\n", num_bconns)
fmt.Printf("Received %3d Broker Clients messages\n", num_bclients)

c.Disconnect(250)
}
28 changes: 20 additions & 8 deletions samples/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

package main

import "fmt"
import "time"
import MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
import (
"fmt"
MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
"os"
"time"
)

var f MQTT.MessageHandler = func(msg MQTT.Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
Expand All @@ -33,19 +36,28 @@ func main() {
if err != nil {
panic(err)
}
receipt := c.StartSubscription(nil, "/go-mqtt/sample", MQTT.QOS_ZERO)
<-receipt

if receipt, err := c.StartSubscription(nil, "/go-mqtt/sample", MQTT.QOS_ZERO); err != nil {
fmt.Println(err)
os.Exit(1)
} else {
<-receipt
}

for i := 0; i < 5; i++ {
text := fmt.Sprintf("this is msg #%d!", i)
receipt = c.Publish(MQTT.QOS_ONE, "/go-mqtt/sample", text)
receipt := c.Publish(MQTT.QOS_ONE, "/go-mqtt/sample", text)
<-receipt
}

time.Sleep(3 * time.Second)

receipt = c.EndSubscription("/go-mqtt/sample")
<-receipt
if receipt, err := c.EndSubscription("/go-mqtt/sample"); err != nil {
fmt.Println(err)
os.Exit(1)
} else {
<-receipt
}

c.Disconnect(250)
}
12 changes: 12 additions & 0 deletions trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ func (c *MqttClient) trace_w(cm component, f string, v ...interface{}) {
c.t.Trace_W(cm, f, v...)
}

func (t *Tracer) Trace_C(cm component, f string, v ...interface{}) {
if t.level >= Critical && t.output != nil {
x := fmt.Sprintf(f, v...)
m := fmt.Sprintf(frmt, cm, timestamp(), t.clientid, x)
t.output.WriteString(m)
}
}

func (c *MqttClient) trace_c(cm component, f string, v ...interface{}) {
c.t.Trace_C(cm, f, v...)
}

func (t *Tracer) Trace_E(cm component, f string, v ...interface{}) {
if t.level >= Critical && t.output != nil {
x := fmt.Sprintf(f, v...)
Expand Down

0 comments on commit 43f4b39

Please sign in to comment.