Cluster

casty.Cluster

High-level handle for starting and querying a cluster on an ActorSystem.

Wraps the internal cluster actor and exposes the start, get_state, get_receptionist, and shutdown methods.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| system | ActorSystem | The actor system that will host the cluster actor. | required |
| config | ClusterConfig | Cluster network and seed configuration. | required |
| remote_transport | RemoteTransport or None | Transport for cross-node communication. None for local-only. | None |
| system_name | str | Logical name of the actor system (used in actor addresses). | '' |
| gossip_interval | float | Seconds between gossip rounds. | 1.0 |
| gossip_fanout | int | Number of peers contacted per gossip round. | 3 |
| heartbeat_interval | float | Seconds between heartbeat state polls. | 0.5 |
| availability_interval | float | Seconds between failure-detector availability checks. | 2.0 |
| failure_detector_config | FailureDetectorConfig or None | Tuning for the phi accrual failure detector. | None |
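
To make the gossip_fanout parameter concrete, the following is a generic sketch of fanout-based peer selection. It is not casty's implementation; the function name pick_gossip_targets and the peer names are hypothetical, and the only idea taken from the table above is that each round contacts at most gossip_fanout peers.

```python
import random
from typing import Optional, Sequence


def pick_gossip_targets(
    peers: Sequence[str], fanout: int = 3, rng: Optional[random.Random] = None
) -> list:
    """Choose up to `fanout` distinct peers for one gossip round."""
    rng = rng or random.Random()
    # With fewer known peers than the fanout, contact all of them.
    return rng.sample(list(peers), k=min(fanout, len(peers)))


# Hypothetical peer names, for illustration only.
peers = ["node-2", "node-3", "node-4", "node-5", "node-6"]
targets = pick_gossip_targets(peers, fanout=3, rng=random.Random(0))
```

A larger fanout spreads state faster at the cost of more messages per gossip_interval.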

Examples:

>>> cluster = Cluster(system, ClusterConfig("127.0.0.1", 2551, node_id="node-1"))
>>> await cluster.start()
>>> state = await cluster.get_state()
>>> len(state.members)
1

ref property

The cluster actor's ref.

Returns:

| Type | Description |
| --- | --- |
| ActorRef[ClusterCmd] | Reference to the running cluster actor. |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If the cluster has not been started yet. |

__init__(system, config, *, remote_transport=None, local_transport=None, system_name='', gossip_interval=1.0, gossip_fanout=3, heartbeat_interval=0.5, availability_interval=2.0, failure_detector_config=None, event_stream=None)

start() async

Spawn the cluster actor and begin the membership protocol.

Once this method returns, the cluster is actively gossiping and monitoring heartbeats.

get_state(*, timeout=5.0) async

Request the current cluster state from the topology actor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| timeout | float | Maximum seconds to wait for a reply. | 5.0 |

Returns:

| Type | Description |
| --- | --- |
| ClusterState | The latest cluster state snapshot. |
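
Because get_state returns a point-in-time snapshot, callers that need a minimum cluster size typically poll it. The sketch below shows one possible polling helper. It relies only on get_state(timeout=...) and state.members as documented here; wait_for_members and the _FakeCluster stand-in are hypothetical and exist only so the example runs without a real cluster.

```python
import asyncio
import time
from types import SimpleNamespace


async def wait_for_members(cluster, expected, *, deadline=10.0, poll=0.2):
    """Poll cluster.get_state() until at least `expected` members are listed."""
    give_up_at = time.monotonic() + deadline
    while True:
        state = await cluster.get_state(timeout=5.0)
        if len(state.members) >= expected:
            return state
        if time.monotonic() >= give_up_at:
            raise TimeoutError(
                f"only {len(state.members)} of {expected} members seen"
            )
        await asyncio.sleep(poll)


class _FakeCluster:
    """Hypothetical stand-in for casty.Cluster; not part of the library."""

    def __init__(self):
        self._calls = 0

    async def get_state(self, *, timeout=5.0):
        self._calls += 1
        # Pretend one more member has joined on every call.
        members = frozenset(f"node-{i}" for i in range(1, self._calls + 1))
        return SimpleNamespace(members=members)


state = asyncio.run(wait_for_members(_FakeCluster(), expected=3, poll=0.01))
```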

get_receptionist(*, timeout=5.0) async

Ask the cluster actor for the receptionist ref.

shutdown() async

Gracefully shut down the cluster actor.
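
One way to guarantee that shutdown() runs even when the code between start() and shutdown() fails is an async context manager. This is a sketch, not something casty is documented to provide: running and _RecordingCluster are hypothetical names, and the stand-in only records calls so the example runs without any networking.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def running(cluster):
    """Start `cluster`, yield it, and always shut it down afterwards."""
    await cluster.start()
    try:
        yield cluster
    finally:
        await cluster.shutdown()


class _RecordingCluster:
    """Hypothetical stand-in that records calls instead of networking."""

    def __init__(self):
        self.calls = []

    async def start(self):
        self.calls.append("start")

    async def shutdown(self):
        self.calls.append("shutdown")


async def _demo():
    cluster = _RecordingCluster()
    try:
        async with running(cluster):
            raise ValueError("simulated failure while the cluster is up")
    except ValueError:
        pass  # the failure is expected; shutdown must still have run
    return cluster.calls


calls = asyncio.run(_demo())
```

With a real Cluster, the body of the async with block is where get_state() and get_receptionist() would be called.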

casty.ClusterConfig dataclass

Configuration for joining or forming a cluster.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| host | str | Bind address for this node. | required |
| port | int | TCP port for this node. | required |
| seed_nodes | tuple[tuple[str, int], ...] | Initial contact points for cluster formation. An empty tuple means this node forms a new single-node cluster. | () |
| node_id | NodeId | Human-readable identifier for this node (e.g. "worker-1"). | required |
| roles | frozenset[str] | Roles to advertise for this node. | frozenset() |

Examples:

>>> cfg = ClusterConfig("127.0.0.1", 2551, node_id="node-1", seed_nodes=(("127.0.0.1", 2552),))
>>> cfg.host, cfg.port
('127.0.0.1', 2551)
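
Seed addresses often arrive as "host:port" strings from an environment variable or a config file. The helper below is a hypothetical convenience, not part of casty, that converts such strings into the tuple[tuple[str, int], ...] shape expected by seed_nodes.

```python
def parse_seed_nodes(specs):
    """Turn ["host:port", ...] into the tuple-of-tuples shape of seed_nodes."""
    seeds = []
    for spec in specs:
        # Split on the last colon so the port is always the final component.
        host, _, port = spec.rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"expected 'host:port', got {spec!r}")
        seeds.append((host, int(port)))
    return tuple(seeds)


seed_nodes = parse_seed_nodes(["127.0.0.1:2552", "127.0.0.1:2553"])
```

The result can be passed as seed_nodes=seed_nodes; an empty input list yields (), which per the table above means the node forms a new single-node cluster.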

host instance-attribute

port instance-attribute

node_id instance-attribute

seed_nodes = () class-attribute instance-attribute

roles = field(default_factory=(lambda: frozenset[str]())) class-attribute instance-attribute

__init__(host, port, node_id, seed_nodes=(), roles=frozenset())

casty.ResolveNode dataclass

Query to resolve a NodeId to its NodeAddress.

node_id instance-attribute

reply_to instance-attribute

__init__(node_id, reply_to)

casty.TopologySnapshot dataclass

Full cluster topology pushed to subscribers.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| members | frozenset[Member] | Current cluster members with their statuses. | required |
| leader | NodeAddress or None | Address of the current cluster leader, or None. | required |
| shard_allocations | dict[str, dict[int, ShardAllocation]] | Per-shard-type mapping of shard_id to allocation (primary + replicas). | required |
| allocation_epoch | int | Monotonically increasing epoch for allocation consistency. | required |
| unreachable | frozenset[NodeAddress] | Nodes currently marked unreachable by the failure detector. | frozenset() |
| registry | frozenset[ServiceEntry] | Service registry entries replicated across the cluster. | frozenset() |

members instance-attribute

leader instance-attribute

shard_allocations instance-attribute

allocation_epoch instance-attribute

unreachable = field(default_factory=(lambda: frozenset[NodeAddress]())) class-attribute instance-attribute

registry = field(default_factory=(lambda: frozenset[ServiceEntry]())) class-attribute instance-attribute

__init__(members, leader, shard_allocations, allocation_epoch, unreachable=frozenset(), registry=frozenset())
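
Since allocation_epoch is described as monotonically increasing, a subscriber can use it to ignore a snapshot that is older than one it already holds. The sketch below assumes snapshots might arrive out of order, which this page does not state. LatestTopology and snap are hypothetical, and snap builds a stand-in object carrying only two of the TopologySnapshot fields.

```python
from types import SimpleNamespace


class LatestTopology:
    """Remember the snapshot with the highest allocation_epoch seen so far."""

    def __init__(self):
        self.current = None

    def offer(self, snapshot):
        """Store `snapshot` unless it is older than the one already held."""
        if (
            self.current is not None
            and snapshot.allocation_epoch < self.current.allocation_epoch
        ):
            return False  # stale: a lower epoch arrived after a higher one
        self.current = snapshot
        return True


def snap(epoch, members):
    # Hypothetical stand-in with only allocation_epoch and members set.
    return SimpleNamespace(allocation_epoch=epoch, members=frozenset(members))


latest = LatestTopology()
accepted = [
    latest.offer(snap(1, {"a"})),
    latest.offer(snap(3, {"a", "b"})),
    latest.offer(snap(2, {"a"})),  # older epoch, should be rejected
]
```

Snapshots with an equal epoch are accepted, because membership or reachability may change without a new allocation.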

casty.SubscribeTopology dataclass

Request to receive topology updates.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| reply_to | ActorRef[TopologySnapshot] | Ref that will receive TopologySnapshot messages. | required |

reply_to instance-attribute

__init__(reply_to)

casty.UnsubscribeTopology dataclass

Request to stop receiving topology updates.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| subscriber | ActorRef[TopologySnapshot] | The ref to remove from the subscriber list. | required |

subscriber instance-attribute

__init__(subscriber)