Event Sourcing¶
In this guide you'll build a bank account that persists every transaction as an event. Along the way you'll learn the command-event-state flow, how actors recover from the journal on restart, and how snapshots speed up recovery.
The Idea¶
Instead of saving current state ("balance is 120"), event sourcing saves what happened ("deposited 100, deposited 50, withdrew 30"). State is derived by replaying events. This gives you a full audit trail and the ability to reconstruct any past state.
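As a minimal sketch of the idea, independent of any actor library, the snippet below derives a balance by folding over a list of events. The tuple encoding and the `balance_after` helper are illustrative assumptions, not part of this guide's code.

```python
from functools import reduce

# Hypothetical event log: (kind, amount) tuples in the order they happened.
events = [("deposited", 100), ("deposited", 50), ("withdrew", 30)]


def apply(balance: int, event: tuple[str, int]) -> int:
    """Apply one event to a running balance."""
    kind, amount = event
    return balance + amount if kind == "deposited" else balance - amount


def balance_after(log: list[tuple[str, int]], n: int) -> int:
    """Replay only the first n events to reconstruct a past state."""
    return reduce(apply, log[:n], 0)


print(balance_after(events, len(events)))  # current state: 120
print(balance_after(events, 2))            # state before the withdrawal: 150
```

Replaying a prefix of the log is what makes past states recoverable: nothing is overwritten, so any earlier balance is one fold away.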
State¶
A frozen dataclass holding the derived state:
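A minimal sketch of such a state class, inferred from how `AccountState` is used elsewhere in this guide (constructed with no arguments, and carrying `balance` and `transactions` fields). The zero defaults are an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AccountState:
    # Defaults are assumed so that AccountState() is a valid initial state.
    balance: int = 0
    transactions: int = 0
```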
Events¶
Events are facts that already happened. They're persisted to the journal and replayed during recovery:
@dataclass(frozen=True)
class Deposited:
    amount: int


@dataclass(frozen=True)
class Withdrawn:
    amount: int


type AccountEvent = Deposited | Withdrawn
Commands¶
Commands are requests from the outside world. The actor decides whether to accept or reject them:
@dataclass(frozen=True)
class Deposit:
    amount: int


@dataclass(frozen=True)
class Withdraw:
    amount: int
    reply_to: ActorRef[str]


@dataclass(frozen=True)
class GetBalance:
    reply_to: ActorRef[int]


type AccountCmd = Deposit | Withdraw | GetBalance
Notice the separation: Deposit (command) vs Deposited (event). The command is the intent; the event is the outcome.
Event Handler¶
A pure function that applies one event to the current state. No side effects — this is called both during normal operation and during recovery:
def apply_event(state: AccountState, event: AccountEvent) -> AccountState:
    match event:
        case Deposited(amount):
            return AccountState(
                balance=state.balance + amount,
                transactions=state.transactions + 1,
            )
        case Withdrawn(amount):
            return AccountState(
                balance=state.balance - amount,
                transactions=state.transactions + 1,
            )
Command Handler¶
The command handler decides which events to persist. It returns Behaviors.persisted([...]) to queue events, or Behaviors.same() for read-only operations:
async def handle_command(
    ctx: ActorContext[AccountCmd], state: AccountState, cmd: AccountCmd
) -> Behavior[AccountCmd]:
    match cmd:
        case Deposit(amount):
            print(f" Depositing {amount}")
            return Behaviors.persisted([Deposited(amount)])
        case Withdraw(amount, reply_to) if amount <= state.balance:
            print(f" Withdrawing {amount}")
            reply_to.tell("ok")
            return Behaviors.persisted([Withdrawn(amount)])
        case Withdraw(amount, reply_to):
            reply_to.tell(f"insufficient funds (balance: {state.balance})")
            return Behaviors.same()
        case GetBalance(reply_to):
            reply_to.tell(state.balance)
            return Behaviors.same()
Walking through the arms:
- Deposit: always succeeds. Persists a Deposited event.
- Withdraw (sufficient funds): replies "ok", persists Withdrawn.
- Withdraw (insufficient): replies with error, persists nothing.
- GetBalance: read-only. No events, just a reply.
Wiring It Together¶
Behaviors.event_sourced() ties the pieces together:
def bank_account(entity_id: str, journal: InMemoryJournal) -> Behavior[AccountCmd]:
    return Behaviors.event_sourced(
        entity_id=entity_id,
        journal=journal,
        initial_state=AccountState(),
        on_event=apply_event,
        on_command=handle_command,
        snapshot_policy=SnapshotEvery(n_events=5),
    )
SnapshotEvery(n_events=5) tells the journal to save a full state snapshot after every 5 events. On recovery, the actor loads the snapshot first and only replays events after it — much faster for entities with long histories.
Running It¶
The example runs in three phases: transactions, recovery, and snapshot recovery:
async def main() -> None:
    journal = InMemoryJournal()

    # --- Phase 1: build up state ---
    print("── Phase 1: transactions ──")
    async with ActorSystem(name="bank") as system:
        account: ActorRef[AccountCmd] = system.spawn(
            bank_account("acct-1", journal), "account"
        )

        account.tell(Deposit(100))
        account.tell(Deposit(50))
        await asyncio.sleep(0.1)

        result = await system.ask(
            account,
            lambda r: Withdraw(30, reply_to=r),
            timeout=5.0,
        )
        print(f" Withdraw result: {result}")

        result = await system.ask(
            account,
            lambda r: Withdraw(999, reply_to=r),
            timeout=5.0,
        )
        print(f" Withdraw result: {result}")

        balance = await system.ask(
            account, lambda r: GetBalance(reply_to=r), timeout=5.0
        )
        print(f" Balance: {balance}")

    # Show what was persisted
    events = await journal.load("acct-1")
    print(f"\n── Journal has {len(events)} events ──")
    for e in events:
        print(f" #{e.sequence_nr}: {e.event}")

    # --- Phase 2: recover from journal ---
    print("\n── Phase 2: recovery ──")
    async with ActorSystem(name="bank") as system:
        account = system.spawn(bank_account("acct-1", journal), "account")
        await asyncio.sleep(0.1)

        balance = await system.ask(
            account, lambda r: GetBalance(reply_to=r), timeout=5.0
        )
        print(f" Recovered balance: {balance}")

    # --- Phase 3: demonstrate snapshot recovery ---
    print("\n── Phase 3: snapshot recovery ──")
    snap_journal = InMemoryJournal()
    async with ActorSystem(name="bank") as system:
        account = system.spawn(
            bank_account("acct-2", snap_journal), "account"
        )
        for _ in range(7):
            account.tell(Deposit(10))
        await asyncio.sleep(0.2)

        balance = await system.ask(
            account, lambda r: GetBalance(reply_to=r), timeout=5.0
        )
        print(f" Balance after 7 deposits: {balance}")

    snapshot = await snap_journal.load_snapshot("acct-2")
    events = await snap_journal.load("acct-2")
    print(f" Snapshot at seq #{snapshot.sequence_nr}: {snapshot.state}" if snapshot else " No snapshot")
    print(f" Journal has {len(events)} events total")
    print(f" Recovery replays only {len(events) - (snapshot.sequence_nr if snapshot else 0)} events")
Output:
── Phase 1: transactions ──
Depositing 100
Depositing 50
Withdrawing 30
Withdraw result: ok
Withdraw result: insufficient funds (balance: 120)
Balance: 120
── Journal has 3 events ──
#1: Deposited(amount=100)
#2: Deposited(amount=50)
#3: Withdrawn(amount=30)
── Phase 2: recovery ──
Recovered balance: 120
── Phase 3: snapshot recovery ──
Depositing 10
...
Balance after 7 deposits: 70
Snapshot at seq #5: AccountState(balance=50, transactions=5)
Journal has 7 events total
Recovery replays only 2 events
Phase 2 shows recovery: a new actor with the same entity_id replays all events from the journal and arrives at the same state. Phase 3 shows snapshot optimization: after 7 deposits, a snapshot was taken at event #5. On recovery, only 2 events (6 and 7) need replaying.
Run the Full Example¶
git clone https://github.com/gabfssilva/casty.git
cd casty
uv run python examples/guides/05_event_sourcing.py
What you learned:
- Commands are requests; events are facts. Commands may be rejected; events are always applied.
- on_event is a pure function that applies events to state, used for both normal operation and recovery.
- on_command decides which events to persist. Return Behaviors.persisted([...]) or Behaviors.same().
- Recovery replays all events from the journal, reconstructing state automatically.
- Snapshots speed up recovery by checkpointing state at regular intervals.