Skip to content

Event Sourcing

Traditional persistence stores the current state — a row in a database with balance = 500. Event sourcing stores the sequence of facts that produced the stateDeposited(100), Deposited(500), Withdrawn(100). The current state is a derived value, computed by folding events from left to right.

This distinction has profound consequences:

  • Complete audit trail. Every state change is recorded as an immutable event. You can reconstruct the state at any point in time by replaying events up to that moment.
  • Recovery after failure. When an actor restarts (via supervision) or a process crashes and restarts, the actor replays its events from the journal and recovers its exact state. Supervision provides the restart; event sourcing provides the memory.
  • Separation of concerns. The decision to change state (command handling) is separated from the application of the change (event handling). Commands can be rejected; events are facts that have already occurred and cannot be rejected.

Event sourcing introduces three distinct concepts:

  • Commands — Messages from the outside world requesting a state change. "Deposit 100" is a command. It may be accepted or rejected (e.g., "insufficient funds" for a withdrawal).
  • Events — Immutable records of what actually happened. "Deposited 100" is an event. Events are persisted to a journal and never modified.
  • State — The current value derived from the event sequence. Computed by folding on_event over all persisted events from an initial state.

A regular Casty actor holds state in a closure — but closures cannot be replayed. Event sourcing requires a different behavior shape with two explicit functions:

  • on_event(state, event) -> state — A pure, synchronous function that applies one event to the current state. Used both for live processing and for recovery (replay).
  • on_command(ctx, state, command) -> Behavior — An async function that receives the current state and a command, decides what events to persist, and returns Behaviors.persisted(events=[...]).
# --- State ---

@dataclass(frozen=True)
class AccountState:
    balance: int
    tx_count: int

# --- Events (persisted to the journal) ---

@dataclass(frozen=True)
class Deposited:
    amount: int

@dataclass(frozen=True)
class Withdrawn:
    amount: int

type AccountEvent = Deposited | Withdrawn

# --- Commands (sent by the outside world) ---

@dataclass(frozen=True)
class Deposit:
    amount: int

@dataclass(frozen=True)
class Withdraw:
    amount: int
    reply_to: ActorRef[str]

@dataclass(frozen=True)
class GetBalance:
    reply_to: ActorRef[AccountState]

type AccountCommand = Deposit | Withdraw | GetBalance

# --- Event handler (pure — used for replay and live updates) ---

def on_event(state: AccountState, event: AccountEvent) -> AccountState:
    match event:
        case Deposited(amount):
            return AccountState(balance=state.balance + amount, tx_count=state.tx_count + 1)
        case Withdrawn(amount):
            return AccountState(balance=state.balance - amount, tx_count=state.tx_count + 1)
    return state

# --- Command handler (async — decides what events to persist) ---

async def on_command(ctx: Any, state: AccountState, cmd: AccountCommand) -> Any:
    match cmd:
        case Deposit(amount):
            return Behaviors.persisted(events=[Deposited(amount)])
        case Withdraw(amount, reply_to) if state.balance >= amount:
            reply_to.tell("ok")
            return Behaviors.persisted(events=[Withdrawn(amount)])
        case Withdraw(_, reply_to):
            reply_to.tell(f"insufficient funds (balance={state.balance})")
            return Behaviors.same()
        case GetBalance(reply_to):
            reply_to.tell(state)
            return Behaviors.same()
    return Behaviors.unhandled()

# --- Entity factory ---

journal = InMemoryJournal()

def bank_account(entity_id: str) -> Behavior[AccountCommand]:
    return Behaviors.event_sourced(
        entity_id=entity_id,
        journal=journal,
        initial_state=AccountState(balance=0, tx_count=0),
        on_event=on_event,
        on_command=on_command,
        snapshot_policy=SnapshotEvery(n_events=100),
    )

# --- Recovery demonstration ---

async def main() -> None:
    # Phase 1: Normal operations
    async with ActorSystem(name="bank") as system:
        account = system.spawn(bank_account("acc-001"), "account")
        await asyncio.sleep(0.1)

        account.tell(Deposit(1000))
        account.tell(Deposit(500))
        await asyncio.sleep(0.1)

        state = await system.ask(
            account, lambda r: GetBalance(reply_to=r), timeout=2.0
        )
        print(f"Balance: {state.balance}, transactions: {state.tx_count}")
        # Balance: 1500, transactions: 2

    # Phase 2: Recovery — new actor, same journal
    async with ActorSystem(name="bank") as system:
        account = system.spawn(bank_account("acc-001"), "account")
        await asyncio.sleep(0.1)

        state = await system.ask(
            account, lambda r: GetBalance(reply_to=r), timeout=2.0
        )
        print(f"Recovered balance: {state.balance}, transactions: {state.tx_count}")
        # Recovered balance: 1500, transactions: 2

asyncio.run(main())

In Phase 2, a new actor is spawned with the same entity_id. The framework automatically loads the latest snapshot (if any), replays events from the journal since that snapshot, and reconstructs the state. The actor resumes exactly where it left off.

The SnapshotEvery(n_events=100) policy periodically saves the current state to the journal. Without snapshots, recovery requires replaying every event from the beginning — acceptable for entities with few events, but expensive for long-lived entities with thousands.

Journal Backends

The EventJournal protocol is storage-agnostic — four methods (persist, load, save_snapshot, load_snapshot) and a kind property. Casty ships two implementations:

Backend Durability Use case
InMemoryJournal None (process-scoped) Tests, prototyping
SqliteJournal File on disk Single-node production, local development

SqliteJournal

Durable event sourcing backed by stdlib sqlite3. Zero external dependencies.

from casty import SqliteJournal

journal = SqliteJournal("data/events.db")

def bank_account(entity_id: str) -> Behavior[AccountCommand]:
    return Behaviors.event_sourced(
        entity_id=entity_id,
        journal=journal,
        initial_state=AccountState(balance=0, tx_count=0),
        on_event=on_event,
        on_command=on_command,
    )

WAL mode is enabled for concurrent reads. All writes go through asyncio.to_thread so the event loop is never blocked. Serialization defaults to pickle but is pluggable:

import json

journal = SqliteJournal(
    "data/events.db",
    serialize=lambda obj: json.dumps(obj, default=str).encode(),
    deserialize=lambda data: json.loads(data),
)

JournalKind — Local vs Centralized

Every journal declares its kind: local or centralized.

local (SQLite, InMemory)           centralized (PostgreSQL, S3, ...)
────────────────────────           ──────────────────────────────────
Each node has its own store.       All nodes share one store.
Replicas must persist events       Replicas skip persist — the
they receive from the primary.     primary's write is already
                                   visible to every node.

InMemoryJournal and SqliteJournal are both local. When you implement a journal backed by a shared database (PostgreSQL, DynamoDB, S3), set kind to centralized — the replication layer will automatically skip redundant writes on replica nodes:

from casty import JournalKind

class PostgresJournal:
    @property
    def kind(self) -> JournalKind:
        return JournalKind.centralized

    async def persist(self, entity_id, events): ...
    async def load(self, entity_id, from_sequence_nr=0): ...
    async def save_snapshot(self, entity_id, snapshot): ...
    async def load_snapshot(self, entity_id): ...

With a centralized journal, replicas still receive ReplicateEvents messages and maintain in-memory state (hot standby for fast failover), but they don't write to the journal since the data is already there.

Custom Backends

Implement the EventJournal protocol for any storage:

  1. Define persist, load, save_snapshot, load_snapshot with matching signatures.
  2. Set kind to local (node-local store) or centralized (shared store).
  3. Pass the instance to Behaviors.event_sourced(journal=...).

No inheritance needed — structural subtyping handles the rest.


Next: Cluster Sharding