Skip to content
krasserm edited this page Apr 11, 2013 · 15 revisions

Snapshot support is currently developed on branch wip-8-snapshots. The following gives an overview of the proposed snapshot API (feedback here or here). The snapshot API allows applications to store state snapshots of Eventsourced processors. State snapshots represent processor state at a certain point in time.

Recovering current processor state can be done either by replaying the full event message history or by initializing processor state with a snapshot and replaying only the remaining event messages that are newer than the snapshot. Processor state recovery using snapshots can dramatically reduce recovery times.

Snapshot creation

Applications can request snapshot storage by sending a processor the Snapshot message.

import org.eligosource.eventsourced.core._
// ...

val processor: ActorRef = ...

processor ! Snapshot

This will create and store a state snapshot for processor. The sender can also be notified when the snapshot has been successfully stored.

processor ? Snapshot onComplete {
  case Success(SnapshotSaved(pid, snr)) => ...
  case Failure(e)                       => ...
}

Alternatively, applications may also use the EventsourcingExtension to request snapshot creation and storage.

val extension: EventsourcingExtension = ...

extension.snapshot(Set(1, 2)) onComplete {
  case Success(_) => ...
  case Failure(_) => ...
}

This will create state snapshots of processors with ids 1 and 2. The returned Future successfully completes when the snapshots of both processors have been successfully stored.

To support snapshotting, a processor must handle SnapshotRequest messages by calling the process method with its current state as argument:

class Processor extends Actor {
  var state = ...

  def receive = {
    case sr: SnapshotRequest => sr.process(state)
    ...
  }
}

Calling process will asynchronously store the state argument together with metadata such as the processor id and the latest sequence number covered by that snapshot.

Snapshot based recovery

Snapshot based recovery of processors with ids 1 and 2 can be done as follows.

val extension: EventsourcingExtension = ...

extension.recover(Seq(1, 2).map(ReplayParams(_, snapshot = true)))

On the other hand

extension.recover(Seq(1, 2).map(ReplayParams(_, snapshot = false)))

or

extension.recover(Seq(1, 2).map(ReplayParams(_)))

does a recovery with the full event message history.

Snapshot based recovery for all registered processors can be done with

import extension._

extension.recover(replayParams.allWithSnapshot)

but

extension.recover(replayParams.allFromScratch)

or

extension.recover()

does again a recovery with the full event message history.

During recovery a processor is offered a state snapshot with a SnapshotOffer message before receiving the remaining event messages (if there are any). A processor handles a SnapshotOffer message to initialize its internal state.

class Processor extends Actor {
  var state = ...

  def receive = {
    case so: SnapshotOffer => state = so.snapshot.state
    ...
  }
}

Snapshot selection

Per processor, n snapshots can be stored. Which one to use for recovery can be specified via the ReplayParams.snapshotFilter predicate which is of type SnapshotMetadata => Boolean where SnapshotMetadata is defined as

trait SnapshotMetadata {
  def processorId: Int
  def sequenceNr: Long
  def timestamp: Long
}

During recovery ReplayParams.snapshotFilter is called with metadata of all stored snapshots and the latest (youngest) snapshot of those matching the predicate will be used for recovery. For example

extension.recover(Seq(ReplayParams(1, _.sequenceNr < 724L)))

will only consider snapshots with a sequenceNr less than 724 for a processor with id 1. Applying the same criteria to all processors can be done with

extension.recover(replayParams.allWithSnapshot(_.sequenceNr < 724L))

If not specified, the filter predicate defaults to _ => true i.e. the latest of all snapshots will be used.

Example application

There's also running SnapshotExample on branch wip-8-snapshots that will be extended as new features are added. It uses the in-memory journal which is currently the only journal that supports snapshotting. Once there is agreement on this proposal, other journals will soon be extended to support snapshot storage as well.

Backwards compatibility

All API changes are backwards compatible. Furthermore, the EventsourcingExtension methods

def recover(f: (Int) => Option[Long], waitAtMost: FiniteDuration): Unit
def replay(f: (Int) => Option[Long])(implicit timeout: Timeout): Future[Any]

have been depreceated and are now superseded by

def recover(params: Seq[ReplayParams], waitAtMost: FiniteDuration): Unit
def replay(params: Seq[ReplayParams])(implicit timeout: Timeout): Future[Any]

API docs will be published as soon as branch wip-8-snapshots is merged to master.

Clone this wiki locally