Skip to content

Event Sourcing

casty.EventJournal

Bases: Protocol

Protocol for event journal backends.

Any class that implements persist, load, save_snapshot, and load_snapshot satisfies this protocol via structural subtyping.

The kind property tells the replication layer whether replicas need to persist events themselves (local) or can skip persistence because the store is shared (centralized).

Examples:

>>> from casty import InMemoryJournal
>>> journal: EventJournal = InMemoryJournal()

kind property

Whether this journal is node-local or centralized.

persist(entity_id, events) async

Append events to the journal for a given entity.

Parameters:

Name Type Description Default
entity_id str

Unique identifier of the entity.

required
events Sequence[PersistedEvent[Any]]

Events to persist, in order.

required

Examples:

>>> await journal.persist("acct-1", [PersistedEvent(1, "evt", 0.0)])

load(entity_id, from_sequence_nr=0) async

Load events for an entity starting from a sequence number.

Parameters:

Name Type Description Default
entity_id str

Unique identifier of the entity.

required
from_sequence_nr int

Minimum sequence number (inclusive). Defaults to 0.

0

Returns:

Type Description
list[PersistedEvent[Any]]

Events with sequence_nr >= from_sequence_nr.

Examples:

>>> events = await journal.load("acct-1", from_sequence_nr=5)

save_snapshot(entity_id, snapshot) async

Save a snapshot for an entity, replacing any previous one.

Parameters:

Name Type Description Default
entity_id str

Unique identifier of the entity.

required
snapshot Snapshot[Any]

The snapshot to store.

required

Examples:

>>> snap = Snapshot(sequence_nr=10, state=100, timestamp=1.0)
>>> await journal.save_snapshot("acct-1", snap)

load_snapshot(entity_id) async

Load the latest snapshot for an entity, if one exists.

Parameters:

Name Type Description Default
entity_id str

Unique identifier of the entity.

required

Returns:

Type Description
Snapshot[Any] | None

The stored snapshot, or None if no snapshot exists.

Examples:

>>> snap = await journal.load_snapshot("acct-1")

casty.InMemoryJournal

In-memory event journal for testing and development.

Stores events and snapshots in plain dictionaries. Data is lost when the process exits. Satisfies the EventJournal protocol.

Always local — each process has its own independent store.

Examples:

>>> from casty import InMemoryJournal, PersistedEvent
>>> journal = InMemoryJournal()
>>> await journal.persist("e1", [PersistedEvent(1, "created", 0.0)])
>>> events = await journal.load("e1")
>>> len(events)
1

kind property

__init__()

persist(entity_id, events) async

Append events to the in-memory store.

Parameters:

Name Type Description Default
entity_id str

Unique identifier of the entity.

required
events Sequence[PersistedEvent[Any]]

Events to persist.

required

Examples:

>>> await journal.persist("e1", [PersistedEvent(1, "x", 0.0)])

load(entity_id, from_sequence_nr=0) async

Load events from the in-memory store.

Parameters:

Name Type Description Default
entity_id str

Unique identifier of the entity.

required
from_sequence_nr int

Minimum sequence number (inclusive). Defaults to 0.

0

Returns:

Type Description
list[PersistedEvent[Any]]

Matching events in insertion order.

Examples:

>>> events = await journal.load("e1", from_sequence_nr=2)

save_snapshot(entity_id, snapshot) async

Save a snapshot, replacing any previous one for this entity.

Parameters:

Name Type Description Default
entity_id str

Unique identifier of the entity.

required
snapshot Snapshot[Any]

The snapshot to store.

required

Examples:

>>> snap = Snapshot(sequence_nr=5, state="s", timestamp=1.0)
>>> await journal.save_snapshot("e1", snap)

load_snapshot(entity_id) async

Load the latest snapshot for an entity.

Parameters:

Name Type Description Default
entity_id str

Unique identifier of the entity.

required

Returns:

Type Description
Snapshot[Any] | None

The stored snapshot, or None.

Examples:

>>> snap = await journal.load_snapshot("e1")

casty.SqliteJournal

SQLite-backed event journal for durable persistence.

Uses Python's stdlib sqlite3 with WAL mode for concurrent reads and asyncio.to_thread for non-blocking I/O. A threading.Lock serializes all writes.

Parameters:

Name Type Description Default
path str | Path

Path to the SQLite database file, or ":memory:" for an in-memory database (useful for testing).

':memory:'
serialize Callable[[Any], bytes]

Serializer for events and snapshot state. Defaults to pickle.dumps.

dumps
deserialize Callable[[bytes], Any]

Deserializer for events and snapshot state. Defaults to pickle.loads.

loads

Examples:

>>> from casty import SqliteJournal, PersistedEvent
>>> journal = SqliteJournal()
>>> await journal.persist("e1", [PersistedEvent(1, "created", 0.0)])
>>> events = await journal.load("e1")
>>> len(events)
1

kind property

__init__(path=':memory:', *, serialize=pickle.dumps, deserialize=pickle.loads)

persist(entity_id, events) async

load(entity_id, from_sequence_nr=0) async

save_snapshot(entity_id, snapshot) async

load_snapshot(entity_id) async

close()

Close the underlying SQLite connection.

casty.JournalKind

Bases: Enum

Declares whether a journal backend is node-local or centralized.

local Each node has its own independent store (e.g. SQLite file). Replicas must persist events they receive via ReplicateEvents.

centralized All nodes share the same durable store (e.g. PostgreSQL, S3). Replicas skip persistence — the primary's write is already visible to every node.

local = 'local' class-attribute instance-attribute

centralized = 'centralized' class-attribute instance-attribute

casty.PersistedEvent dataclass

An event wrapped with journal metadata.

Parameters:

Name Type Description Default
sequence_nr int

Monotonically increasing sequence number for the entity.

required
event E

The domain event payload.

required
timestamp float

Unix timestamp when the event was persisted.

required

Examples:

>>> from casty import PersistedEvent
>>> evt = PersistedEvent(sequence_nr=1, event="deposited", timestamp=1.0)
>>> evt.sequence_nr
1

sequence_nr instance-attribute

event instance-attribute

timestamp instance-attribute

__init__(sequence_nr, event, timestamp)

casty.Snapshot dataclass

A point-in-time snapshot of an entity's state.

Parameters:

Name Type Description Default
sequence_nr int

The sequence number up to which this snapshot covers.

required
state S

The serialized actor state at snapshot time.

required
timestamp float

Unix timestamp when the snapshot was taken.

required

Examples:

>>> from casty import Snapshot
>>> snap = Snapshot(sequence_nr=10, state={"balance": 100}, timestamp=2.0)
>>> snap.state
{'balance': 100}

sequence_nr instance-attribute

state instance-attribute

timestamp instance-attribute

__init__(sequence_nr, state, timestamp)