Scala version: | 2.10.2 |
Akka version: | 2.2.0 |
- Overview
- Terminology
- First steps
- Stackable traits
- Sender references
- Channels
- Recovery
- Snapshots
- Behavior changes
- Event series
- Idempotency
- Serialization
- Further examples
- Miscellaneous
- Appendix A: Legend
- Appendix B: Project
- Appendix C: Articles
- Appendix D: Support
The Eventsourced library adds scalable actor state persistence and at-least-once message delivery guarantees to Akka. With Eventsourced, stateful actors
- persist received messages by appending them to a log (journal)
- project received messages to derive current state
- usually hold current state in memory (memory image)
- recover current (or past) state by replaying received messages (during normal application start or after crashes)
- never persist current state directly (except optional state snapshots for recovery time optimization)
In other words, Eventsourced implements a write-ahead log (WAL) that is used to keep track of messages an actor receives and to recover its state by replaying logged messages. Appending messages to a log instead of persisting actor state directly allows for actor state persistence at very high transaction rates and supports efficient replication. In contrast to other WAL-based systems, Eventsourced usually keeps the whole message history in the log and makes usage of state snapshots optional.
Logged messages represent intended changes to an actor's state. Logging changes instead of updating current state is one of the core concept of event sourcing. Eventsourced can be used to implement event sourcing concepts but it is not limited to that. More details about Eventsourced and its relation to event sourcing can be found at How does Eventsourced persist actor state and how is this related to event sourcing.
Eventsourced can also be used to make message exchanges between actors reliable so that they can be resumed after crashes, for example. For that purpose, channels with at-least-once message delivery guarantees are provided. Channels also prevent that output messages, sent by persistent actors, are redundantly delivered during replays which is relevant for message exchanges between these actors and other services.
The core building blocks provided by Eventsourced are processors, channels and journals. These are managed by an Akka extension, the EventsourcingExtension
.
A processor is a stateful actor that logs (persists) messages it receives. A stateful actor is turned into a processor by modifying it with the stackable Eventsourced
trait during construction. A processor can be used like any other actor.
Messages wrapped inside Message
are logged by a processor, unwrapped messages are not logged. Wrapped messages are often referred to as events in this user guide. Wrapped messages can also be commands, as explained in section Application.
Logging behavior is implemented by the Eventsourced
trait, a processor's receive
method doesn't need to care about that. Acknowledging a successful write to a sender can be done by sending a reply. A processor can also hot-swap its behavior by still keeping its logging functionality.
Processors are registered at an EventsourcingExtension
. This extension provides methods to recover processor state by replaying logged messages. Processors can be registered and recovered at any time during an application run.
Eventsourced doesn't impose any restrictions how processors maintain state. A processor can use vars, mutable data structures or STM references, for example.
Channels are used by processors for sending messages to other actors (channel destinations) and receiving replies from them. Channels
- require their destinations to confirm the receipt of messages for providing at-least-once delivery guarantees (explicit ack-retry protocol). Receipt confirmations are written to a log.
- prevent redundant delivery of messages to destinations during processor recovery (replay of messages). Replayed messages with matching receipt confirmations are dropped by the corresponding channels.
A channel itself is an actor that decorates a destination with the aforementioned functionality. Processors usually create channels as child actors for decorating destination actor references.
A processor may also sent messages directly to another actor without using a channel. In this case that actor will redundantly receive messages during processor recovery.
Eventsourced provides three different channel types (more are planned).
- Default channel
- Does not store received messages.
- Re-delivers uncomfirmed messages only during recovery of the sending processor.
- Order of messages as sent by a processor is not preserved in failure cases.
- Reliable channel
- Stores received messages.
- Re-delivers unconfirmed messages based on a configurable re-delivery policy.
- Order of messages as sent by a processor is preserved, even in failure cases.
- Often used to deal with unreliable remote destinations.
- Can recover from crashes of the JVM it is running in.
- Reliable request-reply channel
- Same as reliable channel but additionally guarantees at-least-once delivery of replies.
- Order of replies not guaranteed to correspond to the order of sent request messages.
Eventsourced channels are not meant to replace any existing messaging system but can be used, for example, to reliably connect processors to such a system, if needed. More generally, they are useful to integrate processors with other services, as described in this article.
A journal is an actor that is used by processors and channels to log messages and receipt confirmations. The quality of service (availability, scalability, ...) provided by a journal depends on the used storage technology. The Journals section below gives an overview of existing journal implementations and their development status.
The Eventsourced library doesn't impose any restrictions on the structure and semantics of Message
payloads. Hence, persistent messages can therefore be events as well as commands. Both can be seen as facts about the interaction of an application with its environment. This is demonstrated in the Eventsourced reference application which persists events as well as commands. This also simplifies the implementation of long-running, persistent business processes (sometimes referred to as sagas). These are processors that react on events by sending commands to other services which can be other processors or external services, for example.
Eventsourced fits well into applications that implement the CQRS pattern and follow a domain-driven design (DDD) (see reference application). On the other hand, the library doesn't force applications to do so and allows them to implement event-sourcing (and/or command-sourcing) without CQRS and/or DDD.
For persisting messages, Eventsourced currently provides the following journal implementations:
Journal | Usage |
---|---|
LevelDB journal. It can be configured either with a native LevelDB (accessed via leveldbjni) or a LevelDB Java port as storage backend. Running a native LevelDB from sbt requires special settings. All examples in this user guide use the LevelDB Java port. | Production |
HBase journal. An HBase backed journal supporting high-availability, horizontal read and write scalability, concurrent and non-blocking reads and writes. Details here. | Experimental |
MongoDB Casbah based journal. A MongoDB backed journal. Details here. Thanks to Duncan DeVore. | Experimental |
MongoDB Reactive based journal. A MongoDB backed journal. Details here. Thanks to Duncan DeVore. | Experimental |
DynamoDB journal. A DynamoDB backed journal. Details here. Thanks to Scott Clasen. | Experimental |
Journal.IO journal. Journal.IO backed journal for testing purposes. Messages are persisted. | Testing |
In memory journal. An in-memory journal for testing purposes. Messages are not persisted. | Testing |
In the following, the terms persistent actor, event-sourced actor, event-sourced processor and processor are used interchangeably. Furthermore, a Message
is often referred to as event message.
This section guides through the minimum steps required to create, use and recover an event-sourced actor and demonstrates the usage of channels. Code from this section is contained in FirstSteps.scala and FirstSteps.java. It can be executed from the sbt prompt with
Scala:
> project eventsourced-examples
> run-main org.eligosource.eventsourced.guide.FirstSteps
Java:
> project eventsourced-examples
> run-main org.eligosource.eventsourced.guide.japi.FirstSteps
The example in this section and all further examples use a journal that is backed by a LevelDB Java port. For running a native LevelDB instance from sbt, additional settings are required. A legend to the figures used in this and other sections is in Appendix A.
EventsourcingExtension
is an Akka extension provided by the Eventsourced library. It is used by applications to
- create and register event-sourced actors (called processors or event processors)
- create and register channels
- recover registered processors and channels from journaled event messages
An EventsourcingExtension
is initialized with an ActorSystem
and a journal ActorRef
.
Scala:
import java.io.File
import akka.actor._
import org.eligosource.eventsourced.core._
import org.eligosource.eventsourced.journal.leveldb._
val system: ActorSystem = ActorSystem("example")
val journal: ActorRef = LeveldbJournalProps(new File("target/example-1"), native = false).createJournal
val extension: EventsourcingExtension = EventsourcingExtension(system, journal)
Java:
import java.io.File;
import akka.actor.*;
import org.eligosource.eventsourced.core.*;
import org.eligosource.eventsourced.journal.leveldb.*;
final ActorSystem system = ActorSystem.create("guide");
final ActorRef journal = LeveldbJournalProps.create(new File("target/guide-1-java")).withNative(false).createJournal(system);
final EventsourcingExtension extension = EventsourcingExtension.create(system, journal);
This example uses a LevelDB journal but any other journal implementation can be used as well.
With the Scala API, event-sourced actors can be defined as 'plain' actors. With the Java API, event-sourced actors need to extend the abstract UntypedEventsourcedActor
class. For example,
Scala:
class Processor extends Actor {
var counter = 0
def receive = {
case msg: Message => {
counter = counter + 1
println("[processor] event = %s (%d)" format (msg.event, counter))
}
}
}
Java:
public class Processor extends UntypedEventsourcedActor {
private int counter = 0;
@Override
public int id() {
return 1;
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Message) {
Message msg = (Message)message;
counter = counter + 1;
System.out.println(String.format("[processor] event = %s (%d)", msg.event(), counter));
}
}
}
is an actor that counts the number of received event Message
s. In Eventsourced applications, events are always communicated (transported) via event Message
s.
To make the Scala Processor
an event-sourced actor, it must be modified with the stackable Eventsourced
trait during instantiation. The Java Processor
already extends UntypedEventsourcedActor
class, so no further modification is needed.
Scala:
// create and register event-sourced processor
val processor: ActorRef = extension.processorOf(Props(new Processor with Eventsourced { val id = 1 } ))
// recover registered processors by replaying journaled events
extension.recover()
Java:
// create and register event-sourced processor
final ActorRef processor = extension.processorOf(Props.create(Processor.class), system);
// recover registered processors by replaying journaled events
extension.recover();
An actor that is modified with Eventsourced
(or extends UntypedEventsourcedActor
) writes event Message
s to a journal before its receive
method (or onReceive
method, respectively) is called. The processorOf
method registers that actor under a unique id
. The processor id
is defined by implementing the abstract Eventsourced.id
member which must be a positive integer and consistently re-used across applications runs. The recover
method recovers the state of processor
by replaying all event messages that processor
received in previous application runs.
The event-sourced processor
can be used like any other actor. Messages of type Message
are written to the journal, messages of any other type are directly received by processor
without being journaled.
Scala:
// send event message to processor (will be journaled)
processor ! Message("foo")
Java:
// send event message to processor (will be journaled)
processor.tell(Message.create("foo"), null);
A first application run will create an empty journal. Hence, no event messages will be replayed and the processor
writes
[processor] event = foo (1)
to stdout
. When the application is restarted, however, the processor
's state will be recovered by replaying the previously journaled event message. Then, the application sends another event message. You will therefore see
[processor] event = foo (1)
[processor] event = foo (2)
on stdout
where the first println
is triggered by a replayed event message.
In this step, the event-sourced processor
is extended to send out new event messages to a destination
. It creates another event message (by making a copy of the received event message) with an updated event
field and sends the updated message to destination
.
Scala:
class Processor(destination: ActorRef) extends Actor {
var counter = 0;
def receive = {
case msg: Message => {
counter = counter + 1
// …
destination ! msg.copy(event = "processed %d event messages so far" format counter)
}
}
}
val destination: ActorRef = system.actorOf(Props[Destination])
// instantiate processor by passing the destination as constructor argument
val processor: ActorRef = extension.processorOf(Props(new Processor(destination) with Eventsourced { val id = 1 } ))
extension.recover()
Java:
public class Processor extends UntypedEventsourcedActor {
private ActorRef destination;
private int counter = 0;
public Processor(ActorRef destination) {
this.destination = destination;
}
@Override
public int id() {
return 1;
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Message) {
Message msg = (Message)message;
counter = counter + 1;
// …
destination.tell(msg.withEvent(String.format("processed %d event messages so far", counter)), getSelf());
}
}
}
final ActorRef destination = system.actorOf(Props.create(Destination.class));
// instantiate processor by passing the destination as constructor argument
final ActorRef processor = extension.processorOf(Props.create(Processor.class, destination), system);
extension.recover();
Without any further actions, this would also send event messages to destination
during recovery (i.e. during replay of event messages). With every application restart, destination
would redundantly receive the whole event message history again and again. This is not acceptable in most cases, such as when destination
represents an external service, for example.
To prevent redundant message delivery to destination
we need something that remembers which messages have already been successfully delivered. This is exactly the use case for channels. A channel drops all messages that have already been successfully delivered to a destination. We therefore wrap destination
by a channel and let the processor communicate with the destination via that channel. This can be done without changing the code of Processor
.
Scala:
val destination: ActorRef = system.actorOf(Props[Destination])
// wrap destination by channel
val channel: ActorRef = extension.channelOf(DefaultChannelProps(1, destination))
// instantiate processor by passing the channel (i.e. wrapped destination) as constructor argument
val processor: ActorRef = extension.processorOf(Props(new Processor(channel) with Eventsourced { val id = 1 } ))
Java:
final ActorRef destination = system.actorOf(Props.create(Destination.class));
// wrap destination by channel
final ActorRef channel = extension.channelOf(DefaultChannelProps.create(1, destination), system);
// instantiate processor by passing the channel (i.e. wrapped destination) as constructor argument
final ActorRef processor = extension.processorOf(Props.create(Processor.class, channel), system);
A channel must have a unique id (1
in our example), a positive integer that must be consistently defined across application runs. Here, we create a default channel that is configured with a DefaultChannelProps
configuration object. If applications need reliable event message delivery to destinations, they should use a reliable channel that is configured with a ReliableChannelProps
configuration object.
Assuming the following definition of a Destination
actor
Scala:
class Destination extends Actor {
def receive = {
case msg: Message => {
println("[destination] event = '%s'" format msg.event)
// confirm receipt of event message from channel
msg.confirm()
}
}
}
Java:
public class Destination extends UntypedActor {
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Message) {
Message msg = (Message)message;
System.out.println(String.format("[destination] event = %s", msg.event()));
msg.confirm(true);
}
}
}
and that we're starting again from an empty journal, you should see
[processor] event = foo (1)
[destination] event = 'processed 1 event messages so far'
on stdout
during a first application run. When running the application again, you'll see that the event-sourced processor
receives the complete event message history but the destination
only receives the last event message produced by processor
(which corresponds the the single event message sent to processor
during the current application run):
[processor] event = foo (1)
[processor] event = foo (2)
[destination] event = 'processed 2 event messages so far'
When receiving event messages from a channel, destinations must confirm the receipt of that message by calling Message.confirm()
which asynchronously writes a confirmation (an acknowledgement) to the journal that the message has been successfully delivered. Later, you'll also see how confirmation functionality can be added to destinations with the stackable Confirm
trait.
This First steps guide is a rather low-level introduction to the Eventsourced library. More advanced library features are covered in the following sections.
The Eventsourced
trait has already been discussed in section First steps. It can be combined with the stackable Receiver
, Emitter
and/or Confirm
traits where the Eventsourced
trait must always be the last modification i.e.
Scala:
new MyActor with Receiver with Confirm with Eventsourced
Java:
public class MyActor extends UntypedEventsourcedConfirmingReceiver
The Eventsourced Java API provides some predefined combinations of stackable traits as abstract base classes. For example, UntypedEventsourcedConfirmingReceiver
is defined as
abstract class UntypedEventsourcedReceiver extends UntypedActor with Receiver with Confirm with Eventsourced
Other predefined combinations of stackable traits in the Java API are described in the following subsections. Refer to the Untyped*
abstract classes in the API docs for all predefined combinations.
An actor that receives event Message
s often wants to pattern-match against the contained event
directly instead of the whole event message. This can be achieved by modifying it with the Receiver
trait during instantiation (Scala API) or extending the abstract UntypedReceiver
class (Java API).
Scala:
class MyActor extends Actor {
def receive = {
case event => println("received event %s" format event)
}
}
val myActor = system.actorOf(Props(new MyActor with Receiver))
myActor ! Message("foo")
Java:
public class MyActor extends UntypedReceiver {
@Override
public void onReceive(Object event) throws Exception {
System.out.println(String.format("received event = %s", event));
}
}
final ActorRef myActor = system.actorOf(Props.create(MyActor.class));
myActor.tell(Message.create("foo"), null);
In the above example, sending Message("foo")
to myActor
will write received event foo
to stdout
. The Receiver
trait stores the received event message as current event message in a field, extracts the contained event
from that message and calls the receive
(or onReceive
) method of MyActor
with event
as argument. If MyActor
wants to have access to the current event message it must be defined with a Receiver
self-type and call the message
method (Scala API) or just call the message()
method (Java API).
Scala:
class MyActor extends Actor { this: Receiver =>
def receive = {
case event => {
// obtain current event message
val currentMessage = message
// …
println("received event %s" format event)
}
}
}
Java:
public class MyActor extends UntypedReceiver {
@Override
public void onReceive(Object event) throws Exception {
// obtain current event message
Message currentMessage = message();
// …
System.out.println(String.format("received event = %s", event));
}
}
The Receiver
trait can also be combined with the stackable Eventsourced
and/or Confirm
traits where Receiver
must always be the first modification. For example:
Scala:
new MyActor with Receiver with Eventsourced
Java:
public class MyActor extends UntypedEventsourcedReceiver
Refer to the API docs for further details.
Where a Receiver
modification allows actors to pattern-match against incoming events directly instead of whole event Message
s, an Emitter
introduces a corresponding simplification on the sending (outgoing) side. It allows actors to send (emit) events to channels without having to deal with whole event Message
s. An emitter can also lookup channels by name (or id, see below).
Scala:
class MyActor extends Actor { this: Emitter =>
def receive = {
case event => {
// emit event to channel "myChannel"
emitter("myChannel") sendEvent ("received: %s" format event)
}
}
}
// create register channel under name "myChannel"
extension.channelOf(DefaultChannelProps(1, destination).withName("myChannel"))
val myActor = system.actorOf(Props(new MyActor with Emitter))
Java:
public class MyActor extends UntypedEmitter {
@Override
public void onReceive(Object event) throws Exception {
// emit event to channel "myChannel"
emitter("myChannel").sendEvent(String.format("received: %s", event), getSelf());
}
}
// create register channel under name "myChannel"
extension.channelOf(DefaultChannelProps.create(1, destination).withName("myChannel"), system);
final ActorRef myActor = extension.processorOf(Props.create(MyActor.class), system);
Event messages sent by an emitter to a channel are always derived from (i.e. are a copy of) the current event message (an Emitter
is also Receiver
and maintains a current event message, see also section Receiver). A call to the emitter
method with a channel name as argument creates a MessageEmitter
object that captures the named channel and the current event message. Calling sendEvent
on that object modifies the captured event message with the specified event argument and sends the updated event message to the channel (see also channel usage hints). A MessageEmitter
object can also be sent to other actors (or threads) and be used there i.e. a MessageEmitter
object is thread-safe. Channels can also be referred to by id when creating a MessageEmitter
i.e. there's no need to define a custom channel name:
Scala:
class MyActor extends Actor { this: Emitter =>
def receive = {
case event => {
// emit event to channel with id 1
emitter(1) sendEvent ("received: %s" format event)
}
}
}
// create register channel
extension.channelOf(DefaultChannelProps(1, destination))
Java:
public class MyActor extends UntypedEmitter {
@Override
public void onReceive(Object event) throws Exception {
// emit event to channel with id 1
emitter(1).sendEvent(String.format("received: %s", event), getSelf());
}
}
// create register channel
extension.channelOf(DefaultChannelProps.create(1, destination), system);
The Emitter
trait can also be combined with the stackable Eventsourced
and/or Confirm
traits where Emitter
must always be the first modification. For example:
Scala:
new MyActor with Emitter with Eventsourced
Java:
public class MyActor extends UntypedEventsourcedEmitter
Refer to the API docs for further details.
The receipt of event messages from channels must be confirmed by calling confirm()
or confirm(true)
on the received event Message
. Applications can also negatively confirm an event message receipt by calling confirm(false)
. This, for example, causes a reliable channel to redeliver the event message.
Instead of calling confirm(true)
or confirm(false)
explicitly, actors can also be modified with the stackable Confirm
trait. This trait calls confirm(true)
on the received event message when the modified actor's receive
(or onReceive
) method returns normally and confirm(false)
when it throws an exception.
This trait can either be used standalone
Scala:
new MyActor with Confirm
Java:
public class MyActor extends UntypedConfirmingActor
or in combination with the stackable Receiver
, Emitter
and/or Eventsourced
traits where the Confirm
modification must be made after a Receiver
or Emitter
modification but before an Eventsourced
modification. For example:
Scala:
new MyActor with Receiver with Confirm with Eventsourced
Java:
public class MyActor extends UntypedEventsourcedConfirmingReceiver
Refer to the API docs for further details.
This section modifies (and simplifies) the example from section First steps by making use of the stackable Receiver
, Emitter
and Confirm
traits. In particular, for the Scala API
Processor
will be modified withEmitter
(in addition toEventsourced
)Destination
will be modified withReceiver
andConfirm
For the Java API
Processor
will extendUntypedEventsourcedEmitter
Destination
will extendUntypedConfirmingReceiver
Code from this section is contained in StackableTraits.scala and StackableTraits.java. It can be executed from the sbt prompt with
Scala:
> project eventsourced-examples
> run-main org.eligosource.eventsourced.guide.StackableTraits
Java:
> project eventsourced-examples
> run-main org.eligosource.eventsourced.guide.japi.StackableTraits
The new Scala definition of Processor
now has a self-type Emitter
and pattern-matches against events directly. The Java definition of Processor
extends UntypedEventsourcedEmitter
and also receives events directly (instead of Message
s).
Scala:
class Processor extends Actor { this: Emitter =>
var counter = 0
def receive = {
case event => {
counter = counter + 1
println("[processor] event = %s (%d)" format (event, counter))
emitter("destination") sendEvent ("processed %d events so far" format counter)
}
}
}
Java:
public class Processor extends UntypedEventsourcedEmitter {
private int counter = 0;
@Override
public int id() {
return 1;
}
@Override
public void onReceive(Object event) throws Exception {
counter = counter + 1;
System.out.println(String.format("[processor] event = %s (%d)", event, counter));
emitter("destination").sendEvent(String.format("processed %d event messages so far", counter), getSelf());
}
}
Instead of passing the channel via the constructor it is now looked-up by name ("destination"
). The channel name is specified during channel creation.
Scala:
extension.channelOf(DefaultChannelProps(1, destination).withName("destination"))
Java:
extension.channelOf(DefaultChannelProps.create(1, destination).withName("destination"), system);
The Scala Processor
must be instantiated with an additional Emitter
modification to conform to the Processor
self-type. No further modification is needed for the Java Processor
.
Scala:
val processor: ActorRef = extension.processorOf(Props(new Processor with Emitter with Eventsourced { val id = 1 } ))
Java:
final ActorRef processor = extension.processorOf(Props.create(Processor.class), system);
The new definition of Destination
Scala:
class Destination extends Actor {
def receive = {
case event => {
println("[destination] event = '%s'" format event)
}
}
}
Java:
public class Destination extends UntypedConfirmingReceiver {
@Override
public void onReceive(Object event) throws Exception {
System.out.println(String.format("[destination] event = %s", event));
}
}
pattern-matches against events directly and leaves event message receipt confirmation to the Confirm
trait. The Scala Destination
must be instantiated with a Receiver
and a Confirm
modification and again, no further modification is needed for the Java Destination
.
Scala:
val destination: ActorRef = system.actorOf(Props(new Destination with Receiver with Confirm))
Java:
final ActorRef destination = system.actorOf(Props.create(Destination.class));
The Eventsourced library preserves sender references for all
- message exchanges with actors that are modified with
Eventsourced
,Receiver
,Emitter
and/orConfirm
(or extend any of the abstractUntyped*
base classes from the Java API) and - message exchanges with destination actors via channels
i.e. event-sourced actor applications can make use of sender references in the same way as plain actor applications. If you know how sender references work with Akka actors, the following will sound familiar to you.
For example, taking the code from section First steps as a starting point, Processor
can be extended to reply to message senders as follows.
Scala:
class Processor(destination: ActorRef) extends Actor {
// …
def receive = {
case msg: Message => {
// …
// reply to sender
sender ! ("done processing event = %s" format msg.event)
}
}
}
Java:
public class Processor extends UntypedEventsourcedActor {
// …
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Message) {
// …
getSender().tell(String.format("done processing event = %s", msg.event()), getSelf());
}
}
}
Applications can now ask the processor
and will get a response asynchronously.
Scala:
processor ? Message("foo") onSuccess {
case response => println(response)
}
Java:
ask(processor, Message.create("foo"), 5000L).onSuccess(new OnSuccess<Object>() {
@Override
public void onSuccess(Object response) throws Throwable {
System.out.println(response);
}
}, system.dispatcher());
No surprise here. The sender reference in this example represents the future that is returned from the ?
or ask
method call. But what happens during a replay? During a replay, the sender reference will be deadLetters
because Eventsourced
processors don't store sender references in the journal. The main reason for this is that applications usually do not want to redundantly reply to senders during replays.
Instead of replying to the sender, the processor can also forward the sender reference to a destination and let the destination reply to the sender. This even works if the destination is wrapped by a channel because a channel simply forwards sender references when delivering event messages to destinations. For that reason, a ReliableChannel
needs to store sender references (in contrast to processors), so that sender references are even available after a reliable channel has been restarted. If a stored sender reference is a remote reference, it remains valid even after recovery from a JVM crash (i.e. a crash of the JVM the reliable channel is running in) provided the remote sender is still available.
Scala:
class Processor(destination: ActorRef) extends Actor {
var counter = 0
def receive = {
case msg: Message => {
// …
// forward modified event message to destination (together with sender reference)
destination forward msg.copy(event = "processed %d event messages so far" format counter)
}
}
}
class Destination extends Actor {
def receive = {
case msg: Message => {
// …
// reply to sender
sender ! ("done processing event = %s (%d)" format msg.event)
}
}
}
val destination: ActorRef = system.actorOf(Props[Destination])
val channel: ActorRef = extension.channelOf(DefaultChannelProps(1, destination))
val processor: ActorRef = extension.processorOf(Props(new Processor(channel) with Eventsourced { val id = 1 } ))
Java:
public class Processor extends UntypedEventsourcedActor {
private ActorRef destination;
private int counter = 0;
public Processor(ActorRef destination) {
this.destination = destination;
}
@Override
public int id() {
return 1;
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Message) {
Message msg = (Message)message;
// forward modified event message to destination (together with sender reference)
destination.forward(msg.withEvent(String.format("processed %d event messages so far", counter)), getContext());
}
}
}
public class Destination extends UntypedActor {
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Message) {
Message msg = (Message)message;
// …
// reply to sender
getSender().tell(String.format("done processing event = %s", msg.event()), getSelf());
}
}
}
final ActorRef destination = system.actorOf(Props.create(Destination.class));
final ActorRef channel = extension.channelOf(DefaultChannelProps.create(1, destination), system);
final ActorRef processor = extension.processorOf(Props.create(Processor.class, channel), system);
When using a MessageEmitter
(see also section Emitter) applications can choose between methods sendEvent
and forwardEvent
where sendEvent
takes an (implicit) sender reference as parameter and forwardEvent
forwards the current sender reference. They work in the same way as the tell
(!
) and forward
methods on ActorRef
, respectively.
Code from this section is contained in SenderReferences.scala and SenderReferences.java. It can be executed from the sbt prompt with
Scala:
> project eventsourced-examples
> run-main org.eligosource.eventsourced.guide.SenderReferences
Java:
> project eventsourced-examples
> run-main org.eligosource.eventsourced.guide.japi.SenderReferences
A channel is an actor that keeps track of successfully delivered event messages. Channels are used by event-sourced actors (processors) to prevent redundant message delivery to destinations during event message replay. Some channels can also be used standalone i.e. independent of event-sourced actors.
A use case for channels is described in section External Updates of Martin Fowler's Event Sourcing article. A getting-started example is given in section Channel usage of the First steps guide
Currently, the library provides two different channel implementations, DefaultChannel
and ReliableChannel
, and a pattern on top of ReliableChannel
, a reliable request-reply channel. These are explained in the following subsections.
A default channel is a transient channel that delivers event messages to a destination actor. When the destination confirms the delivery of an event message by calling either confirm()
or confirm(true)
on the received Message
object, a confirmation (an acknowledgement) is asynchronously written to the journal. During a replay, event messages for which a confirmation exists won't be delivered again to the destination.
Event messages that are negatively confirmed by the destination (via a call to confirm(false)
on the received event message) will be re-delivered during the next event message replay. This is also the case for event messages for which no confirmation has been made. Therefore, in cases of negative or missing confirmations, the order of event messages received by a destination from a default channel may differ from the order of event messages produced by an event-sourced processor.
A DefaultChannel
is created and registered at an EventsourcingExtension
as follows.
val extension: EventsourcingExtension = …
val destination: ActorRef = …
val channelId: Int = …
val channel: ActorRef = extension.channelOf(DefaultChannelProps(channelId, destination))
The channelId
must be a positive integer and consistently defined across application runs. The map of registered channels can be obtained via the channels
method of EventsourcingExtension
which returns a map of type Map[Int, ActorRef]
where the mapping key is the channel id. Channels can optionally be registered under a custom name (see also section Emitter).
// …
val channelId: Int = …
val channelName: String = …
val channel: ActorRef = extension.channelOf(DefaultChannelProps(channelId, destination).withName(channelName))
The map of registered named channels can be obtained via the namedChannels
method which returns a map of type Map[String, ActorRef]
where the mapping key is the channel name.
A default channel preserves sender references. Applications can therefore use ?
and forward
as well to communicate with channel destinations via channel actor refs. However, special care must be taken when using ?
: replayed messages that have already been confirmed by a channel destination will be ignored by the corresponding channel and the destination cannot reply. Hence, the sender will see a reply timeout. This can be avoided by finding out in advance if the channel will ignore a message or not. Applications do that by investigating the Message.acks
list. If the channel's id is contained in that list, the message will be ignored and should not be used for asking.
val channelId: Int = …
val channel: ActorRef = …
if (!message.acks.contains(channelId)) channel ? message.copy(…) onComplete {
case result => …
}
See also usage-hints regarding message.copy(…)
.
A reliable channel is a persistent channel that writes event messages to a journal before delivering them to a destination actor. In contrast to a default channel, a reliable channel preserves the order of messages as produced by an event-sourced processor and attempts to re-deliver event messages on destination failures. Therefore, a reliable channel enables applications to recover from temporary destination failures without having to run an event message replay. Furthermore, a reliable channel can also recover from crashes of the JVM it is running in. This allows applications to guarantee at-least-once message delivery in case of both, destination failures and sender failures.
If a destination positively confirms the receipt of an event message, the stored message is removed from the channel and the next one is delivered. If a destination negatively confirms the receipt of an event message or if no confirmation is made (i.e. a timeout occurs), a re-delivery attempt is made after a certain redelivery delay. If the maximum number of re-delivery attempts have been made, the channel restarts itself after a certain restart delay and starts again with re-deliveries. If the maximum number of restarts has been reached, the channel stops message delivery and publishes a DeliveryStopped
event to the event stream of the actor system this channel belongs to. Applications can then re-activate the channel by calling the deliver(Int)
method of EventsourcingExtension
with the channel id as argument. Refer to the ReliableChannel
API docs for details.
A ReliableChannel
is created and registered in the same way as a default channel except that a ReliableChannelProps
configuration object is used.
// …
val channel: ActorRef = extension.channelOf(ReliableChannelProps(channelId, destination))
This configuration object additionally allows applications to configure a RedeliveryPolicy
for the channel.
A reliable channel preserves sender references. Applications can therefore use ?
and forward
as well to communicate with channel destinations via channel actor refs. Details have already been described in the default channel section. A reliable channel also stores sender references along with event messages so that they can be forwarded to destinations even after the channel has been restarted. If a stored sender reference is a remote reference, it remains valid even after recovery from a JVM crash (i.e. a crash of the JVM the reliable channel is running in) provided the remote sender is still available.
For those familiar with Akka, a reliable channel is similar to an Akka reliable proxy except that it additionally can recover from sender JVM crashes (see also section Remote destinations).
A reliable request-reply channel is a pattern implemented on top of a reliable channel. It mediates reliable request-reply interactions between a request sender (usually an Eventsourced
processor) and a destination. This channel has the following properties in addition to a plain reliable channel. It
- extracts requests from received
Message
s before sending them to the destination. - wraps replies from the destination into a
Message
before sending them back to the request sender. - sends a special
DestinationNotResponding
reply to the request sender if the destination doesn't reply within a configurable reply timeout. - sends a special
DestinationFailure
reply to the request sender if destination responds withStatus.Failure
. - guarantees at-least-once delivery of replies to the request sender (in addition to at-least-once delivery of requests to the destination).
- requires a positive receipt confirmation for a reply to mark a request-reply interaction as successfully completed.
- redelivers requests, and subsequently replies, on missing or negative receipt confirmations.
A reliable request-reply channel is created and registered in the same way as a reliable channel except that a ReliableRequestReplyChannelProps
configuration object is used.
// …
import org.eligosource.eventsourced.patterns.reliable.requestreply._
val channel: ActorRef = extension.channelOf(ReliableRequestReplyChannelProps(channelId, destination))
This configuration object additionally allows applications to configure a replyTimeout
for replies from the destination. A detailed usage example of a reliable request-reply channel is given in this article.
Channels must be activated before usage, see extension.deliver().
For channels to work properly, event-sourced processors must copy the processorId
and sequenceNr
values from a received (and journaled) input event message to output event messages. This is usually done by calling copy()
on the received input event message and updating only those fields that are relevant for the application such as event
or ack
, for example:
class Processor(channel: ActorRef) extends Actor {
def receive = {
case msg: Message => {
// …
channel ! msg.copy(event = …, ack = …)
}
}
}
When using a message emitter, this is done automatically.
Reliable channels and reliable request-reply channels can also be used independently of Eventsourced
processors (i.e. standalone). For standalone channel usage, senders must set the Message.processorId
of the sent Message
to 0
(which is the default value):
val channel = extension.channelOf(ReliableChannelProps(…))
channel ! Message("my event") // processorId == 0
This is equivalent to directly sending the Message.event
:
channel ! "my event"
A reliable channel internally wraps a received event into a Message
with processorId
set to 0
. Setting the processorId
to 0
causes a reliable channel to skip writing an acknowledgement. An acknowledgement always refers to an event message received by an Eventsourced
processor, so there's no need to write one in this case. Another (unrelated) use case for turning off writing acknowledgements is the emission of event message series in context of event-sourced channel usage.
Applications should consider using reliable channels whenever a sender processor should deliver messages to a remote destination at-least-once and in sending order, even in cases of
- network errors between sender and destination (remote actor ref remains valid but remote actor is temporarily unavailable)
- destination JVM crashes (remote actor ref becomes invalid) and
- sender JVM crashes (messages already received by a sender processor but not yet delivered to the remote destination must be automatically delivered when the sender recovers from a crash)
Using a remote actor ref as channel destination would work in case 1 but not in case 2. One possible way to deal with case 2, is to use a local actor (a destination proxy) that communicates with the remote destination via an ActorSelection
. An ActorSelection
can be created from an actor path and is not bound to the remote destination's life-cycle.
class DestinationProxy(destinationPath: String) extends Actor {
val destinationSelection: ActorSelection = context.actorSelection(destinationPath)
def receive = {
case msg => destinationSelection tell (msg, sender) // forward
}
}
val destinationPath: String = …
val proxy = system.actorOf(Props(new DestinationProxy(destinationPath)))
val channel = extension.channelOf(ReliableChannelProps(1, proxy))
Using an ActorSelection
also covers case 1, of course. Case 3 is automatically covered by the fact that a sender processor uses a reliable channel for sending messages to a destination. Here's an example:
class Sender extends Actor {
val id = 1
val ext = EventsourcingExtension(context.system)
val proxy = context.actorOf(Props(new DestinationProxy("akka.tcp://[email protected]:2852/user/destination")))
val channel = ext.channelOf(ReliableChannelProps(1, proxy))
def receive = {
case msg: Message => channel forward msg
}
}
// create and recover sender and its channel
val sender = extension.processorOf(Props(new Sender with Eventsourced))
sender ! Message("hello")
Special care must be taken if the remote destination actor is an Eventsourced
actor. In this case, the application must ensure that the destination actor can only be accessed remotely after it has been successfully recovered. This can be achieved, for example, by using an additional endpoint actor that simply forwards to the destination actor. The endpoint actor is registered under the destination path after the destination actor has been successfully recovered.
class DestinationEndpoint(destination: ActorRef) extends Actor {
def receive = {
case msg => destination forward msg
}
}
class Destination extends Actor {
val id = 2
def receive = {
case msg: Message => {
println(s"received ${msg.event}")
msg.confirm()
}
}
}
val destination = extension.processorOf(Props(new Destination with Eventsourced))
// wait for destination recovery to complete
extension.recover()
// make destination remotely accessible after recovery
system.actorOf(Props(new DestinationEndpoint(destination)), "destination")
This ensures that new remote messages never interleave with messages that are replayed to the destination actor during recovery.
Code from this section is contained in ReliableChannelExample.scala. The sender application can be started from sbt with
> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.Sender
The (remote) destination can be started with
> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.Destination
The sender application prompts the user to enter messages on stdin
which are then reliably sent to the remote destination.
A less reliable alternative to channels is communication via sender references. This means producing event messages to destinations that have been passed to a processor via sender references (along with an input event message). These sender references will be deadLetters
during a replay which also prevents redundant delivery. The main difference, however, is that the delivery guarantee changes from at-least-once to at-most-once.
Recovery is a procedure that re-creates the state of event-sourced applications consisting of Eventsourced
actors (processors) and channels. Recovery is usually done at application start, either after normal termination or after a crash (but can also be done any time later, even for individual processors and channels).
val system: ActorSystem = …
val journal: ActorRef = …
val extension = EventsourcingExtension(system, journal)
// create and register event-sourced processors
extension.processorOf(…)
// …
// create and register channels
extension.channelOf(…)
// …
// recover state of registered processors and activate channels
extension.recover()
// processors and channels are now ready to use
// …
The recover()
method first replays journaled event messages to all registered processors. By replaying the event message history, processors can recover state. Processors that emit event messages to one or more channels will also do so during replay. These channels will either ignore (discard) event messages that have already been successfully delivered (i.e. acknowledged) in previous application runs or buffer them for later delivery. After replay, the recover()
method triggers the delivery of buffered messages by activating channels.
If channels delivered event messages immediately instead of buffering them, delivered event messages could wrongly interleave with replayed event messages. This could lead to inconsistencies in event message ordering across application runs and therefore to inconsistencies in application state. Therefore, recovery must ensure that buffered event messages are only delivered after all replayed event messages have been added to their corresponding processors' mailboxes. This is especially important for the recovery of processors and channels that are connected to cyclic, directed graphs.
Recovery can be parameterized with replay parameters using the EventsourcingExtension.recover(Seq[ReplayParams])
method (or one of its overloaded definitions). ReplayParams
allow fine-grained control over state recovery of individual processors. For each processor to be recovered, an application must create a ReplayParams
instance. ReplayParams
specify
- whether replay should start from scratch, from a snapshot or from a given sequence number (lower sequence number bound).
- whether replay should end at current state or any state in the past (using an upper sequence number bound)
The following two subsections demonstrate some ReplayParams
usage examples. For more details, refer to the API docs of ReplayParams
and its companion object. For details about snapshot creation refer to the Snapshots section.
As already explained above
val extension: EventsourcingExtension = …
import extension._
recover()
recovers all processors with no lower and upper sequence number bound i.e. all event messages are replayed. This is equivalent to
recover(replayParams.allFromScratch)
or
recover(processors.keys.map(pid => ReplayParams(pid)).toSeq)
If an application only wants to recover specific processors it should create ReplayParams
only for these processors. For example
recover(Seq(ReplayParams(1), ReplayParams(2)))
only recovers processors with ids 1
and 2
. Upper and lower sequence number bounds can be specified as well.
recover(Seq(ReplayParams(1, toSequenceNr = 12651L), ReplayParams(2, fromSequenceNr = 10L)))
Here processor 1
will receive replayed event messages with sequence numbers within range 0
and 12651
(inclusive), processor 2
with receive event messages with sequence numbers starting from 10
with no upper sequence number bound.
During snapshot based recovery, a processor receives a SnapshotOffer
message before receiving the remaining event messages (if there are any). A processor uses a SnapshotOffer
message to restore its state.
Scala:
class Processor extends Actor {
var state = // …
def receive = {
case so: SnapshotOffer => state = so.snapshot.state
// …
}
}
Java:
public class Processor extends UntypedEventsourcedActor {
private Object state = // …
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof SnapshotOffer) {
SnapshotOffer so = (SnapshotOffer)message;
state = so.snapshot().state();
}
// …
}
}
Snapshot based recovery will only send a SnapshotOffer
message to a processor if one or more snapshots have been created for that processor before and these snapshots match the criteria in the corresponding ReplayParams
. Relevant criteria are toSequenceNr
and snapshotFilter
. If there are no snapshots for a processor or existing snapshots do not match ReplayParams
criteria, event messages will be replayed from scratch i.e. from sequence number 0
.
To recover all processors from their latest snapshot
recover(replayParams.allWithSnapshot)
can be used. This is equivalent to
recover(processors.keys.map(pid => ReplayParams(pid, snapshot = true)).toSeq)
Snapshot based recovery can also be made with upper sequence number bound.
recover(Seq(ReplayParams(1, snapshot = true, toSequenceNr = 12651L)))
This recovers processor 1
with the latest snapshot that has a sequence number <= 12651
. Remaining event messages (if there are any) are replayed up to sequence number 12651
(inclusive). Applications may also define further constraints on snapshots. For example
import scala.concurrent.duration._
val limit = System.currentTimeMillis - 24.hours.toMillis
recover(Seq(ReplayParams(1, snapshotFilter = snapshotMetadata => snapshotMetadata.timestamp < limit)))
uses the latest snapshot of processor 1
that is older than 24 hours. This is done with a snapshotFilter
that filters snapshots based on their timestamp. Snapshot filters operate on SnapshotMetadata
.
The recover
method waits for replayed messages being sent to all processors (via !
) but does not wait for replayed event messages being processed by these processors. However, any new message sent to any registered processor, after recover
successfully returned, will be processed after the replayed event messages. Applications that want to wait for processors to complete processing of replayed event messages, should use the awaitProcessing()
method of EventsourcingExtension
.
val extension: EventsourcingExtension = …
extension.recover()
extension.awaitProcessing()
This can be useful in situations where event-sourced processors maintain state via STM references and the application wants to ensure that the (externally visible) state is fully recovered before accepting new read requests from client applications. By default, the awaitProcessing()
method waits for all registered processors to complete processing but applications can also specify a subset of registered processors.
The recover
and awaitProcessing
methods block the calling thread. This may be convenient in scenarios where a main thread wants to recover the state of an event-sourced application before taking any further actions. In other scenarios, for example, where recovery is done for individual child processors (and channels) inside an actor (see OrderExampleReliable.scala), the non-blocking recovery API should be used:
val extension: EventsourcingExtension = …
val future = for {
_ <- extension.replay(…)
_ <- extension.deliver(…) // optional
_ <- extension.completeProcessing(…) // optional
} yield ()
future onSuccess {
case _ => // event-sourced processors now ready to use …
}
Here, the futures returned by replay
, deliver
and completeProcessing
are monadically composed with a for-comprehension which ensures a sequential execution of the corresponding asynchronous operations. When the composite future
completes, the recovered processors and channels are ready to use. More details in the API docs. The replay
method can also be parameterized with a ReplayParams
sequence (see section Replay parameters).
Channels can even be used by applications immediately after creation i.e. before activating them. This is especially important when event-sourced (parent) processors create new event-sourced child processors during handling of an event:
class Parent extends Actor { this: Receiver with Eventsourced =>
import context.dispatcher
var child: Option[ActorRef] = None
def receive = {
case event => {
// create child processor wrapped by channel
if (child.isEmpty) { child = Some(createChildProcessor(2, 2)) }
// channel can be used immediately
child.foreach(_ ! message.copy(…))
}
}
def createChildProcessor(pid: Int, cid: Int): ActorRef = {
implicit val recoveryTimeout = Timeout(10 seconds)
val childProcessor = extension.processorOf(Props(new Child with Receiver with Eventsourced { val id = pid } ))
val childChannel = extension.channelOf(DefaultChannelProps(cid, childProcessor))
for { // asynchronous, non-blocking recovery
_ <- extension.replay(Seq(ReplayParams(pid)))
_ <- extension.deliver(cid)
} yield ()
childChannel
}
}
class Child extends Actor { this: Receiver =>
def receive = {
case event => {
…
confirm()
}
}
}
Here, Parent
lazily creates a new childProcessor
(wrapped by a default channel) on receiving an event
. The childChannel
is used by Parent
immediately after creation i.e. concurrently to childProcessor
message replay and childChannel
activation. This is possible because a channel internally buffers new messages before its activation and delivers them to its destination after activation. This ensures that new messages will only be delivered to childProcessor
after childChannel
has been activated. During Parent
recovery, childChannel
will ignore messages that have already been successfully delivered to childActor
(i.e. confirmed by childActor
).
The behavior of Eventsourced
processors may depend on the state of other Eventsourced
processors. For example, processor A sends a message to processor B and processor B replies with a message that includes (part of) processor B's state. Depending on the state value included in the reply, processor A may take different actions. To ensure a proper recovery of such a setup, any state-conveying or state-dependent messages exchanged between processors A and B must be of type Message
(see also DependentStateRecoverySpec.scala). Exchanging state via non-journaled messages (i.e. messages of type other than Message
) can break consistent recovery. This is also the case if an Eventsourced
processor maintains state via an externally visible STM reference and another Eventsourced
processor directly reads from that reference. Communication between Eventsourced
processors is closely related to external queries and external updates.
Snapshots represent processor state at a certain point in time and can dramatically reduce recovery times. Snapshot capturing and saving is triggered by applications and does not delete entries from the event message history unless explicitly requested by an application.
Applications can create snapshots by sending an Eventsourced
processor a SnapshotRequest
message (Scala API) or SnapshotRequest.get()
message (Java API).
Scala:
import org.eligosource.eventsourced.core._
// …
val processor: ActorRef = …
processor ! SnapshotRequest
Java:
import org.eligosource.eventsourced.core.*;
// …
final ActorRef processor = …
processor.tell(SnapshotRequest.get(), …)
This will asynchronously capture and save a snapshot of processor
's state. The sender will be notified when the snapshot has been successfully saved.
Scala:
(processor ? SnapshotRequest).mapTo[SnapshotSaved].onComplete {
case Success(SnapshotSaved(processorId, sequenceNr, timestamp)) => …
case Failure(e) => …
}
Java:
ask(processor, SnapshotRequest.get(), 5000L).mapTo(Util.classTag(SnapshotSaved.class)).onComplete(new OnComplete<SnapshotSaved>() {
public void onComplete(Throwable failure, SnapshotSaved result) {
if (failure != null) { … } else { … }
}
}, system.dispatcher());
Alternatively, applications may also use the EventsourcingExtension.snapshot
method to trigger snapshot creation. For example,
Scala:
val extension: EventsourcingExtension = …
extension.snapshot(Set(1, 2)) onComplete {
case Success(snapshotSavedSet: Set[SnapshotSaved]) => …
case Failure(_) => …
}
Java:
Set<Integer> processorIds = new HashSet<Integer>();
processorIds.add(1);
processorIds.add(2);
extension.snapshot(processorIds, new Timeout(5000L)).onComplete(new OnComplete<Set<SnapshotSaved>>() {
public void onComplete(Throwable failure, Set<SnapshotSaved> snapshotSavedSet) {
if (failure != null) { … } else { … }
}
}, system.dispatcher());
creates snapshots of processors with ids 1
and 2
. The returned future (of type Future[scala.immutable.Set[SnapshotSaved]]
(Scala API) or Future<java.util.Set<SnapshotSaved>>
(Java API)) successfully completes when the snapshots of both processors have been successfully saved.
To participate in snapshot capturing, a processor must process SnapshotRequest
messages by calling their process
method with its current state
as argument:
Scala:
class Processor extends Actor {
var state = …
def receive = {
case sr: SnapshotRequest => sr.process(state)
// …
}
}
Java:
public class Processor extends UntypedEventsourcedActor {
private Object state = …
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof SnapshotRequest) {
SnapshotRequest sr = (SnapshotRequest)message;
sr.process(state, getContext());
}
// …
}
}
Calling process
will asynchronously save the state
argument together with (generated) snapshot metadata. Creating a new snapshot does not delete older snapshots unless explicitly requested by an application. Hence, there can be n snapshots per processor.
An example that demonstrates snapshot creation and snapshot based recovery is contained in SnapshotExample.scala and SnapshotExample.java. It can be executed from the sbt prompt with
Scala:
> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.SnapshotExample
Java:
> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.japi.SnapshotExample
Snapshotting is supported by all journals via the Hadoop FileSystem
abstraction. The default FileSystem
instance is the local filesystem i.e. snapshots are by default written locally unless configured otherwise by the application. Please refer to the Hadoop documentation how to create FileSystem
instances for HDFS, FTP, S3 etc. Application-defined FileSystem
instances can be configured in the following way:
Scala:
// …
import org.apache.hadoop.fs.FileSystem
// …
val hdfs: FileSystem = FileSystem.get(...)
val journal: ActorRef = LeveldbJournalProps(..., snapshotFilesystem = hdfs).createJournal
Java:
// …
import org.apache.hadoop.fs.FileSystem;
// …
final FileSystem hdfs = FileSystem.get(…);
final ActorRef journal = LeveldbJournalProps.create(...).withSnapshotFilesystem(hdfs).createJournal(system);
Find out more in the HadoopFilesystemSnapshottingProps API docs.
Actors that are modified with a stackable Receiver
, Emitter
and/or Eventsourced
trait (Scala API) or extend any of the abstract Untyped*
base classes (Java API) can change their behavior with the methods become()
and unbecome()
. These are defined on the Behavior
trait from which Receiver
, Emitter
and Eventsourced
inherit.
Actors that change their behavior with become()
and unbecome()
will keep the functionality introduced by a stackable Receiver
, Emitter
and/or Eventsourced
trait. For example, an actor that is modified with the Eventsourced
trait (Scala API) or extends UntypedEventsourcedActor
(Java API) will continue to journal event messages after having changed its behavior with become()
.
On the other hand, actors that change their behavior with context.become()
(Scala API) or getContext().become()
(Java API) will loose the functionality introduced by the stackable Receiver
, Emitter
and/or Eventsourced
traits (although the lost behavior can be recovered with context.unbecome()
or getContext().unbecome()
).
When a processor derives more than one output event message from a single input event message and emits those output messages to a single channel, it generates a series of event messages. For an event message series, the event processor should set the ack
field for all but the last emitted message to false
.
class Processor(channel: ActorRef) extends Actor {
def receive = {
case msg: Message => {
// …
channel ! msg.copy(event = "event 1", ack = false) // 1st message of series
channel ! msg.copy(event = "event 2", ack = false) // 2nd message of series
// …
channel ! msg.copy(event = "event n") // last message of series
}
}
}
Processors that use an emitter do that in the following way.
class Processor extends Actor { this: Emitter =>
def receive = {
case event => {
// …
emitter("channelName") send (msg => msg.copy(event = "event 1", ack = false)) // 1st message of series
emitter("channelName") send (msg => msg.copy(event = "event 2", ack = false)) // 2nd message of series
// …
emitter("channelName") sendEvent "event n"
}
}
}
This ensures that an acknowledgement is only written to the journal after the last message of a series has been successfully
- delivered by a default channel or
- stored by a reliable channel
Destinations, however, should confirm the receipt of every event message, regardless whether it belongs to a series or not.
Under certain failure conditions, channels may deliver event messages to destinations more than once. A typical example is that a destination positively confirms a message receipt but the application crashes shortly before that confirmation can be written to the journal. In this case, the destination will receive the event message again during recovery.
For these (but also other) reasons, channel destinations must be idempotent event message consumers which is an application-level concern. For example, an event message consumer that stores received purchase orders in a map (where the map key is the order id) is likely to be an idempotent consumer because receiving a purchase order only once or several times will lead to the same result: the purchase order is contained in the map only once. An event message consumer that counts the number of received purchase orders is not an idempotent consumer: a re-delivery will lead to a wrong counter value from a business logic perspective. In this case the event message consumer must implement some extra means to detect event message duplicates.
For detecting duplicates, applications should use identifiers with their events. Identifier values should be set by an event-sourced processor before an event is emitted via a channel. Channel destinations (or other downstream consumers) should keep track of identifiers of successfully processed events and compare them to identifiers of newly received events. A newly received event with an already known identifier can be considered as a duplicate (assuming that the emitting processor generates unique identifiers). For generating unique identifiers, processors can use the sequence number of received event messages:
case class MyEvent(details: Any, eventId: Long)
class Processor extends Actor { this: Emitter with Eventsourced =>
def receive = {
case event => {
// get sequence number of current event message
val snr: Long = sequenceNr
val details: Any = …
// …
emitter("channelName") sendEvent MyEvent(details, snr)
}
}
}
Using the sequence number has the advantage that consumers of emitted events only need to remember the identifier of the last successfully consumed event. If the identifier of a newly received event is less than or equal to that of the last consumed event then it is a duplicate and can therefore be ignored.
class Consumer extends Actor {
var lastEventId = 0L
def receive = {
case MyEvent(details, eventId) =>
if (eventId <= lastEventId) {
// duplicate
} else {
// …
lastEventId = eventId
}
}
}
Consumers that are event-sourced processors can store the event identifier as part of their state which will be recovered during an event message replay. Other consumers must store the identifier somewhere else.
Processors that emit event message series should use an event message index in addition to the sequence number to uniquely identify an emitted event:
case class MyEvent(details: Any, eventId: (Long, Int))
class Processor extends Actor { this: Emitter with Eventsourced =>
def receive = {
case event => {
// get sequence number of current event message
val snr: Long = sequenceNr
val details: Seq[Any] = …
// …
emitter("channelName") send (msg => msg.copy(event = MyEvent(details(0), (snr, 0)), ack = false))
emitter("channelName") send (msg => msg.copy(event = MyEvent(details(1), (snr, 1)), ack = false))
// …
}
}
}
Consumers should then compare the sequence number - index pairs for detecting duplicates.
Applications can configure custom serializers for events of event Message
s. Custom serializers are used for both, writing the event to a journal and for remote communication. They can be configured like any other Akka serializer. For example:
akka {
actor {
serializers {
custom = "example.MyEventSerializer"
}
serialization-bindings {
"example.MyEvent" = custom
}
}
}
Here, example.MyEvent
is an application-specific event type and example.MyEventSerializer
is an application-specific serializer that extends akka.serialization.Serializer
import akka.serialization.Serializer
class CustomEventSerializer extends Serializer {
def identifier = …
def includeManifest = true
def toBinary(o: AnyRef) = …
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]) = …
}
Event Messages themselves are serialized with a pre-configured, library-specific serializer. This serializer is automatically used for event Message
s when the eventsourced-journal-common-*.jar
is on the classpath of an Akka application.
The order management example in this section is taken from Martin Fowler's great LMAX article:
Imagine you are making an order for jelly beans by credit card. A simple retailing system would take your order information, use a credit card validation service to check your credit card number, and then confirm your order - all within a single operation. The thread processing your order would block while waiting for the credit card to be checked, but that block wouldn't be very long for the user, and the server can always run another thread on the processor while it's waiting.
In the LMAX architecture, you would split this operation into two. The first operation would capture the order information and finish by outputting an event (credit card validation requested) to the credit card company. The Business Logic Processor would then carry on processing events for other customers until it received a credit-card-validated event in its input event stream. On processing that event it would carry out the confirmation tasks for that order.
This can be implemented with the Eventsourced library as shown in the following diagram (legend is in Appendix A).
- We implement the mentioned Business Logic Processor processor as event-sourced actor (
OrderProcessor
). It processesOrderSubmitted
events by assigning submitted orders an id and storing them in a map (= state ofOrderProcessor
). For every submitted order it emits aCreditCardValidationRequested
event. CreditCardValidationRequested
events are processed by aCreditCardValidator
actor. It contacts an external credit card validation service and sendsCreditCardValidated
events back to theOrderProcessor
for every order with a valid credit card number. In the example implementation below, we won't actually use an external service to keep the implementation simple, but for real-world implementations, akka-camel would be a perfect fit here.- On receiving a
CreditCardValidated
event, the event-sourcedOrderProcessor
updates the status of corresponding order tovalidated = true
and sends anOrderAccepted
event, containing the updated order, toDestination
. It also replies the updated order to the initial sender.
The Order
domain object, the domain events and the OrderProcessor
are defined as follows:
// domain object
case class Order(id: Int = -1, details: String, validated: Boolean = false, creditCardNumber: String)
// domain events
case class OrderSubmitted(order: Order)
case class OrderAccepted(order: Order)
case class CreditCardValidationRequested(order: Order)
case class CreditCardValidated(orderId: Int)
// event-sourced order processor
class OrderProcessor extends Actor { this: Emitter =>
var orders = Map.empty[Int, Order] // processor state
def receive = {
case OrderSubmitted(order) => {
val id = orders.size
val upd = order.copy(id = id)
orders = orders + (id -> upd)
emitter("validation_requests") forwardEvent CreditCardValidationRequested(upd)
}
case CreditCardValidated(orderId) => {
orders.get(orderId).foreach { order =>
val upd = order.copy(validated = true)
orders = orders + (orderId -> upd)
sender ! upd
emitter("accepted_orders") sendEvent OrderAccepted(upd)
}
}
}
}
The OrderProcessor
uses a message emitter
to send CreditCardValidationRequested
events to CreditCardValidator
via the named "validation_requests"
channel. The forwardEvent
method not only sends the event but also forwards the initial sender reference. Upon receiving a CreditCardValidationRequested
event, the CreditCardValidator
runs a credit card validation in the background and sends a CreditCardValidated
event back to the OrderProcessor
class CreditCardValidator(orderProcessor: ActorRef) extends Actor { this: Receiver =>
def receive = {
case CreditCardValidationRequested(order) => {
val sdr = sender // initial sender
val msg = message // current event message
Future {
// do some credit card validation
// …
// and send back a successful validation result (preserving the initial sender)
orderProcessor tell (msg.copy(event = CreditCardValidated(order.id)), sdr)
}
}
}
}
The CreditCardValidator
again forwards the initial sender reference which finally enables the OrderProcessor
to reply to the initial sender when it receives the CreditCardValidated
event. The OrderProcessor
also sends an OrderAccepted
event to Destination
via the named "accepted_orders"
channel.
class Destination extends Actor {
def receive = {
case event => println("received event %s" format event)
}
}
Next step is to wire the collaborators and to recover them:
val extension: EventsourcingExtension = …
val processor = extension.processorOf(Props(new OrderProcessor with Emitter with Confirm with Eventsourced { val id = 1 }))
val validator = system.actorOf(Props(new CreditCardValidator(processor) with Receiver))
val destination = system.actorOf(Props(new Destination with Receiver with Confirm))
extension.channelOf(ReliableChannelProps(1, validator).withName("validation_requests"))
extension.channelOf(DefaultChannelProps(2, destination).withName("accepted_orders"))
extension.recover()
The named "validation requests"
channel is a reliable channel that re-delivers CreditCardValidationRequested
events in case of CreditCardValidator
failures (for example, when the external credit card validation service is temporarily unavailable). Furthermore, it should be noted that the CreditCardValidator
does not confirm event message deliveries (it neither calls confirm()
explicitly nor is it modified with the Confirm
trait during instantiation). Delivery confirmation will take place when the OrderProcessor
successfully processed the CreditCardValidated
event.
The Order processor
is now ready to receive OrderSubmitted
events.
processor ? Message(OrderSubmitted(Order(details = "jelly beans", creditCardNumber = "1234-5678-1234-5678"))) onSuccess {
case order: Order => println("received response %s" format order)
}
Running this example with an empty journal will write
received response Order(0,jelly beans,true,1234-5678-1234-5678)
received event OrderAccepted(Order(0,jelly beans,true,1234-5678-1234-5678))
to stdout
. You may observe a different line ordering when running the example. The submitted order was assigned an id
of 0
which corresponds to the initial size of the OrderProcessor
's orders
map. A second application run will first recover the previous application state, so that another order submission will generate an order id
of 1
.
received response Order(1,jelly beans,true,1234-5678-1234-5678)
received event OrderAccepted(Order(1,jelly beans,true,1234-5678-1234-5678))
The example code is contained in OrderExample.scala and can be executed from the sbt prompt with
> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.OrderExample
An advanced version of this example, using a reliable request-reply channel, is discussed in Event sourcing and external service integration.
With a change since Akka 2.1, event-sourcing Akka FSMs is now pretty easy. The following state machine example is a Door
which can be in one of two states: Open
and Closed
.
sealed trait DoorState
case object Open extends DoorState
case object Closed extends DoorState
case class DoorMoved(state: DoorState, times: Int)
case class DoorNotMoved(state: DoorState, cmd: String)
case class NotSupported(cmd: Any)
class Door extends Actor with FSM[DoorState, Int] { this: Emitter =>
startWith(Closed, 0)
when(Closed) {
case Event("open", counter) => {
emit(DoorMoved(Open, counter + 1))
goto(Open) using(counter + 1)
}
}
when(Open) {
case Event("close", counter) => {
emit(DoorMoved(Closed, counter + 1))
goto(Closed) using(counter + 1)
}
}
whenUnhandled {
case Event(cmd @ ("open" | "close"), counter) => {
emit(DoorNotMoved(stateName, "cannot %s door" format cmd))
stay
}
case Event(cmd, counter) => {
emit(NotSupported(cmd))
stay
}
}
def emit(event: Any) = emitter("destination") forwardEvent event
}
On state changes, a door emits DoorMoved
events to the named "destination"
channel. DoorMoved
events contain the door's current state and the number of moves so far. On invalid attempts to move a door e.g. trying to open an opened door, a DoorNotMoved
event is emitted. The channel destination is an actor that simply prints received events to stdout
.
class Destination extends Actor {
def receive = { case event => println("received event %s" format event) }
}
After configuring the application
val system: ActorSystem = …
val extension: EventsourcingExtension = …
val destination = system.actorOf(Props(new Destination with Receiver with Confirm))
extension.channelOf(DefaultChannelProps(1, destination).withName("destination"))
extension.processorOf(Props(new Door with Emitter with Eventsourced { val id = 1 } ))
extension.recover()
val door = extension.processors(1)
we can start sending event messages to door
:
door ! Message("open")
door ! Message("close")
This will write
received event DoorMoved(Open,1)
received event DoorMoved(Closed,2)
to stdout
. When trying to attempt an invalid state change with
door ! Message("close")
the destination
will receive a DoorNotMoved
event:
received event DoorNotMoved(Closed,cannot close door)
Restarting the example application will recover the door's state so that
door ! Message("open")
door ! Message("close")
will produce
received event DoorMoved(Open,3)
received event DoorMoved(Closed,4)
The code from this section is contained in slightly modified form in FsmExample.scala.
This section makes the Door
state machine from the previous example highly-available in an Akka cluster. The Door
state machine is a cluster-wide singleton that is managed by NodeActor
s. There's one NodeActor
per cluster node listening to cluster events. If a NodeActor
becomes the master (= leader) it creates and recovers a Door
instance. The other NodeActor
s obtain a remote reference to the Door
instance on master.
Clients interact with the Door
singleton via NodeActor
s by sending them door commands ("open"
or "close"
). NodeActor
s accept commands on any cluster node, not only on master. A NodeActor
forwards these commands to the Door
as command Message
s. Event Message
s emitted by the Door
are sent to a remote Destination
actor via the named "destination"
channel. The Destination
creates a response from the received events and sends that response back to the initial sender. The application that runs the Destination
actor is not part of the cluster but a standalone remote application. It also hosts the journal that is used by the cluster nodes (which is a SPOF in this example but later versions will use a distributed journal).
When the master crashes, another node in the cluster becomes the master and recovers the Door
state machine. The remaining slave node renews its remote reference to the Door
instance on the new master.
Code from this section is contained in ClusterExample.scala, the configuration files used are journal.conf and cluster.conf. For a more detailed description of the example code, refer to the code comments. To run the distributed example application from sbt, first start the application that hosts the Destination
actor and the journal:
> run-main org.eligosource.eventsourced.example.DestinationApp
Then start the first seed node of the cluster
> run-main org.eligosource.eventsourced.example.NodeApp 2561
then the second seed node
> run-main org.eligosource.eventsourced.example.NodeApp 2562
and finally a third cluster node
> run-main org.eligosource.eventsourced.example.NodeApp
The above commands require that you're in the eventsourced-examples
project. You can switch to it via
> project eventsourced-examples
Most likely the first seed node will become the master which writes
MASTER: recovered door at akka://[email protected]:2561
to stdout
. The other nodes become slaves that write
SLAVE: referenced door at akka://[email protected]:2561
to stdout
. All nodes prompt the user to enter a door command:
command (open|close):
We will now enter commands on the last started cluster node (a slave node).
The Door
singleton is initially in closed state. Entering open
will open it:
command (open|close): open
moved 1 times: door now open
Then close it again:
command (open|close): close
moved 2 times: door now closed
Trying to close a closed door will result in an error:
command (open|close): close
cannot close door: door is closed
Now kill the master node with ctrl^c
. This will also destroy the Door
singleton. After 1-2 seconds, a new master has been determined by the cluster. The new master is going to recover the event-sourced Door
singleton. The slave will renew its remote reference to the Door
. To verify that the Door
has been properly recovered, open the door again:
command (open|close): open
moved 3 times: door now open
You can see that the Door
state (which contains the number of past moves) has been properly failed-over.
The Multicast
processor is a predefined Eventsourced
processor that forwards received event messages to multiple targets. Using a Multicast
processor with n targets is an optimization of having n Eventsourced
processors that receive the same event Message
s. Using a multicast processor, a received event message is journaled only once whereas with n Eventsourced
processors that message would be journaled n times (once for each processor). Using a Multicast
processor for a large number of targets can therefore significantly save disk space and increase throughput.
Applications can create a Multicast
processor with the multicast
factory method which is defined in package core
.
// …
import org.eligosource.eventsourced.core._
val extension: EventsourcingExtension = …
val processorId: Int = …
val target1: ActorRef = …
val target2: ActorRef = …
val multicast = extension.processorOf(Props(multicast(processorId, List(target1, target2))))
This is equivalent to
val multicast = extension.processorOf(Props(new Multicast(List(target1, target2), identity) with Eventsourced { val id = processorId } ))
Applications that want to modify received event Message
s, before they are forwarded to targets, can specify a transformer
function.
val transformer: Message => Any = msg => msg.event
val multicast = extension.processorOf(Props(multicast(1, List(target1, target2), transformer)))
In the above example, the transformer
function extracts the event
from a received event Message
. If the transformer
function is not specified, it defaults to the identity
function. Another Multicast
factory method is the decorator
method for creating a multicast processor with a single target.
TODO
- Eventsourced user forum
- Eventsourced developer forum
- Commercial support by Eligotech B.V.