Cluster State

casty.ClusterState dataclass

Immutable, CRDT-mergeable snapshot of cluster membership.

Every node maintains its own ClusterState and periodically gossips it to peers. Merging two states produces a new state that is the union of members with vector-clock-ordered conflict resolution, guaranteeing convergence without coordination.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| members | frozenset[Member] | Current cluster members. | frozenset() |
| unreachable | frozenset[NodeAddress] | Nodes detected as unreachable by the failure detector. | frozenset() |
| version | VectorClock | Causal version of this state. | VectorClock() |
| shard_allocations | dict[str, dict[int, ShardAllocation]] | Per-shard-type allocation map (populated by the coordinator). | {} |
| allocation_epoch | int | Monotonically increasing epoch for shard allocation changes. | 0 |
| seen | frozenset[NodeAddress] | Nodes that have acknowledged this version (for convergence check). | frozenset() |

Examples:

>>> node = NodeAddress("10.0.0.1", 2551)
>>> m = Member(node, MemberStatus.up, frozenset(), id="node-1")
>>> s1 = ClusterState().add_member(m)
>>> s2 = ClusterState().add_member(m)
>>> merged = s1.merge_members(s2)
>>> len(merged)
1

members = field(default_factory=(lambda: frozenset[Member]())) class-attribute instance-attribute

unreachable = field(default_factory=(lambda: frozenset[NodeAddress]())) class-attribute instance-attribute

version = field(default_factory=VectorClock) class-attribute instance-attribute

shard_allocations = field(default_factory=(lambda: {})) class-attribute instance-attribute

allocation_epoch = 0 class-attribute instance-attribute

seen = field(default_factory=(lambda: frozenset[NodeAddress]())) class-attribute instance-attribute

registry = field(default_factory=(lambda: frozenset[ServiceEntry]())) class-attribute instance-attribute

is_converged property

Whether all active members have seen the current state version.

Returns:

| Type | Description |
| --- | --- |
| bool | True when every non-down, non-removed member is in seen. |
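
The convergence rule above can be sketched as a subset check. This is a minimal illustration, not casty's implementation: the Status enum, the string addresses, and the is_converged function are hypothetical stand-ins for MemberStatus, NodeAddress, and the property.

```python
from enum import Enum, auto


class Status(Enum):
    """Hypothetical stand-in for MemberStatus."""
    joining = auto()
    up = auto()
    leaving = auto()
    down = auto()
    removed = auto()


def is_converged(members: dict[str, Status], seen: frozenset[str]) -> bool:
    """True when every non-down, non-removed member address is in `seen`."""
    active = {
        addr
        for addr, status in members.items()
        if status not in (Status.down, Status.removed)
    }
    return active <= seen  # subset test: all active members have acknowledged


members = {"a:1": Status.up, "b:1": Status.joining, "c:1": Status.down}
print(is_converged(members, frozenset({"a:1"})))         # False: b:1 has not acknowledged
print(is_converged(members, frozenset({"a:1", "b:1"})))  # True: c:1 is down, so it is ignored
```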

leader property

The current cluster leader, determined by lowest sorted address.

Returns:

| Type | Description |
| --- | --- |
| NodeAddress or None | Address of the leader, or None if no members are up. |
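
Picking the lowest sorted address needs no coordination, because every node that holds the same member set computes the same answer. A minimal sketch, assuming (host, port) tuples as a stand-in for NodeAddress and a hypothetical pick_leader helper that receives only the addresses of members that are up:

```python
from typing import Iterable, Optional

Address = tuple[str, int]  # (host, port), a stand-in for NodeAddress


def pick_leader(up_addresses: Iterable[Address]) -> Optional[Address]:
    """Return the lowest address in sort order, or None if there are none."""
    return min(up_addresses, default=None)


print(pick_leader([("b", 2551), ("a", 2552), ("a", 2551)]))  # ('a', 2551)
print(pick_leader([]))                                        # None
```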

add_member(member)

Return a new state with member added or replaced.

If a member with the same address already exists it is replaced.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| member | Member | The member to add. | required |

Returns:

| Type | Description |
| --- | --- |
| ClusterState | New state containing the member. |

merge_members(other)

Merge the member sets of two states, keeping self's entry when both contain the same address.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| other | ClusterState | The remote state to merge with. | required |

Returns:

| Type | Description |
| --- | --- |
| frozenset[Member] | Union of members; when both states contain the same address, self's entry wins (self's entries are applied last in dict update order). |
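
The "dict update order" rule can be illustrated with a small sketch. The Record type and the free function merge_members below are hypothetical stand-ins, assuming members are keyed by address; they are not casty's code.

```python
from typing import NamedTuple


class Record(NamedTuple):
    """Hypothetical stand-in for Member: an address plus a status label."""
    address: str
    status: str


def merge_members(mine: frozenset[Record], theirs: frozenset[Record]) -> frozenset[Record]:
    """Union by address; entries from `mine` overwrite entries from `theirs`."""
    by_address = {m.address: m for m in theirs}
    by_address.update({m.address: m for m in mine})  # applied last, so local entries win
    return frozenset(by_address.values())


local = frozenset({Record("a:1", "up")})
remote = frozenset({Record("a:1", "leaving"), Record("b:1", "joining")})
merged = merge_members(local, remote)
print(sorted(merged))  # a:1 keeps the local "up" entry; b:1 is added from remote
```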

update_status(address, status)

Return a new state with the given node's status changed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| address | NodeAddress | The node to update. | required |
| status | MemberStatus | The new status. | required |

Returns:

| Type | Description |
| --- | --- |
| ClusterState | New state with the updated member. |

mark_unreachable(address)

Return a new state with address added to the unreachable set.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| address | NodeAddress | The node to mark unreachable. | required |

Returns:

| Type | Description |
| --- | --- |
| ClusterState | New state with updated unreachable set. |

mark_reachable(address)

Return a new state with address removed from the unreachable set.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| address | NodeAddress | The node to mark reachable again. | required |

Returns:

| Type | Description |
| --- | --- |
| ClusterState | New state with updated unreachable set. |
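
Because the state is immutable, mark_unreachable and mark_reachable return new objects rather than mutating in place. A minimal sketch of that pattern, assuming a frozen dataclass and dataclasses.replace; the State class and string addresses are hypothetical stand-ins, and casty may build the new state differently.

```python
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class State:
    """Hypothetical stand-in for ClusterState, holding only the unreachable set."""
    unreachable: frozenset[str] = field(default_factory=frozenset)

    def mark_unreachable(self, address: str) -> "State":
        # Build a new frozenset; the original state is left untouched.
        return replace(self, unreachable=self.unreachable | {address})

    def mark_reachable(self, address: str) -> "State":
        return replace(self, unreachable=self.unreachable - {address})


s0 = State()
s1 = s0.mark_unreachable("b:1")
s2 = s1.mark_reachable("b:1")
print(s0.unreachable, s1.unreachable, s2.unreachable)
```

Here s0 is unchanged after both calls, and s2 compares equal to s0 because frozen dataclasses compare by field values.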

with_allocations(shard_type, allocations, epoch)

Return a new state with updated shard allocations for shard_type.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| shard_type | str | The shard type name (e.g. "counter"). | required |
| allocations | dict[int, ShardAllocation] | Shard-id to allocation mapping. | required |
| epoch | int | Allocation epoch for consistency tracking. | required |

Returns:

| Type | Description |
| --- | --- |
| ClusterState | New state with the updated allocations. |

__str__()

diff(previous)

Compute the changes between previous and this state.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| previous | ClusterState or None | The previous state to diff against. None means empty. | required |

Returns:

| Type | Description |
| --- | --- |
| ClusterChanges | A summary of joined/removed members and leader election. |
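
The joined/removed part of a diff reduces to two set differences, with None treated as an empty previous state. The sketch below is illustrative only: Changes and its field names are hypothetical and do not reflect the actual fields of ClusterChanges, and the leader-election part is omitted.

```python
from typing import NamedTuple, Optional


class Changes(NamedTuple):
    """Hypothetical stand-in for ClusterChanges (field names are assumptions)."""
    joined: frozenset[str]
    removed: frozenset[str]


def diff(current: frozenset[str], previous: Optional[frozenset[str]]) -> Changes:
    """Compare two sets of member addresses."""
    before = previous if previous is not None else frozenset()  # None means empty
    return Changes(joined=current - before, removed=before - current)


print(diff(frozenset({"a:1", "b:1"}), None))
print(diff(frozenset({"b:1", "c:1"}), frozenset({"a:1", "b:1"})))
```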

__init__(members=(lambda: frozenset[Member]())(), unreachable=(lambda: frozenset[NodeAddress]())(), version=VectorClock(), shard_allocations=(lambda: {})(), allocation_epoch=0, seen=(lambda: frozenset[NodeAddress]())(), registry=(lambda: frozenset[ServiceEntry]())())

casty.Member dataclass

A single node's membership record in the cluster.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| address | NodeAddress | Network address of the member. | required |
| status | MemberStatus | Current lifecycle status. | required |
| roles | frozenset[str] | Roles advertised by this member (e.g. {"frontend", "backend"}). | required |
| id | NodeId | Human-readable identifier for this node (e.g. "worker-1"). | required |

Examples:

>>> m = Member(NodeAddress("10.0.0.1", 2551), MemberStatus.up, frozenset(), id="node-1")
>>> m.status
<MemberStatus.up: ...>

address instance-attribute

status instance-attribute

roles instance-attribute

id instance-attribute

__init__(address, status, roles, id)

casty.MemberStatus

Bases: Enum

Lifecycle status of a cluster member.

Members progress through statuses in order:

joining -> up -> leaving -> down -> removed

Only the cluster leader promotes joining to up once gossip has converged.

Examples:

>>> MemberStatus.joining
<MemberStatus.joining: ...>
>>> MemberStatus.up.name
'up'

joining = auto() class-attribute instance-attribute

up = auto() class-attribute instance-attribute

leaving = auto() class-attribute instance-attribute

down = auto() class-attribute instance-attribute

removed = auto() class-attribute instance-attribute

merge_priority property

Priority key for CRDT merge on concurrent vector clocks.

Returns an (alive, value) tuple where alive statuses always win over terminal ones. This ensures a rejoining node (joining) beats stale down/removed gossip.
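
Because Python compares tuples element by element and True sorts above False, an (alive, value) key makes any alive status outrank any terminal one. The sketch below assumes that down and removed are the terminal statuses and that value is the enum's auto() number; the Status enum and resolve helper are hypothetical, not casty's code.

```python
from enum import Enum, auto


class Status(Enum):
    """Hypothetical stand-in for MemberStatus."""
    joining = auto()
    up = auto()
    leaving = auto()
    down = auto()
    removed = auto()

    @property
    def merge_priority(self) -> tuple[bool, int]:
        # Assumption: down and removed are the terminal (non-alive) statuses.
        alive = self not in (Status.down, Status.removed)
        return (alive, self.value)


def resolve(a: Status, b: Status) -> Status:
    """Pick the winner between two statuses seen under concurrent clocks."""
    return max(a, b, key=lambda s: s.merge_priority)


print(resolve(Status.joining, Status.down))  # Status.joining: alive beats terminal
print(resolve(Status.up, Status.leaving))    # Status.leaving: later lifecycle stage wins
```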

casty.NodeAddress dataclass

Network address identifying a cluster node.

Ordered lexicographically by (host, port) so that deterministic leader election can sort members by address.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| host | str | Hostname or IP address of the node. | required |
| port | int | TCP port the node listens on. | required |

Examples:

>>> addr = NodeAddress(host="127.0.0.1", port=2551)
>>> addr.host
'127.0.0.1'
>>> NodeAddress("a", 1) < NodeAddress("b", 1)
True
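
One consequence of lexicographic (host, port) ordering is that hosts compare as strings, not as numeric IP addresses. The sketch below uses a hypothetical Addr dataclass with order=True as a stand-in; NodeAddress itself defines __lt__, and its exact implementation is not shown here.

```python
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class Addr:
    """Hypothetical stand-in for NodeAddress, ordered by (host, port)."""
    host: str
    port: int


print(Addr("a", 2) < Addr("b", 1))  # True: host decides first
print(Addr("a", 1) < Addr("a", 2))  # True: port breaks the tie
print(Addr("10.0.0.10", 2551) < Addr("10.0.0.2", 2551))  # True: "10" sorts before "2" as text
```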

host instance-attribute

port instance-attribute

__lt__(other)

__init__(host, port)

casty.NodeId = str

casty.VectorClock dataclass

Logical clock for tracking causal ordering of cluster state versions.

Each node increments its own entry on every state mutation. Two clocks can be compared for happens-before or concurrency, and merged to produce a least upper bound.

Examples:

>>> node_a = NodeAddress("a", 1)
>>> node_b = NodeAddress("b", 1)
>>> vc = VectorClock().increment(node_a).increment(node_a)
>>> vc.version_of(node_a)
2
>>> merged = vc.merge(VectorClock().increment(node_b))
>>> merged.version_of(node_a), merged.version_of(node_b)
(2, 1)

version_of(node)

Return the version counter for node, defaulting to 0.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| node | NodeAddress | The node whose version to look up. | required |

Returns:

| Type | Description |
| --- | --- |
| int | The current counter for node. |

increment(node)

Return a new clock with node's counter incremented by one.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| node | NodeAddress | The node whose counter to increment. | required |

Returns:

| Type | Description |
| --- | --- |
| VectorClock | A new clock with the updated counter. |

merge(other)

Merge two clocks by taking the element-wise maximum.

This is the least upper bound (join) operation, guaranteeing CRDT convergence.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| other | VectorClock | The clock to merge with. | required |

Returns:

| Type | Description |
| --- | --- |
| VectorClock | A new clock where each entry is max(self[node], other[node]). |
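
The element-wise maximum can be sketched over plain dictionaries, treating a missing entry as 0 (as version_of does). The Clock alias and the free function merge are illustrative stand-ins, not the VectorClock implementation.

```python
Clock = dict[str, int]  # node name -> counter, a stand-in for VectorClock


def merge(a: Clock, b: Clock) -> Clock:
    """Least upper bound of two clocks: per-node maximum, missing entries count as 0."""
    return {node: max(a.get(node, 0), b.get(node, 0)) for node in a.keys() | b.keys()}


print(merge({"a": 2}, {"b": 1}) == {"a": 2, "b": 1})                  # True
print(merge({"a": 2, "b": 1}, {"a": 1, "b": 3}) == {"a": 2, "b": 3})  # True
```

Taking the maximum is commutative, associative, and idempotent, so nodes can merge in any order, and merge repeatedly, and still reach the same clock.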

is_before(other)

Return whether this clock strictly happens-before other.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| other | VectorClock | The clock to compare against. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if every entry in self is <= the corresponding entry in other and at least one is strictly less. |

is_concurrent_with(other)

Return whether this clock is concurrent with other.

Two clocks are concurrent when neither happens-before the other and they are not equal.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| other | VectorClock | The clock to compare against. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the clocks are causally unrelated. |
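
The happens-before and concurrency definitions can be sketched together over plain dictionaries, with missing entries counted as 0. The Clock alias and both functions are hypothetical stand-ins that follow the documented definitions, not the library's code.

```python
Clock = dict[str, int]  # node name -> counter, a stand-in for VectorClock


def is_before(a: Clock, b: Clock) -> bool:
    """True if every entry of a is <= b's and at least one is strictly less."""
    nodes = a.keys() | b.keys()
    return (all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
            and any(a.get(n, 0) < b.get(n, 0) for n in nodes))


def is_concurrent(a: Clock, b: Clock) -> bool:
    """True if the clocks differ and neither happens-before the other."""
    nodes = a.keys() | b.keys()
    equal = all(a.get(n, 0) == b.get(n, 0) for n in nodes)
    return not equal and not is_before(a, b) and not is_before(b, a)


print(is_before({"a": 1}, {"a": 2}))      # True: a strictly older clock
print(is_before({"a": 1}, {"a": 1}))      # False: equal clocks are not strictly before
print(is_concurrent({"a": 1}, {"b": 1}))  # True: each clock has an update the other lacks
```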

__init__(_versions=(lambda: dict[NodeAddress, int]())())

casty.ShardAllocation dataclass

Tracks which node owns a shard and where its replicas live.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| primary | NodeAddress | Node that owns the shard and handles writes. | required |
| replicas | tuple[NodeAddress, ...] | Nodes holding passive replica copies. | () |

Examples:

>>> alloc = ShardAllocation(primary=NodeAddress("10.0.0.1", 25520))
>>> alloc.replicas
()

primary instance-attribute

replicas = () class-attribute instance-attribute

__init__(primary, replicas=())

casty.ServiceEntry dataclass

A registered service in the cluster registry.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | Service name (e.g. "payment-service"). | required |
| node | NodeAddress | Node where the actor lives. | required |
| path | str | Actor path on that node. | required |

key instance-attribute

node instance-attribute

path instance-attribute

__init__(key, node, path)