Skip to content

Synchronous Publisher

Frank Pagliughi edited this page Feb 2, 2018 · 2 revisions

The Rust MQTT library supports two client interfaces, an Asynchronous (non-blocking) API and a Synchronous (blocking) API. The synchronous client is actually just a convenience wrapper around the asynchronous one.

The full source code for this example is at: sync_publish.rs

The synchronous publisher is the simplest of MQTT clients. The steps required are as follows:

  1. Create an instance of a client.
  2. Connect to the server/broker.
  3. Create and publish a message (repeat as many times as necessary)
  4. Disconnect from the broker.

Creating a synchronous client

To create a synchronous client, use mqtt::Client::new(). At a minimum, this requires a URI for the broker, but could also take a full set of creation options. We will show those later, but for now, let's assume that there's a broker running on localhost that we will use. So we can say:

  let cli = mqtt::Client::new("tcp://localhost:1883").unwrap();

The MQTT URI is in the form of <protocol>://<host>:<port>, where the protocol is either tcp for a standard connection or ssl for a secure one. The host can be specified as an IP address or domain name, and the port is an integer number, where 1883 is the standard IANA port number for TCP connections, and 8883 is standard for SSL connections.

Client::new returns a Rust Result object to indicate success or failure of the call. The call doesn't perform any network I/O, it just sets up internal data structures, so is unlikely to fail. In that case we can unwrap the result to get the client object. More complex clients that use persistence could fail, so we might want to get in the habit of checking the return for errors and handling them - or at least reporting a clear error message, like:

let cli = mqtt::Client::new("tcp://localhost:1883").unwrap_or_else(|err| {
    println!("Error creating the client: {:?}", err);
    process::exit(1);
});

Connecting to the broker

Once we have a client we can use it to connect to the broker using the connect function. Like new this can take a set of options, but for now we can use the defaults by specifying the Rust Option enumeration value of None:

if let Err(err) = cli.connect(None) {
    println!("Unable to connect: {:?}", err);
    process::exit(1);
}

This is a blocking network call, and won't return until the connection has succeeded or failed. It can fail for all the normal reasons that network connections fail: the server is down, it's unreachable, the name can't be resolved, there are security restrictions, etc. So it is imperative that a failed connection is handled properly. In a real system, it could be retried for a while. Here we just report the error and exit the application.

Since connect doesn't return anything useful on success (just the unit value), we only pattern match for the error and use it to report the failure to the user.

Creating and publishing messages

When the client is connected we can start creating and publishing messages. An MQTT message has four fields that can be set by the application:

  • Topic - This is a UTF-8 string which the subscriber can match to receive messages.
  • Payload - Any arbitrary binary buffer.
  • QoS - The quality of service 0, 1, or 2.
  • Retained - Whether the broker should hold the last message published on the topic.

Messages can be created with Message::new() (non-retained) and Message::new_retained() (retained) like this:

let msg = mqtt::Message::new("test", "Hello world!", 1);

The creates a message with the topic "test", a payload, "Hello world!", and a QoS of one. In this case, the payload is text, but could be any arbitrary binary which in this Rust client is anything convertible to Vec<8>.

Once we have a message, it can be published to the broker using the client:

if let Err(e) = cli.publish(msg) {
    println!("Error sending message: {:?}", e);
}

With the synchronous client, this is a blocking call and won't return until the message is sent and acknowledged by the server according to the Quality of Service requested. In the case of QoS=0, the function simply queues the message for output and returns, possibly before the message is transmitted. But for QoS=1 or 2, the function will block until the message is transmitted and the proper acknowledgement(s) are received from the broker.

Disconnecting

When all the messages have been sent, the connection to the server can be closed cleanly with a call to one of the disconnect functions, Client::disconnect() or Client::disconnect_after(Duration). In the case of the synchronous client, we know that all QoS=1 or 2 messages have been sent and acknowledged, since the publish() calls block until this is done. But QoS=0 messages may still be queued up waiting for transmission.

Since these are only waiting on outbound transmission, and aren't waiting for acknowledgement from the server, an unbound wait for client shutdown is generally appropriate for the synchronous client:

cli.disconnect(None).unwrap();

This blocks until any remaining outbound messages are sent, then cleanly ends the client session. If a Last Will and Testament message was registered with the server it will be discarded without publication. The underlying network (TCP) connection is also shut down.

Clone this wiki locally