The zeroMQ guide states: zeroMQ looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast.
zeroMQ provides a large number of design patterns to accommodate a variety of use cases.
This project attepmts to use zeroMQ to provide a reactive bridge between distinct environments, i.e. it provide back-pressure over zeroMQ while also providing a reactive sink and source on either side of the wire.
Dependencies for using rx_zmq_streams in your code:
resolvers += "zenaptix at bintray" at "http://dl.bintray.com/zenaptix/rx_zmq_streams"
libraryDependencies += "com.zenaptix" %% "rx_zmq_streams" % "0.4.1"
This application is a somewhat heuristic implementation of Reactive Streams utilizing zeroMQ as transport layer
It creates a ReactiveZeroMQ Akka Extension that provides a:
- pubSink that will transmit its data from an Akka Stream onto zeroMQ
- subSource that will receive data over zeroMQ and stream it out to a subsequent Akka Stream
- a Balanced pubSink that load balances the stream from a source to any number of subscribers
- a Broadcast pubSink that duplicates the stream from a source to any number of subscribers
- the pubSink and subSource are implementing the Reactive Streams protocol and as such allows for a back-pressure on the zeroMQ transmission
It uses zeroMQ Pub/Sub socket as the transport for a Reactive Streams implementation with Akka Streams.
Akka Streams is an implementation of Reactive Streams
zeroMQ is a Socket Library disguised as a concurrency framework. "Sockets on steroids"
The project depends on either jeromq or jzmq
"org.zeromq" % "jzmq" % "3.1.0"
"org.zeromq" % "jeromq" % "0.3.5"
A pubSink is acquired from the RxZMQExtension and is used as a sink at the end of a Akka Stream. All the messages will be forwarded over zeroMQ to a subscriber (subSource)
A subSource is acquired from the RxZMQExtension and is used as a source at the start of a Akka Stream. All the messages received from the zeroMQ socket is pushed onto the Akka Stream.
A good place to start is with the RxZMQExtensionSpec. Below is a trivial example running a connected processes.
Connecting over zeroMQ
import com.zenaptix.reactive.RxZMQExtension
import akka.actor.{Props, ActorSystem}
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.LazyLogging
import zeromq.Message
import scala.concurrent.duration._
class RxZMQExtensionExample {
implicit val timeout: Timeout = Timeout(5 seconds)
implicit val materializer = ActorFlowMaterializer()
val conf = ConfigFactory.load()
def rxZmq = RxZMQExtension(system)
val sourcePath = conf.getString("gateway.source_path")
val file = new File(sourcePath)
// publish the messages from the file
SynchronousFileSource(file,13).map(b => {
println(s"[SOURCE] -> ${b.decodeString("UTF8")}")
Message(b)
}).runWith(rxZmq.pubSink())
// receive the messages from the file
var i = 0
val subSource = rxZmq.subSource("0.0.0.0:5556")
subSource.map(
m => {
i += 1
println(s"[SINK]] <- ${m.part.utf8String}")
}).runWith(Sink.ignore)
}
To run the above code - cd to the directory where the project was cloned and execute:
export RX_ZMQ_INSTALL_DIR=`pwd`
sbt test
</pre>
The Reactive Publisher
package com.zenaptix.reactive
import java.io.File
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.io.SynchronousFileSource
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
object ReactivePublisher extends App with LazyLogging {
implicit val system = ActorSystem("publisher")
implicit val materializer = ActorFlowMaterializer()
lazy val log = system.log
def rxZmq = RxZMQExtension(system)
val conf = ConfigFactory.load()
val sourcePath = conf.getString("gateway.source_path")
val file = new File(sourcePath)
SynchronousFileSource(file,13).map(b => {
logger.debug(s"[SOURCE] -> ${b.decodeString("UTF8")}")
Message(b)
}).runWith(rxZmq.pubSink())
system.awaitTermination()
}
Running
sbt
> project publisher
> run
The Reactive Subscriber
package com.zenaptix.reactive
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Sink
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
object ReactiveSubscriber extends App with LazyLogging {
implicit val system = ActorSystem("subscriber")
implicit val materializer = ActorFlowMaterializer()
def rxZmq = RxZMQExtension(system)
val conf = ConfigFactory.load()
val conn = s"${conf.getString("zeromq.host")}:${conf.getInt("zeromq.port")}")
rxZmq.subSource(conn).map(m =>
logger.debug(s"SINK <- ${m.part.decodeString("UTF8")}")).
runWith(Sink.ignore)
}
Running:
sbt
> project subscriber
> run
</pre>
The Publisher
On OSX first install and run boot2docker:
export RX_ZMQ_INSTALL_DIR=`pwd`
sbt
> project publisher
> docker:publishLocal
> exit
docker run -v $RX_ZMQ_INSTALL_DIR/data:/data -p 5556:5556 -p 5557:5557 \
-d --name rx_zmq_publisher com.zenaptix/rx_zmq_publisher:v0.4.1
The Subscriber
and again:
sbt
project subscriber
> docker:publishLocal
> exit
docker run -i -t --name rx_zmq_subscriber com.zenaptix:rx_zmq_subscriber:v0.4.1 -Dzeromq.host=$RXZMQ_PUBLISHER_IP
The Logs then view the publisher log:
docker logs rx_zmq_publisher
rx_zmq_streams is implemented using a Req/Rep channel for exchanging control messages, and a pub/sub channel for receiving the actual data.