Snapshots
Snapshot support is currently being developed on branch wip-8-snapshots. The following gives an overview of the proposed snapshot API (feedback is welcome). The snapshot API allows applications to store state snapshots of Eventsourced processors. A state snapshot represents processor state at a certain point in time.
Recovering current processor state can be done either by replaying the full event message history or by initializing processor state with a snapshot and replaying only the remaining event messages that are newer than the snapshot. Processor state recovery using snapshots can dramatically reduce recovery times.
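The equivalence of the two recovery strategies can be illustrated with a minimal, library-independent sketch. The names Event, Snapshot (a plain data class here, not the Snapshot message) and RecoverySketch are hypothetical and not part of the Eventsourced API, and processor state is assumed to be a simple running sum.

```scala
// Hypothetical model: none of these types belong to the Eventsourced API.
final case class Event(sequenceNr: Long, amount: Int)
final case class Snapshot(sequenceNr: Long, state: Int)

object RecoverySketch {
  // Applies a single event to the current state (here: a running sum).
  def applyEvent(state: Int, event: Event): Int = state + event.amount

  // Full recovery: replay the entire event history from the initial state.
  def recoverFromScratch(history: Seq[Event]): Int =
    history.foldLeft(0)(applyEvent)

  // Snapshot-based recovery: start from the snapshot state and replay
  // only the events that are newer than the snapshot.
  def recoverFromSnapshot(snapshot: Snapshot, history: Seq[Event]): Int =
    history
      .filter(_.sequenceNr > snapshot.sequenceNr)
      .foldLeft(snapshot.state)(applyEvent)
}
```

Both functions yield the same state as long as the snapshot was taken from the same history; the snapshot-based variant simply folds over fewer events.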
Applications can request snapshot storage by sending a processor the Snapshot message.
import org.eligosource.eventsourced.core._
// ...
val processor: ActorRef = ...
processor ! Snapshot
This will create and store a state snapshot for processor. The sender can also be notified when the snapshot has been successfully stored.
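import akka.pattern.ask
import scala.util.{Failure, Success}
// requires an implicit Timeout and ExecutionContext in scope
processor ? Snapshot onComplete {
  case Success(SnapshotSaved(pid, snr)) => ...
  case Failure(e) => ...
}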
Alternatively, applications may use the EventsourcingExtension to request snapshot creation and storage.
val extension: EventsourcingExtension = ...
extension.snapshot(Set(1, 2)) onComplete {
  case Success(_) => ...
  case Failure(_) => ...
}
This will create state snapshots of processors with ids 1 and 2. The returned Future completes successfully when the snapshots of both processors have been stored.
To support snapshotting, a processor must handle SnapshotRequest messages by calling the process method with its current state as argument:
class Processor extends Actor {
  var state = ...
  def receive = {
    case sr: SnapshotRequest => sr.process(state)
    ...
  }
}
Calling process will asynchronously store the state argument together with metadata such as the processor id and the latest sequence number covered by that snapshot.
Snapshot-based recovery of processors with ids 1 and 2 can be done as follows.
val extension: EventsourcingExtension = ...
extension.recover(Seq(1, 2).map(ReplayParams(_, snapshot = true)))
In contrast, both
extension.recover(Seq(1, 2).map(ReplayParams(_, snapshot = false)))
and
extension.recover(Seq(1, 2).map(ReplayParams(_)))
perform a recovery with the full event message history.
Snapshot-based recovery for all registered processors can be done with
import extension._
extension.recover(replayParams.allWithSnapshot)
whereas both
extension.recover(replayParams.allFromScratch)
and
extension.recover()
again perform a recovery with the full event message history.
During recovery, a processor is offered a state snapshot with a SnapshotOffer message before receiving the remaining event messages (if there are any). A processor handles a SnapshotOffer message to initialize its internal state.
class Processor extends Actor {
  var state = ...
  def receive = {
    case so: SnapshotOffer => state = so.snapshot.state
    ...
  }
}
Multiple snapshots can be stored per processor. Which one to use for recovery can be specified via the ReplayParams.snapshotFilter predicate, which is of type SnapshotMetadata => Boolean, where SnapshotMetadata is defined as
trait SnapshotMetadata {
  def processorId: Int
  def sequenceNr: Long
  def timestamp: Long
}
During recovery, ReplayParams.snapshotFilter is called with the metadata of all stored snapshots, and the latest (youngest) snapshot of those matching the predicate will be used for recovery. For example
extension.recover(Seq(ReplayParams(1, _.sequenceNr < 724L)))
will only consider snapshots with a sequenceNr less than 724 for a processor with id 1. Applying the same criteria to all processors can be done with
extension.recover(replayParams.allWithSnapshot(_.sequenceNr < 724L))
If not specified, the filter predicate defaults to _ => true, i.e. the latest of all snapshots will be used.
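The selection rule can be sketched independently of the library as follows. This is an illustrative model only, not the Eventsourced implementation: Metadata and SnapshotSelection are hypothetical names, and "latest" is assumed to mean the snapshot with the highest sequence number.

```scala
// Hypothetical stand-in for SnapshotMetadata; not the Eventsourced type.
final case class Metadata(processorId: Int, sequenceNr: Long, timestamp: Long)

object SnapshotSelection {
  // Keeps the snapshots accepted by the filter and returns the one with the
  // highest sequence number (assumed meaning of "latest"), or None if no
  // snapshot matches. The filter defaults to accepting every snapshot.
  def select(
      stored: Seq[Metadata],
      filter: Metadata => Boolean = _ => true): Option[Metadata] =
    stored.filter(filter).sortBy(_.sequenceNr).lastOption
}
```

With stored snapshots at sequence numbers 100, 700 and 900, the default filter would pick the one at 900, whereas `_.sequenceNr < 724L` would pick the one at 700.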
A recovery, with or without snapshots, can have an upper limit (in terms of sequence number) in order to restore a past state. For a snapshot-based replay, Eventsourced combines the upper limit (ReplayParams.toSequenceNr) with a user-defined ReplayParams.snapshotFilter to select an appropriate snapshot. For example
extension.recover(Seq(ReplayParams(1, snapshotFilter = _.sequenceNr > 724, toSequenceNr = 712)))
will not select a snapshot for recovery because none of the snapshots accepted by the filter (sequence number > 724) satisfies the upper sequence number limit of 712.
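How the upper limit and the filter might combine can be sketched with a small, library-independent model. StoredSnapshot and BoundedSelection are hypothetical names, the upper limit is assumed to be inclusive, and the snapshot with the highest sequence number is assumed to be the "latest"; the actual Eventsourced behaviour may differ in these details.

```scala
// Hypothetical model; not part of the Eventsourced API.
final case class StoredSnapshot(processorId: Int, sequenceNr: Long)

object BoundedSelection {
  // Composes the user-defined filter with the upper sequence number limit
  // (assumed inclusive) and returns the matching snapshot with the highest
  // sequence number, or None if no snapshot satisfies both conditions.
  def select(
      stored: Seq[StoredSnapshot],
      snapshotFilter: StoredSnapshot => Boolean,
      toSequenceNr: Long): Option[StoredSnapshot] = {
    val composed: StoredSnapshot => Boolean =
      s => snapshotFilter(s) && s.sequenceNr <= toSequenceNr
    stored.filter(composed).sortBy(_.sequenceNr).lastOption
  }
}
```

A filter of `_.sequenceNr > 724L` combined with an upper limit of 712 can never be satisfied, so the result is None and recovery would have to proceed without a snapshot.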
There's also a running SnapshotExample on branch wip-8-snapshots that will be extended as new features are added. It uses the in-memory journal, which is currently the only journal that supports snapshotting. Once there is agreement on this proposal, other journals will be extended to support snapshot storage as well.
All API changes are backwards compatible. Furthermore, the EventsourcingExtension methods
def recover(f: (Int) => Option[Long], waitAtMost: FiniteDuration): Unit
def replay(f: (Int) => Option[Long])(implicit timeout: Timeout): Future[Any]
have been deprecated and are superseded by
def recover(params: Seq[ReplayParams], waitAtMost: FiniteDuration): Unit
def replay(params: Seq[ReplayParams])(implicit timeout: Timeout): Future[Any]
API docs will be published as soon as branch wip-8-snapshots is merged to master.