Event Sourcing
casty.EventJournal
Bases: Protocol
Protocol for event journal backends.
Any class that implements persist, load, save_snapshot, and
load_snapshot satisfies this protocol via structural subtyping.
The kind property tells the replication layer whether replicas need
to persist events themselves (local) or can skip persistence because
the store is shared (centralized).
Examples:
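A minimal sketch of structural subtyping as described above: the class below inherits from nothing, yet provides the four methods and the `kind` property. So that it runs without casty installed, `JournalKind`, `PersistedEvent`, and `Snapshot` are local stand-ins that mirror only the fields documented on this page. `DictJournal` is a hypothetical name, and the `Optional[Snapshot]` return type of `load_snapshot` is an assumption, since this page does not state it.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional, Sequence


# Stand-ins for casty.JournalKind, casty.PersistedEvent and casty.Snapshot
# (assumption: only the fields documented on this page are modelled).
class JournalKind(Enum):
    local = "local"
    centralized = "centralized"


@dataclass(frozen=True)
class PersistedEvent:
    sequence_nr: int
    event: Any
    timestamp: float


@dataclass(frozen=True)
class Snapshot:
    sequence_nr: int
    state: Any
    timestamp: float


class DictJournal:
    """Hypothetical backend: no base class, just the expected members."""

    def __init__(self) -> None:
        self._events: dict = {}
        self._snapshots: dict = {}

    @property
    def kind(self) -> JournalKind:
        # Each process owns its own dictionaries, so the store is node-local.
        return JournalKind.local

    async def persist(self, entity_id: str, events: Sequence[PersistedEvent]) -> None:
        self._events.setdefault(entity_id, []).extend(events)

    async def load(self, entity_id: str, from_sequence_nr: int = 0) -> list:
        stored = self._events.get(entity_id, [])
        return [e for e in stored if e.sequence_nr >= from_sequence_nr]

    async def save_snapshot(self, entity_id: str, snapshot: Snapshot) -> None:
        # Overwrites whatever snapshot was stored before for this entity.
        self._snapshots[entity_id] = snapshot

    async def load_snapshot(self, entity_id: str) -> Optional[Snapshot]:
        # Returning None for an unknown entity is an assumption.
        return self._snapshots.get(entity_id)


async def demo():
    journal = DictJournal()
    await journal.persist("e1", [PersistedEvent(1, "created", 0.0)])
    await journal.save_snapshot("e1", Snapshot(1, {"status": "created"}, 0.0))
    return len(await journal.load("e1")), await journal.load_snapshot("e1")


event_count, snapshot = asyncio.run(demo())
```

With casty installed, the stand-ins would be replaced by the real imports and a static type checker would be the one to verify that the class matches `EventJournal`.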
kind
property
Whether this journal is node-local or centralized.
persist(entity_id, events)
async
Append events to the journal for a given entity.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Unique identifier of the entity. | required |
| `events` | `Sequence[PersistedEvent[Any]]` | Events to persist, in order. | required |
load(entity_id, from_sequence_nr=0)
async
Load events for an entity starting from a sequence number.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Unique identifier of the entity. | required |
| `from_sequence_nr` | `int` | Minimum sequence number (inclusive). Defaults to 0. | `0` |
Returns:
| Type | Description |
|---|---|
| `list[PersistedEvent[Any]]` | Events with `sequence_nr >= from_sequence_nr`. |
Examples:
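The inclusive lower bound on `from_sequence_nr` can be illustrated without a journal at all. The sketch below applies the same filter to a plain list; `PersistedEvent` is a local stand-in with the documented fields, and `events_from` is a hypothetical helper, not part of casty.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class PersistedEvent:  # stand-in for casty.PersistedEvent
    sequence_nr: int
    event: Any
    timestamp: float


def events_from(events, from_sequence_nr=0):
    """Keep events whose sequence_nr is >= from_sequence_nr (inclusive)."""
    return [e for e in events if e.sequence_nr >= from_sequence_nr]


stored = [
    PersistedEvent(1, "created", 0.0),
    PersistedEvent(2, "renamed", 1.0),
    PersistedEvent(3, "archived", 2.0),
]

all_events = events_from(stored)                # default of 0 keeps everything
tail = events_from(stored, from_sequence_nr=2)  # keeps sequence numbers 2 and 3
```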
save_snapshot(entity_id, snapshot)
async
Save a snapshot for an entity, replacing any previous one.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Unique identifier of the entity. | required |
| `snapshot` | `Snapshot[Any]` | The snapshot to store. | required |
casty.InMemoryJournal
In-memory event journal for testing and development.
Stores events and snapshots in plain dictionaries. Data is lost when
the process exits. Satisfies the EventJournal protocol.
Always local — each process has its own independent store.
Examples:
>>> from casty import InMemoryJournal, PersistedEvent
>>> journal = InMemoryJournal()
>>> await journal.persist("e1", [PersistedEvent(1, "created", 0.0)])
>>> events = await journal.load("e1")
>>> len(events)
1
kind
property
__init__()
persist(entity_id, events)
async
Append events to the in-memory store.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Unique identifier of the entity. | required |
| `events` | `Sequence[PersistedEvent[Any]]` | Events to persist. | required |
load(entity_id, from_sequence_nr=0)
async
Load events from the in-memory store.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Unique identifier of the entity. | required |
| `from_sequence_nr` | `int` | Minimum sequence number (inclusive). Defaults to 0. | `0` |
Returns:
| Type | Description |
|---|---|
| `list[PersistedEvent[Any]]` | Matching events in insertion order. |
save_snapshot(entity_id, snapshot)
async
Save a snapshot, replacing any previous one for this entity.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Unique identifier of the entity. | required |
| `snapshot` | `Snapshot[Any]` | The snapshot to store. | required |
casty.SqliteJournal
SQLite-backed event journal for durable persistence.
Uses Python's stdlib sqlite3 with WAL mode for concurrent reads and
asyncio.to_thread for non-blocking I/O. A threading.Lock serializes
all writes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str \| Path` | Path to the SQLite database file, or `':memory:'` for an in-memory database. | `':memory:'` |
| `serialize` | `Callable[[Any], bytes]` | Serializer for events and snapshot state. Defaults to `pickle.dumps`. | `pickle.dumps` |
| `deserialize` | `Callable[[bytes], Any]` | Deserializer for events and snapshot state. Defaults to `pickle.loads`. | `pickle.loads` |
Examples:
>>> from casty import SqliteJournal, PersistedEvent
>>> journal = SqliteJournal()
>>> await journal.persist("e1", [PersistedEvent(1, "created", 0.0)])
>>> events = await journal.load("e1")
>>> len(events)
1
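The `serialize` and `deserialize` parameters accept any pair of callables with the documented `Callable[[Any], bytes]` and `Callable[[bytes], Any]` shapes. As a hedged sketch, the pair below swaps pickle for JSON. The names `json_serialize` and `json_deserialize` are hypothetical, and the sketch assumes the values handed to the serializer are JSON-compatible (dicts, lists, strings, numbers); this page does not say exactly which objects the journal passes in.

```python
import json
from typing import Any


def json_serialize(obj: Any) -> bytes:
    """Encode a JSON-compatible object as UTF-8 bytes."""
    return json.dumps(obj, sort_keys=True).encode("utf-8")


def json_deserialize(data: bytes) -> Any:
    """Inverse of json_serialize."""
    return json.loads(data.decode("utf-8"))


payload = {"type": "created", "name": "order-1"}
roundtrip = json_deserialize(json_serialize(payload))

# With casty installed, the pair would be passed as keyword-only arguments:
# journal = SqliteJournal("events.db", serialize=json_serialize,
#                         deserialize=json_deserialize)
```

A text-based format keeps the stored bytes readable from other tools, at the cost of supporting fewer Python types than pickle.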
kind
property
__init__(path=':memory:', *, serialize=pickle.dumps, deserialize=pickle.loads)
persist(entity_id, events)
async
load(entity_id, from_sequence_nr=0)
async
save_snapshot(entity_id, snapshot)
async
load_snapshot(entity_id)
async
close()
Close the underlying SQLite connection.
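The combination described for `SqliteJournal` (stdlib `sqlite3`, WAL mode, `asyncio.to_thread`, and a `threading.Lock`) can be sketched with the standard library alone. This is not casty's implementation: `TinyEventStore`, its table layout, and its column names are hypothetical. Because the sketch shares a single connection across worker threads, it takes the lock for reads as well as writes, which is more conservative than the write-only locking described above.

```python
import asyncio
import os
import sqlite3
import tempfile
import threading


class TinyEventStore:
    """Hypothetical store; table and column names are illustrative only."""

    def __init__(self, path: str) -> None:
        # check_same_thread=False lets worker threads reuse the connection;
        # the lock below keeps that access serialized.
        self._conn = sqlite3.connect(path, check_same_thread=False)
        self._conn.execute("PRAGMA journal_mode=WAL")
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS events ("
            "entity_id TEXT, sequence_nr INTEGER, payload BLOB, "
            "PRIMARY KEY (entity_id, sequence_nr))"
        )
        self._lock = threading.Lock()

    def _append(self, entity_id, rows):
        # The lock serializes writers; the connection context manager commits.
        with self._lock, self._conn:
            self._conn.executemany(
                "INSERT INTO events VALUES (?, ?, ?)",
                [(entity_id, nr, payload) for nr, payload in rows],
            )

    def _read(self, entity_id, from_sequence_nr):
        with self._lock:
            cursor = self._conn.execute(
                "SELECT sequence_nr, payload FROM events "
                "WHERE entity_id = ? AND sequence_nr >= ? ORDER BY sequence_nr",
                (entity_id, from_sequence_nr),
            )
            return cursor.fetchall()

    async def persist(self, entity_id, rows):
        # Blocking SQLite work runs in a worker thread, not on the event loop.
        await asyncio.to_thread(self._append, entity_id, rows)

    async def load(self, entity_id, from_sequence_nr=0):
        return await asyncio.to_thread(self._read, entity_id, from_sequence_nr)

    def close(self):
        self._conn.close()


async def demo():
    with tempfile.TemporaryDirectory() as tmp:
        store = TinyEventStore(os.path.join(tmp, "events.db"))
        try:
            await store.persist("e1", [(1, b"created"), (2, b"renamed")])
            return await store.load("e1", from_sequence_nr=2)
        finally:
            store.close()


rows = asyncio.run(demo())
```

The sketch uses a file in a temporary directory rather than `':memory:'` because WAL journaling only takes effect for on-disk databases.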
casty.JournalKind
Bases: Enum
Declares whether a journal backend is node-local or centralized.
local
Each node has its own independent store (e.g. SQLite file).
Replicas must persist events they receive via ReplicateEvents.
centralized
All nodes share the same durable store (e.g. PostgreSQL, S3).
Replicas skip persistence — the primary's write is already visible
to every node.
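The rule implied by the two members can be sketched as a single predicate. The enum below is a local stand-in for `casty.JournalKind` (its member values are an assumption), and `replica_should_persist` is a hypothetical helper that only illustrates the decision, not how casty's replication layer is written.

```python
from enum import Enum


class JournalKind(Enum):  # stand-in for casty.JournalKind; values are assumed
    local = "local"
    centralized = "centralized"


def replica_should_persist(kind: JournalKind) -> bool:
    """A replica writes replicated events itself only for node-local journals."""
    return kind is JournalKind.local


decisions = {kind.name: replica_should_persist(kind) for kind in JournalKind}
```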
casty.PersistedEvent
dataclass
An event wrapped with journal metadata.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `sequence_nr` | `int` | Monotonically increasing sequence number for the entity. | required |
| `event` | `E` | The domain event payload. | required |
| `timestamp` | `float` | Unix timestamp when the event was persisted. | required |
Examples:
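A sketch of wrapping domain events with the three documented fields. The generic dataclass is a local stand-in for `casty.PersistedEvent` (whether the real class is frozen is an assumption), `Deposited` is a hypothetical domain event, and `wrap` is a hypothetical helper that assigns consecutive sequence numbers after the last one already stored.

```python
import time
from dataclasses import dataclass
from typing import Generic, TypeVar

E = TypeVar("E")


@dataclass(frozen=True)
class PersistedEvent(Generic[E]):  # stand-in for casty.PersistedEvent
    sequence_nr: int
    event: E
    timestamp: float


@dataclass(frozen=True)
class Deposited:  # hypothetical domain event
    amount: int


def wrap(events, last_sequence_nr=0):
    """Assign consecutive sequence numbers following last_sequence_nr."""
    now = time.time()
    return [
        PersistedEvent(last_sequence_nr + offset, event, now)
        for offset, event in enumerate(events, start=1)
    ]


# Three events are already stored, so the new batch gets numbers 4 and 5.
batch = wrap([Deposited(10), Deposited(5)], last_sequence_nr=3)
```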
casty.Snapshot
dataclass
A point-in-time snapshot of an entity's state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `sequence_nr` | `int` | The sequence number up to which this snapshot covers. | required |
| `state` | `S` | The serialized actor state at snapshot time. | required |
| `timestamp` | `float` | Unix timestamp when the snapshot was taken. | required |
Examples:
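A snapshot is typically combined with the events recorded after it: recovery starts from the snapshot's state and replays only events whose sequence number is greater than the one the snapshot covers. The sketch below illustrates that common event-sourcing pattern; it is an assumption about usage rather than a description of casty's recovery code. Both dataclasses are local stand-ins, and `recover` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class PersistedEvent:  # stand-in for casty.PersistedEvent
    sequence_nr: int
    event: Any
    timestamp: float


@dataclass(frozen=True)
class Snapshot:  # stand-in for casty.Snapshot
    sequence_nr: int
    state: Any
    timestamp: float


def recover(snapshot, events, apply, initial):
    """Rebuild state from an optional snapshot plus the events after it."""
    state = snapshot.state if snapshot is not None else initial
    covered = snapshot.sequence_nr if snapshot is not None else 0
    for persisted in events:
        if persisted.sequence_nr > covered:  # skip what the snapshot covers
            state = apply(state, persisted.event)
    return state


# Events are deposit amounts; the state is a running balance.
events = [
    PersistedEvent(1, 10, 0.0),
    PersistedEvent(2, 5, 1.0),
    PersistedEvent(3, 7, 2.0),
]
snapshot = Snapshot(sequence_nr=2, state=15, timestamp=1.5)


def add(balance, amount):
    return balance + amount


from_snapshot = recover(snapshot, events, add, initial=0)  # 15 + 7
full_replay = recover(None, events, add, initial=0)        # 10 + 5 + 7
```

Both paths arrive at the same balance; the snapshot only shortens the replay.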