
Sharding

casty.ReplicationConfig dataclass

Configuration for shard replication.

Controls how many replica copies are maintained and how many acknowledgements are required before a write is considered committed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `replicas` | `int` | Number of replica copies per shard (0 disables replication). | `0` |
| `min_acks` | `int` | Minimum replica acknowledgements required before confirming a write. | `0` |
| `ack_timeout` | `float` | Seconds to wait for replica acknowledgements. | `5.0` |

Examples:

>>> ReplicationConfig(replicas=2, min_acks=1, ack_timeout=3.0)
ReplicationConfig(replicas=2, min_acks=1, ack_timeout=3.0)

replicas = 0 class-attribute instance-attribute

min_acks = 0 class-attribute instance-attribute

ack_timeout = 5.0 class-attribute instance-attribute

__init__(replicas=0, min_acks=0, ack_timeout=5.0)
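The commit rule these three fields describe can be sketched in plain Python. This is an illustration of the semantics only, not casty's implementation; `ReplicationConfigSketch` and `is_committed` are hypothetical names:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ReplicationConfigSketch:
    """Illustrative stand-in for casty.ReplicationConfig (same fields/defaults)."""
    replicas: int = 0
    min_acks: int = 0
    ack_timeout: float = 5.0

    def is_committed(self, acks_received: int) -> bool:
        # Per the min_acks description: a write is confirmed once at least
        # `min_acks` replica acknowledgements have arrived.
        return acks_received >= self.min_acks

cfg = ReplicationConfigSketch(replicas=2, min_acks=1, ack_timeout=3.0)
assert cfg.is_committed(1)      # one replica ack: committed
assert not cfg.is_committed(0)  # no acks yet: still pending
```

With `min_acks=0` (the default) a write is confirmed immediately, without waiting for any replica.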

casty.ReplicateEvents dataclass

Internal message: replicate persisted events to a replica node.

Sent by the primary shard region to replica regions after persisting new events locally.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `entity_id` | `str` | Entity whose events are being replicated. | *required* |
| `shard_id` | `int` | Shard that owns the entity. | *required* |
| `events` | `tuple[PersistedEvent[Any], ...]` | Events to replicate. | *required* |
| `reply_to` | `ActorRef[Any] \| None` | Optional ref to acknowledge replication. | `None` |

Examples:

>>> ReplicateEvents("user-1", shard_id=7, events=(evt,))
ReplicateEvents(entity_id='user-1', shard_id=7, ...)

entity_id instance-attribute

shard_id instance-attribute

events instance-attribute

reply_to = None class-attribute instance-attribute

__init__(entity_id, shard_id, events, reply_to=None)

casty.ReplicateEventsAck dataclass

Acknowledgement that a replica has persisted replicated events.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `entity_id` | `str` | Entity whose events were replicated. | *required* |
| `sequence_nr` | `int` | Highest sequence number persisted by the replica. | *required* |

Examples:

>>> ReplicateEventsAck("user-1", sequence_nr=5)
ReplicateEventsAck(entity_id='user-1', sequence_nr=5)

entity_id instance-attribute

sequence_nr instance-attribute

__init__(entity_id, sequence_nr)

casty.ClusteredActorSystem

Bases: ActorSystem

Actor system with cluster membership, sharding, and remote messaging.

Extends ActorSystem to transparently distribute ShardedBehavior actors across cluster nodes. Handles gossip-based membership, failure detection, shard coordination, and TCP transport.

Use as an async context manager to start/stop the cluster lifecycle.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Logical system name (shared across cluster nodes). | *required* |
| `host` | `str` | Advertised hostname for this node. | *required* |
| `port` | `int` | Advertised port for this node (use 0 for auto-assignment). | *required* |
| `seed_nodes` | `tuple[tuple[str, int], ...] \| None` | Initial contact points for cluster join. | `None` |
| `roles` | `frozenset[str]` | Roles assigned to this node (for role-aware shard placement). | `frozenset()` |
| `bind_host` | `str \| None` | Network interface to bind to (defaults to `host`). | `None` |
| `config` | `CastyConfig \| None` | Full configuration object. | `None` |
| `tls` | `TlsConfig \| None` | TLS configuration for inter-node communication. Use `TlsConfig.from_paths(...)` for file-based setup, or pass pre-built `ssl.SSLContext` objects via `TlsConfig(server_context=..., client_context=...)`. | `None` |
| `required_quorum` | `int \| None` | If set, `__aenter__` blocks until this many nodes are up. | `None` |

Examples:

>>> async with ClusteredActorSystem(
...     name="my-app", host="127.0.0.1", port=25520,
...     seed_nodes=[("127.0.0.1", 25520)],
... ) as system:
...     ref = system.spawn(Behaviors.sharded(my_entity, num_shards=50), "things")
...     ref.tell(ShardEnvelope("abc", DoSomething()))

receptionist property

The cluster-wide receptionist for service discovery.

self_node property

The NodeAddress representing this cluster member.

__init__(*, name, host, port, node_id, seed_nodes=None, roles=frozenset(), bind_host=None, config=None, tls=None, required_quorum=None, serializer=None)

__make_ref__(id, deliver)

from_config(config, *, host=None, port=None, node_id=None, seed_nodes=None, bind_host=None, tls=None, required_quorum=None) classmethod

Create a ClusteredActorSystem from a CastyConfig.

Reads host, port, seed nodes, and roles from the [cluster] section of the config. Keyword arguments override config values.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `CastyConfig` | Parsed configuration (typically from `load_config`). | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ClusteredActorSystem` | The system constructed from the config. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the config has no `[cluster]` section. |

Examples:

>>> config = load_config(Path("casty.toml"))
>>> system = ClusteredActorSystem.from_config(config)

__aenter__() async
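Entering the context manager with `required_quorum` set defers startup until that many members are up. The sketch below uses only constructor arguments documented above; the `node_id` value and addresses are illustrative assumptions:

```python
# Sketch only: requires a reachable casty cluster to actually run,
# e.g. via asyncio.run(main()).
async def main() -> None:
    from casty import ClusteredActorSystem  # deferred so the sketch imports standalone

    async with ClusteredActorSystem(
        name="my-app",
        host="127.0.0.1",
        port=0,                              # 0 = auto-assigned port
        node_id="node-a",                    # required by __init__; illustrative value
        seed_nodes=(("127.0.0.1", 25520),),
        required_quorum=3,                   # __aenter__ blocks until 3 members are up
    ) as system:
        state = await system.get_cluster_state()
        print(f"cluster is up with {len(state.members)} members")
```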

spawn(behavior, name, *, mailbox=None)

spawn(
    behavior: BroadcastedBehavior[M], name: str
) -> BroadcastRef[M]
spawn(
    behavior: ShardedBehavior[M], name: str
) -> ActorRef[ShardEnvelope[M]]
spawn(
    behavior: SingletonBehavior[M], name: str
) -> ActorRef[M]
spawn(
    behavior: Behavior[M],
    name: str,
    *,
    mailbox: Mailbox[M] | None = None,
) -> ActorRef[M]
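The `ShardedBehavior` overload is the one used in the class-level example: spawning it yields an `ActorRef[ShardEnvelope[M]]`. Below is a hedged sketch of that path; the `Deposit` message and the `account_entity` behavior are hypothetical, while `Behaviors.sharded`, `ShardEnvelope`, and the `spawn` call follow the example above:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Deposit:
    """Hypothetical entity message, for illustration only."""
    amount: int

async def start_region(system, account_entity):
    # `account_entity` stands in for a real entity behavior factory.
    from casty import Behaviors, ShardEnvelope  # deferred: sketch only

    # ShardedBehavior -> ActorRef[ShardEnvelope[M]], per the overloads above.
    ref = system.spawn(Behaviors.sharded(account_entity, num_shards=50), "accounts")
    ref.tell(ShardEnvelope("user-42", Deposit(amount=100)))
    return ref
```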

ask(ref, msg_factory, *, timeout) async

ask(
    ref: BroadcastRef[M],
    msg_factory: Callable[[ActorRef[R]], M],
    *,
    timeout: float,
) -> tuple[R, ...]
ask(
    ref: ActorRef[M],
    msg_factory: Callable[[ActorRef[R]], M],
    *,
    timeout: float,
) -> R

Ask with remote-addressable temp reply ref.

When ref is a BroadcastRef, the message is fanned out to all cluster members and responses are collected into a tuple[R, ...].
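The `msg_factory` pattern can be sketched as follows; `GetBalance` and `query_balance` are hypothetical names, not part of casty's API:

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class GetBalance:
    """Hypothetical request whose reply_to field receives the temp ref."""
    reply_to: Any  # ActorRef[int] in a real casty program

async def query_balance(system, account_ref) -> int:
    # `ask` creates a remote-addressable temporary ref, hands it to the
    # factory, sends the resulting message, and awaits the reply.
    return await system.ask(
        account_ref,
        lambda reply_to: GetBalance(reply_to),
        timeout=2.0,
    )
```

If `account_ref` were a `BroadcastRef`, the same call would instead return a `tuple` of replies, one per responding member.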

get_cluster_state(*, timeout=5.0) async

Query the current cluster membership state.

Returns:

| Type | Description |
| --- | --- |
| `ClusterState` | Snapshot of members, their statuses, and the vector clock. |

Examples:

>>> state = await system.get_cluster_state()
>>> len(state.members)
3

wait_for(n, *, timeout=60.0) async

Block until at least n members have status up.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `n` | `int` | Minimum number of up members required. | *required* |
| `timeout` | `float` | Seconds to wait before raising `TimeoutError`. | `60.0` |

Returns:

| Type | Description |
| --- | --- |
| `ClusterState` | The cluster state once the quorum is reached. |

Examples:

>>> state = await system.wait_for(3, timeout=30.0)

barrier(name, n, *, timeout=60.0) async

Distributed barrier: blocks until n nodes have reached this point.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Barrier name (shared across nodes). | *required* |
| `n` | `int` | Number of nodes that must arrive before all are released. | *required* |
| `timeout` | `float` | Seconds to wait before raising `TimeoutError`. | `60.0` |

Examples:

>>> await system.barrier("setup-done", n=3)

lookup(path, *, node=None, timeout=5.0)

lookup(
    path: ServiceKey[M], *, timeout: float = 5.0
) -> Coroutine[Any, Any, Listing[M]]
lookup(
    path: str, *, node: NodeId | NodeAddress | None = None
) -> ActorRef[Any] | None

Look up an actor by path or find services by key.

When path is a str, performs a path-based lookup (optionally on a remote node). When it is a ServiceKey, queries the cluster receptionist and returns an awaitable Listing.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str \| ServiceKey` | Actor path (e.g. `"/my-actor"`) or a typed service key. | *required* |
| `node` | `NodeId \| NodeAddress \| None` | Target node for path-based lookup. `None` means local. | `None` |
| `timeout` | `float` | Seconds to wait for the receptionist (only for `ServiceKey` lookups). | `5.0` |

Returns:

| Type | Description |
| --- | --- |
| `ActorRef[Any] \| None \| Coroutine[Any, Any, Listing]` | For path lookups, the actor reference or `None`. For service-key lookups, an awaitable `Listing`. |

Examples:

>>> ref = system.lookup("/my-actor")
>>> remote = system.lookup("/my-actor", node="worker-1")
>>> listing = await system.lookup(ServiceKey[PaymentMsg]("payment"))

region_ref(key, factory, *, num_shards)

Return a ref that routes ShardEnvelope to a sharded region.

Spawns a new shard region if one with the given key does not already exist.
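A sketch of routing through a region obtained this way; the `"accounts"` key, the `entity_factory` argument, and the `Ping` message are illustrative assumptions:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Ping:
    """Hypothetical entity message, for illustration only."""
    note: str

def route(system, entity_factory):
    from casty import ShardEnvelope  # deferred: sketch only

    # Reuses the existing region for "accounts" if one was already spawned.
    region = system.region_ref("accounts", entity_factory, num_shards=50)
    region.tell(ShardEnvelope("user-42", Ping(note="hello")))
    return region
```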

entity_ask(ref, msg_factory, *, timeout) async

Send a message to a sharded entity and wait for a reply.

distributed(*, journal=None)

Create a Distributed facade for this system.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `journal` | `EventJournal \| None` | If provided, data structures use event sourcing for persistence. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Distributed` | The facade bound to this system. |

Examples:

>>> d = system.distributed()
>>> counter = d.counter("hits")

shutdown() async

Shut down the cluster node, closing transport and all actors.

casty.ShardEnvelope dataclass

Envelope that routes a message to a specific entity within a shard region.

Wraps a message M together with an entity_id used for deterministic shard assignment. The shard proxy computes the target shard from the entity_id and forwards the envelope to the owning node.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `entity_id` | `str` | Logical identifier of the target entity. | *required* |
| `message` | `M` | The payload message delivered to the entity actor. | *required* |

Examples:

>>> ref.tell(ShardEnvelope("user-42", Deposit(amount=100)))

entity_id instance-attribute

message instance-attribute

__init__(entity_id, message)