Skip to content

Commit

Permalink
Merge pull request #414 from rabbitmq/aux-improvements
Browse files Browse the repository at this point in the history
New handle_aux/5 API that provides a better and safer API into Ra internals.
  • Loading branch information
kjnilsson authored Feb 19, 2024
2 parents 5271882 + 2320ccc commit fc394fc
Show file tree
Hide file tree
Showing 14 changed files with 662 additions and 337 deletions.
14 changes: 9 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,8 @@ See [Ra state machine tutorial](docs/internals/STATE_MACHINE_TUTORIAL.md)
for how to write more sophisticated state machines by implementing
the `ra_machine` behaviour.

A [Ra-based key/value store example](https://github.com/rabbitmq/ra-kv-store) is available
in a separate repository.
A [Ra-based key/value store example](https://github.com/rabbitmq/ra-kv-store)
is available in a separate repository.


## Documentation
Expand Down Expand Up @@ -427,20 +427,23 @@ in a separate repository.

## Logging

Ra will use default OTP `logger` by default, unless `logger_module` configuration key is used to override.
Ra will use default OTP `logger` by default, unless `logger_module`
configuration key is used to override.

To change log level to `debug` for all applications, use

``` erl
logger:set_primary_config(level, debug).
```

## Ra versioning

Ra attempts to follow [Semantic Versioning](https://semver.org/).

The modules that form part of the public API are:
* `ra`
* `ra_machine`
* `ra_machine` (behaviour callbacks only)
* `ra_aux`
* `ra_system`
* `ra_counters`
* `ra_leaderboard`
Expand All @@ -449,7 +452,8 @@ The modules that form part of the public API are:

## Copyright and License

(c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
(c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to
Broadcom Inc. and/or its subsidiaries.

Dual licensed under the Apache License Version 2.0 and
Mozilla Public License Version 2.0.
Expand Down
202 changes: 18 additions & 184 deletions docs/internals/INTERNALS.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,15 @@ transitions.

### Effect Application and Failure Handling

Under normal operation only the leader that first applies an entry will attempt the effect.
Followers process the same set of commands but simply throw away any effects returned by
the state machine.
Under normal operation only the leader that first applies an entry will attempt
the effect.
Followers process the same set of commands but simply throw away any effects
returned by the state machine.

To ensure we do not re-issue effects on recovery each `ra` server persists its `last_applied` index.
When the server restarts it replays its log until this point and throws away any resulting effects as they
should already have been issued.
To ensure we do not re-issue effects on recovery each `ra` server persists its
`last_applied` index.
When the server restarts it replays its log until this point and throws away any
resulting effects as they should already have been issued.

As the `last_applied` index is only persisted periodically there is a small
chance that some effects may be issued multiple times when all the servers in the
Expand All @@ -105,180 +107,8 @@ never be issued or reach their recipients. Ra makes no allowance for this.

It is worth taking this into account when implementing a state machine.

The [Automatic Repeat Query (ARQ)](https://en.wikipedia.org/wiki/Automatic_repeat_request) protocol
can be used to implement reliable communication (Erlang message delivery) given the
above limitations.

A number of effects are available to the user.

### Sending a message

The `{send_msg, pid(), Msg :: term()}` effect asynchronously sends a message
to the specified `pid`.

`ra` uses `erlang:send/3` with the `no_connect` and `no_suspend`
options which is the least reliable way of doing it. It does this so
that a state machine `send_msg` effect will never block the main `ra` process.

To ensure message reliability, [Automatic Repeat Query (ARQ)](https://en.wikipedia.org/wiki/Automatic_repeat_request)-like
protocols between the state machine and the receiver should be implemented
if needed.

### Monitoring

The `{monitor, process | node, pid() | node()}` effect will ask the `ra` leader to
monitor a process or node. If `ra` receives a `DOWN` for a process it
is monitoring it will commit a `{down, pid(), term()}` command to the log that
the state machine needs to handle. If it detects a monitored node as down or up
it will commit a `{nodeup | nodedown, node()}` command to the log.

Use `{demonitor, process | node, pid() | node()}` to stop monitoring a process
or a node.

All monitors are invalidated when the leader changes. State machines should
re-issue monitor effects when becoming leader using the `state_enter/2`
callback.

### Calling a function

The `{mod_call, module(), function(), Args :: [term()]}` effect will ask the leader
to call an arbitrary function. Care need to be taken not to block the `ra` process whilst doing so.
It is recommended that expensive operations are done in another process.

The `mod_call` effect is useful for e.g. updating an ETS table of committed entries
or similar.

### Setting a timer

The `{timer, Name :: term(), Time :: non_neg_integer() | infinity}` effects asks the Ra leader
to maintain a timer on behalf of the state machine and commit a `timeout` command
when the timer triggers. If setting the time to `infinity`, the timer will not be started
and any running timer with same name will be cancelled.

The timer is relative and setting another timer with the same name before the current
timer runs out results in the current timer being reset.

All timers are invalidated when the leader changes. State machines should
re-issue timer effects when becoming leader using the `state_enter/2`
callback.

### Reading a log

Use `{log, Indexes :: [ra_index()], fun(([user_command()]) -> effects()}` to read
commands from the log from the specified indexes and return a list of effects.

Effectively this effect transforms log entries into effects.

Potential use cases could be when a command contains large binary data and you
don't want to keep this in memory but load it on demand when needed for a side-effect.

This is an advanced feature and will only work as long as the command is still
in the log. If a `release_cursor` has been emitted with an index higher than this,
the command may no longer be in the log and the function will not be called.

There is currently no facility for reading partial data from a snapshot.

### Updating the Release Cursor (Snapshotting)

The `{release_cursor, RaftIndex, MachineState}`
effect can be used to give Ra cluster members a hint to trigger a snapshot.
This effect, when emitted, is evaluated on all nodes and not just the leader.

It is not guaranteed that a snapshot will be taken. A decision to take
a snapshot or to delay it is taken using a number of internal Ra state factors.
The goal is to minimise disk I/O activity when possible.

### Checkpointing

Checkpoints are nearly the same concept as snapshots. Snapshotting truncates
the log up to the snapshot's index, which might be undesirable for machines
which read from the log with the `{log, Indexes, Fun}` effect mentioned above.

The `{checkpoint, RaftIndex, MachineState}` effect can be used as a hint to
trigger a checkpoint. Like snapshotting, this effect is evaluated on all nodes
and when a checkpoint is taken, the machine state is saved to disk and can be
used for recovery when the machine restarts. A checkpoint being written does
not trigger any log truncation though.

The `{release_cursor, RaftIndex}` effect can then be used to promote any
existing checkpoint older than or equal to `RaftIndex` into a proper snapshot,
and any log entries older than the checkpoint's index are then truncated.

These two effects are intended for machines that use the `{log, Indexes, Fun}`
effect and can substantially improve machine recovery time compared to
snapshotting alone, especially when the machine needs to keep old log entries
around for a long time.

## State Machine Versioning

It is eventually necessary to make changes to the state machine
code. Any changes to a state machine that would result in a different end state when
the state is re-calculated from the log of entries (as is done when restarting a ra server)
should be considered breaking.

As Ra state machines need to be deterministic any changes to the logic inside the `apply/3` function
_needs to be enabled at the same index on all members of a Ra cluster_.

### Versioning API

Ra considers all state machines versioned starting with version 0. State machines
that need to be updated with breaking changes need to implement the optional
versioning parts of the `ra_machine` behaviour:

``` erlang
-type version() :: non_neg_integer().

-callback version() -> pos_integer().

-callback which_module(version()) -> module().

```

`version/0` returns the current version which is an integer that is
higher than any previously used version number. Whenever a breaking change is
made this should be incremented.

`which_module/1` maps a version to the module implementing it. This allows
developers to optionally keep entire modules for old versions instead of trying
to handle multiple versions in the same module.

E.g. when moving from version 0 of `my_machine` to version 1:

1. Copy and rename the `my_machine` module to `my_machine_v0`

2. Implement the breaking changes in the original module and bump the version.

``` erlang
version() -> 1.

which_module(1) -> my_machine;
which_module(0) -> my_machine_v0.

```

This would ensure that any entries added to the log are applied against the active machine version
at the time they were added, leading to a deterministic outcome.

For smaller (but still breaking) changes that can be handled in the original
module it is also possible to switch based on the `machine_version` key included in the meta
data passed to `apply/3`.

### Runtime Behaviour

New versions are enabled whenever there is a quorum of members with a higher version and
one of them is elected leader. The leader will commit the new version to the
log and each follower will move to the new version when this log entry is applied.
Followers that do not yet have the new version available will receive log entries from the leader
and update their logs but will not apply log entries. When they are upgraded and have
the new version, all outstanding log entries will be applied. In practical terms this means
that Ra nodes can be upgraded one by one.

In order to be upgradeable, the state machine implementation will need to handle the version
bump in the form of a command that is passed to the `apply/3` callback:
`{machine_version, OldVersion, NewVersion}`. This provides an
opportunity to transform the state data into a new form, if needed. Note that the version
bump may be for several versions so it may be necessary to handle multiple
state transformations.
See [State Machine Tutorial](docs/internals/STATE_MACHINE_TUTORIAL.md) for
further information on state machines and the effects available


### Limitations
Expand All @@ -291,13 +121,15 @@ Ra does not support the Erlang hot code swapping mechanism.
There are two approaches to forming a cluster:

* All cluster members can be known ahead of time
* All cluster members can be joining existing members dynamically (this implies that one "seed" member is chosen and started first)
* All cluster members can be joining existing members dynamically
(this implies that one "seed" member is chosen and started first)

### Fixed Set of Members Known on Startup

Use `ra:start_or_restart_cluster/3` on one of the nodes to set up a cluster.
This will either create a new cluster or restart an existing one.
As cluster membership is persisted in Ra logs, newly added nodes will be discovered from the log.
As cluster membership is persisted in Ra logs, newly added nodes will be
discovered from the log.

### Dynamically Joining Nodes

Expand Down Expand Up @@ -325,11 +157,13 @@ The cluster name is mostly a "human-friendly" name for a Ra cluster.
Something that identifies the entity the cluster is meant to represent.
The cluster name isn't strictly part of a cluster's identity.

For example, in RabbitMQ's quorum queues case cluster names are derived from queue's identity.
For example, in RabbitMQ's quorum queues case cluster names are derived
from queue's identity.

### Server ID

A Ra server is a Ra cluster member. Server ID is defined as a pair of `{atom(), node()}`.
A Ra server is a Ra cluster member. Server ID is defined as a pair
of `{atom(), node()}`.
Server ID combines a locally registered name and the Erlang node it resides on.

Since server IDs identify Ra cluster members, they need to be a
Expand Down
Loading

0 comments on commit fc394fc

Please sign in to comment.