diff --git a/README.md b/README.md index 8777423a..c1acf4e1 100644 --- a/README.md +++ b/README.md @@ -104,8 +104,12 @@ func main() { * Seemingly random disconnections may be caused by another client connecting to the broker with the same client identifier; this is as per the [spec](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc384800405). -* A `MessageHandler` (called when a new message is received) must not block. If you wish to perform a long-running task, -or publish a message, then please use a go routine (blocking in the handler is a common cause of unexpected `pingresp +* Unless ordered delivery of messages is essential (and you have configured your broker to support this e.g. + `max_inflight_messages=1` in mosquitto) then set `ClientOptions.SetOrderMatters(false)`. Doing so will avoid the + below issue (deadlocks due to blocking message handlers). +* A `MessageHandler` (called when a new message is received) must not block (unless + `ClientOptions.SetOrderMatters(false)` set). If you wish to perform a long-running task, or publish a message, then + please use a go routine (blocking in the handler is a common cause of unexpected `pingresp not received, disconnecting` errors). * When QOS1+ subscriptions have been created previously and you connect with `CleanSession` set to false it is possible that the broker will deliver retained messages before `Subscribe` can be called. To process these messages either configure a handler with `AddRoute` or diff --git a/client.go b/client.go index 5b4b2e7f..8558f3bf 100644 --- a/client.go +++ b/client.go @@ -55,6 +55,8 @@ const ( // information can be found in their respective documentation. // Numerous connection options may be specified by configuring a // and then supplying a ClientOptions type. +// Implementations of Client must be safe for concurrent use by multiple +// goroutines type Client interface { // IsConnected returns a bool signifying whether // the client is connected or not. @@ -75,11 +77,21 @@ type Client interface { // Returns a token to track delivery of the message to the broker Publish(topic string, qos byte, retained bool, payload interface{}) Token // Subscribe starts a new subscription. Provide a MessageHandler to be executed when - // a message is published on the topic provided, or nil for the default handler + // a message is published on the topic provided, or nil for the default handler. + // + // If `options.OrderMatters` is true (the default) then `callback` must not block or + // call functions within this package that may block (e.g. `Publish`) other than in + // a new go routine. + // `callback` must be safe for concurrent use by multiple goroutines. Subscribe(topic string, qos byte, callback MessageHandler) Token // SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to // be executed when a message is published on one of the topics provided, or nil for the - // default handler + // default handler. + // + // If `options.OrderMatters` is true (the default) then `callback` must not block or + // call functions within this package that may block (e.g. `Publish`) other than in + // a new go routine. + // `callback` must be safe for concurrent use by multiple goroutines. SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token // Unsubscribe will end the subscription from each of the topics provided. // Messages published to those topics from other clients will no longer be @@ -87,7 +99,13 @@ type Client interface { Unsubscribe(topics ...string) Token // AddRoute allows you to add a handler for messages on a specific topic // without making a subscription. For example having a different handler - // for parts of a wildcard subscription + // for parts of a wildcard subscription or for receiving retained messages + // upon connection (before Sub scribe can be processed). + // + // If `options.OrderMatters` is true (the default) then `callback` must not block or + // call functions within this package that may block (e.g. `Publish`) other than in + // a new go routine. + // `callback` must be safe for concurrent use by multiple goroutines. AddRoute(topic string, callback MessageHandler) // OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions // in use by the client. @@ -95,6 +113,8 @@ type Client interface { } // client implements the Client interface +// clients are safe for concurrent use by multiple +// goroutines type client struct { lastSent atomic.Value // time.Time - the last time a packet was successfully sent to network lastReceived atomic.Value // time.Time - the last time a packet was successfully received from network @@ -153,6 +173,11 @@ func NewClient(o *ClientOptions) Client { // AddRoute allows you to add a handler for messages on a specific topic // without making a subscription. For example having a different handler // for parts of a wildcard subscription +// +// If `options.OrderMatters` is true (the default) then `callback` must not block or +// call functions within this package that may block (e.g. `Publish`) other than in +// a new go routine. +// `callback` must be safe for concurrent use by multiple goroutines. func (c *client) AddRoute(topic string, callback MessageHandler) { if callback != nil { c.msgRouter.addRoute(topic, callback) @@ -686,10 +711,10 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac // Subscribe starts a new subscription. Provide a MessageHandler to be executed when // a message is published on the topic provided. // -// Please note: you should try to keep the execution time of the callback to be -// as low as possible, especially when SetOrderMatters(true) (the default) is in -// place. Blocking calls in message handlers might otherwise delay delivery to -// other message handlers. +// If `options.OrderMatters` is true (the default) then `callback` must not block or +// call functions within this package that may block (e.g. `Publish`) other than in +// a new go routine. +// `callback` must be safe for concurrent use by multiple goroutines. func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Token { token := newToken(packets.Subscribe).(*SubscribeToken) DEBUG.Println(CLI, "enter Subscribe") @@ -766,6 +791,11 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke // SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to // be executed when a message is published on one of the topics provided. +// +// If `options.OrderMatters` is true (the default) then `callback` must not block or +// call functions within this package that may block (e.g. `Publish`) other than in +// a new go routine. +// `callback` must be safe for concurrent use by multiple goroutines. func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token { var err error token := newToken(packets.Subscribe).(*SubscribeToken) diff --git a/cmd/docker/binds/mosquitto/config/mosquitto.conf b/cmd/docker/binds/mosquitto/config/mosquitto.conf index da13df71..6a50fd76 100644 --- a/cmd/docker/binds/mosquitto/config/mosquitto.conf +++ b/cmd/docker/binds/mosquitto/config/mosquitto.conf @@ -7,8 +7,9 @@ allow_anonymous true # Anyone can connect -# Port to use for the default listener. -#port 1883 +# Mosquitto v2+ requires that a listener be definer (otherwise it listens on loopback) +listener 1883 + #log_type error #log_type warning diff --git a/cmd/docker/publisher/main.go b/cmd/docker/publisher/main.go index b98d3bf7..1e6b3152 100644 --- a/cmd/docker/publisher/main.go +++ b/cmd/docker/publisher/main.go @@ -20,6 +20,8 @@ const ( SERVERADDRESS = "tcp://mosquitto:1883" DELAY = time.Second CLIENTID = "mqtt_publisher" + + WRITETOLOG = true // If true then published messages will be written to the console ) func main() { @@ -32,6 +34,7 @@ func main() { opts.AddBroker(SERVERADDRESS) opts.SetClientID(CLIENTID) + opts.SetOrderMatters(false) // Allow out of order messages (use this option unless in order delivery is essential) opts.ConnectTimeout = time.Second // Minimal delays on connect opts.WriteTimeout = time.Second // Minimal delays on writes opts.KeepAlive = 10 // Keepalive every 10 seconds so we quickly detect network outages @@ -85,6 +88,9 @@ func main() { panic(err) } + if WRITETOLOG { + fmt.Printf("sending message: %s\n", msg) + } t := client.Publish(TOPIC, QOS, false, msg) // Handle the token in a go routine so this loop keeps sending messages regardless of delivery status go func() { diff --git a/cmd/docker/subscriber/main.go b/cmd/docker/subscriber/main.go index fc0148c6..fffca909 100644 --- a/cmd/docker/subscriber/main.go +++ b/cmd/docker/subscriber/main.go @@ -93,6 +93,7 @@ func main() { opts.AddBroker(SERVERADDRESS) opts.SetClientID(CLIENTID) + opts.SetOrderMatters(false) // Allow out of order messages (use this option unless in order delivery is essential) opts.ConnectTimeout = time.Second // Minimal delays on connect opts.WriteTimeout = time.Second // Minimal delays on writes opts.KeepAlive = 10 // Keepalive every 10 seconds so we quickly detect network outages diff --git a/fvt/docker/runTests.cmd b/fvt/docker/runTests.cmd index 9a738613..7d38e65e 100644 --- a/fvt/docker/runTests.cmd +++ b/fvt/docker/runTests.cmd @@ -1,9 +1,22 @@ @ECHO OFF REM Windows CMD file to run golang Paho tests with docker mosquitto instance cls +:start docker-compose up -d REM Docker for windows does not support publishing to 127.0.0.1 so set the address for the tests to use. set TEST_FVT_ADDR=0.0.0.0 -go test -v ../../ +REM `--count 1` prevents the system from using cached results. Note that running the tests multiple times may fail +REM because the broker state will not be as expected +go test --count 1 -v ../../ rem go test -race -v ../../ -docker-compose down \ No newline at end of file +IF ERRORLEVEL 1 GOTO failed +GOTO successful + +:failed +docker-compose down +echo "Error" +exit + +:successful +docker-compose down +REM goto start \ No newline at end of file diff --git a/options.go b/options.go index eb475918..ee2e71b0 100644 --- a/options.go +++ b/options.go @@ -49,7 +49,8 @@ type OnConnectHandler func(Client) // the initial connection is lost type ReconnectHandler func(Client, *ClientOptions) -// ClientOptions contains configurable options for an Client. +// ClientOptions contains configurable options for an Client. Note that these should be set using the +// relevant methods (e.g. `AddBroker`) rather than directly. See those functions for information on usage. type ClientOptions struct { Servers []*url.URL ClientID string @@ -89,7 +90,7 @@ type ClientOptions struct { // default values. // Port: 1883 // CleanSession: True -// Order: True +// Order: True (note: it is recommended that this be set to FALSE unless order is important) // KeepAlive: 30 (seconds) // ConnectTimeout: 30 (seconds) // MaxReconnectInterval 10 (minutes) @@ -203,10 +204,13 @@ func (o *ClientOptions) SetCleanSession(clean bool) *ClientOptions { } // SetOrderMatters will set the message routing to guarantee order within -// each QoS level. By default, this value is true. If set to false, +// each QoS level. By default, this value is true. If set to false (recommended), // this flag indicates that messages can be delivered asynchronously // from the client to the application and possibly arrive out of order. // Specifically, the message handler is called in its own go routine. +// Note that setting this to true does not guarantee in-order delivery +// (this is subject to broker settings like `max_inflight_messages=1` in mosquitto) +// and if `true` then handlers must not block. func (o *ClientOptions) SetOrderMatters(order bool) *ClientOptions { o.Order = order return o @@ -286,6 +290,11 @@ func (o *ClientOptions) SetBinaryWill(topic string, payload []byte, qos byte, re // SetDefaultPublishHandler sets the MessageHandler that will be called when a message // is received that does not match any known subscriptions. +// +// If `options.OrderMatters` is true (the default) then `defaultHandler` must not block or +// call functions within this package that may block (e.g. `Publish`) other than in +// a new go routine. +// `defaultHandler` must be safe for concurrent use by multiple goroutines. func (o *ClientOptions) SetDefaultPublishHandler(defaultHandler MessageHandler) *ClientOptions { o.DefaultPublishHandler = defaultHandler return o