Cluster State
casty.ClusterState (dataclass)
Immutable, CRDT-mergeable snapshot of cluster membership.
Every node maintains its own ClusterState and periodically gossips it
to peers. Merging two states produces a new state that is the union of
members with vector-clock-ordered conflict resolution, guaranteeing
convergence without coordination.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| members | frozenset[Member] | Current cluster members. | frozenset() |
| unreachable | frozenset[NodeAddress] | Nodes detected as unreachable by the failure detector. | frozenset() |
| version | VectorClock | Causal version of this state. | VectorClock() |
| shard_allocations | dict[str, dict[int, ShardAllocation]] | Per-shard-type allocation map (populated by the coordinator). | {} |
| allocation_epoch | int | Monotonically increasing epoch for shard allocation changes. | 0 |
| seen | frozenset[NodeAddress] | Nodes that have acknowledged this version (for convergence check). | frozenset() |
| registry | frozenset[ServiceEntry] | Services registered in the cluster registry. | frozenset() |
Examples:
>>> node = NodeAddress("10.0.0.1", 2551)
>>> m = Member(node, MemberStatus.up, frozenset(), id="node-1")
>>> s1 = ClusterState().add_member(m)
>>> s2 = ClusterState().add_member(m)
>>> merged = s1.merge_members(s2)
>>> len(merged)
1
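The merge rule described above can be sketched in plain Python. This is a simplified model, not casty's code: it assumes a state is a (clock, members) pair of dicts, that a strictly newer state's member view replaces the older one outright, and that concurrent states are resolved per address with an (alive, value) priority like MemberStatus.merge_priority. The names merge, dominates, merge_clocks and PRIORITY are hypothetical.

```python
from itertools import permutations

# Simplified sketch, not casty's implementation. A "state" is a
# (clock, members) pair: clock maps node -> counter, members maps
# address -> status name. All names here are hypothetical.
Clock = dict[str, int]
Members = dict[str, str]

# Assumed (alive, value) priority, mirroring MemberStatus.merge_priority.
PRIORITY = {"joining": (1, 1), "up": (1, 2), "leaving": (1, 3),
            "down": (0, 4), "removed": (0, 5)}


def merge_clocks(a: Clock, b: Clock) -> Clock:
    """Element-wise maximum (least upper bound) of two clocks."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in a.keys() | b.keys()}


def dominates(a: Clock, b: Clock) -> bool:
    """True if a has seen every event that b has seen."""
    return all(a.get(n, 0) >= v for n, v in b.items())


def merge(a: tuple[Clock, Members], b: tuple[Clock, Members]) -> tuple[Clock, Members]:
    (clock_a, members_a), (clock_b, members_b) = a, b
    clock = merge_clocks(clock_a, clock_b)
    if dominates(clock_a, clock_b) and not dominates(clock_b, clock_a):
        return clock, dict(members_a)  # a is strictly newer: keep its view
    if dominates(clock_b, clock_a) and not dominates(clock_a, clock_b):
        return clock, dict(members_b)  # b is strictly newer: keep its view
    # Concurrent (or equal) clocks: union by address, higher priority wins.
    members = dict(members_a)
    for address, status in members_b.items():
        if address not in members or PRIORITY[status] > PRIORITY[members[address]]:
            members[address] = status
    return clock, members


# Three pairwise-concurrent states, folded in every possible order.
x = ({"a": 1}, {"n1": "up"})
y = ({"b": 1}, {"n1": "down", "n2": "joining"})
z = ({"c": 1}, {"n2": "up"})
results = [merge(merge(p, q), r) for p, q, r in permutations([x, y, z])]
assert all(res == results[0] for res in results)
```

The final assertion checks, for this one concurrent example, that every merge order yields the same state, which is the convergence property gossip relies on.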
Attributes:
members = field(default_factory=lambda: frozenset[Member]())
unreachable = field(default_factory=lambda: frozenset[NodeAddress]())
version = field(default_factory=VectorClock)
shard_allocations = field(default_factory=lambda: {})
allocation_epoch = 0
seen = field(default_factory=lambda: frozenset[NodeAddress]())
registry = field(default_factory=lambda: frozenset[ServiceEntry]())
is_converged (property)
Whether all active members have seen the current state version.
Returns:
| Type | Description |
|---|---|
| bool | True if every active member has seen the current version, False otherwise. |
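A minimal sketch of the convergence check, assuming convergence means every active member's address appears in seen, and assuming joining, up and leaving count as active (casty's exact set may differ). Member is a hypothetical stand-in with string addresses.

```python
from dataclasses import dataclass


# Hypothetical stand-in for casty.Member, with addresses as plain strings.
@dataclass(frozen=True)
class Member:
    address: str
    status: str


# Assumption: these statuses count as "active" for convergence.
ACTIVE_STATUSES = frozenset({"joining", "up", "leaving"})


def is_converged(members: frozenset[Member], seen: frozenset[str]) -> bool:
    """True if every active member's address is in the seen set."""
    active = {m.address for m in members if m.status in ACTIVE_STATUSES}
    return active <= seen


members = frozenset({Member("n1", "up"), Member("n2", "up"), Member("n3", "down")})
print(is_converged(members, frozenset({"n1", "n2"})))  # True: n3 is down
print(is_converged(members, frozenset({"n1"})))        # False: n2 has not seen it
```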
leader (property)
The current cluster leader: the member whose address sorts lowest.
Returns:
| Type | Description |
|---|---|
| NodeAddress or None | Address of the leader, or None if no leader can be determined. |
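A sketch of deterministic leader selection, using a stand-in NodeAddress whose dataclass ordering compares (host, port). It assumes only up members are eligible, which the description above does not state.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


# Hypothetical stand-ins; order=True compares (host, port) field by field.
@dataclass(frozen=True, order=True)
class NodeAddress:
    host: str
    port: int


@dataclass(frozen=True)
class Member:
    address: NodeAddress
    status: str


def leader(members: Iterable[Member]) -> Optional[NodeAddress]:
    # Assumption: only members with status "up" are eligible.
    candidates = [m.address for m in members if m.status == "up"]
    return min(candidates, default=None)


members = [
    Member(NodeAddress("10.0.0.2", 2551), "up"),
    Member(NodeAddress("10.0.0.1", 2552), "up"),
    Member(NodeAddress("10.0.0.1", 2551), "joining"),
]
print(leader(members))  # NodeAddress(host='10.0.0.1', port=2552)
```

Because every node applies the same pure function to the same converged state, all nodes agree on the leader without exchanging extra messages.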
add_member(member)
Return a new state with member added or replaced.
If a member with the same address already exists, it is replaced.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| member | Member | The member to add. | required |
Returns:
| Type | Description |
|---|---|
| ClusterState | New state containing the member. |
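The immutable replace-by-address behaviour of add_member can be sketched with a frozen dataclass and dataclasses.replace. State and Member are hypothetical stand-ins, not casty's classes.

```python
from dataclasses import dataclass, replace


# Hypothetical stand-ins for Member and ClusterState.
@dataclass(frozen=True)
class Member:
    address: str
    status: str


@dataclass(frozen=True)
class State:
    members: frozenset = frozenset()

    def add_member(self, member: Member) -> "State":
        # Drop any entry with the same address, then add the new one.
        kept = frozenset(m for m in self.members if m.address != member.address)
        return replace(self, members=kept | {member})


s1 = State().add_member(Member("n1", "joining"))
s2 = s1.add_member(Member("n1", "up"))
print(len(s2.members), next(iter(s2.members)).status)  # 1 up
print(next(iter(s1.members)).status)                    # joining (s1 unchanged)
```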
merge_members(other)
Merge member sets from two states, keeping self's version on conflict.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| other | ClusterState | The remote state to merge with. | required |
Returns:
| Type | Description |
|---|---|
| frozenset[Member] | Union of members; when both states contain the same address, self's entry wins (last-writer in dict update order). |
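A sketch of the dict-update merge the return description mentions, using a hypothetical string-addressed Member: entries from the other state go in first, and self's entries overwrite them.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Member:  # hypothetical stand-in
    address: str
    status: str


def merge_members(mine: frozenset[Member], theirs: frozenset[Member]) -> frozenset[Member]:
    by_address = {m.address: m for m in theirs}
    # Later dict updates overwrite earlier ones, so our entries win on conflict.
    by_address.update({m.address: m for m in mine})
    return frozenset(by_address.values())


mine = frozenset({Member("n1", "up")})
theirs = frozenset({Member("n1", "joining"), Member("n2", "up")})
print(sorted((m.address, m.status) for m in merge_members(mine, theirs)))
# [('n1', 'up'), ('n2', 'up')]
```

Swapping the arguments changes which entry wins, so this step on its own is not commutative.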
update_status(address, status)
Return a new state with the given node's status changed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| address | NodeAddress | The node to update. | required |
| status | MemberStatus | The new status. | required |
Returns:
| Type | Description |
|---|---|
| ClusterState | New state with the updated member. |
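A sketch of update_status over a frozen member set, assuming members are frozen dataclasses (hypothetical stand-ins) and that an unknown address leaves the set unchanged; casty may instead raise or bump the state's version, which this sketch does not model.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Member:  # hypothetical stand-in
    address: str
    status: str


def update_status(members: frozenset[Member], address: str, status: str) -> frozenset[Member]:
    # Rebuild the set, swapping in a copy of the matching member with the new status.
    # Assumption: an unknown address leaves the set unchanged.
    return frozenset(
        replace(m, status=status) if m.address == address else m for m in members
    )


before = frozenset({Member("n1", "joining"), Member("n2", "up")})
after = update_status(before, "n1", "up")
print(sorted((m.address, m.status) for m in after))  # [('n1', 'up'), ('n2', 'up')]
```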
mark_unreachable(address)
Return a new state with address added to the unreachable set.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| address | NodeAddress | The node to mark unreachable. | required |
Returns:
| Type | Description |
|---|---|
| ClusterState | New state with updated unreachable set. |
mark_reachable(address)
Return a new state with address removed from the unreachable set.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| address | NodeAddress | The node to mark reachable again. | required |
Returns:
| Type | Description |
|---|---|
| ClusterState | New state with updated unreachable set. |
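mark_unreachable and mark_reachable amount to set union and set difference on an immutable set. A minimal sketch with a hypothetical State holding only the unreachable field:

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class State:  # hypothetical stand-in holding only the unreachable set
    unreachable: frozenset = frozenset()

    def mark_unreachable(self, address: str) -> "State":
        return replace(self, unreachable=self.unreachable | {address})

    def mark_reachable(self, address: str) -> "State":
        # Set difference is a no-op if the address was never marked.
        return replace(self, unreachable=self.unreachable - {address})


s0 = State()
s1 = s0.mark_unreachable("n2")
s2 = s1.mark_reachable("n2")
print(s1.unreachable, s2.unreachable, s0.unreachable)
# frozenset({'n2'}) frozenset() frozenset()
```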
with_allocations(shard_type, allocations, epoch)
Return a new state with updated shard allocations for shard_type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| shard_type | str | The shard type name. | required |
| allocations | dict[int, ShardAllocation] | Shard-id to allocation mapping. | required |
| epoch | int | Allocation epoch for consistency tracking. | required |
Returns:
| Type | Description |
|---|---|
| ClusterState | New state with the updated allocations. |
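A sketch of with_allocations as a copy-on-write update of the nested map. State is a hypothetical stand-in, ShardAllocation mirrors the fields documented for casty.ShardAllocation, and the shard type names "orders" and "users" are made up for illustration.

```python
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class ShardAllocation:  # mirrors the documented primary/replicas fields
    primary: str
    replicas: tuple[str, ...] = ()


@dataclass(frozen=True)
class State:  # hypothetical stand-in
    shard_allocations: dict = field(default_factory=dict)
    allocation_epoch: int = 0

    def with_allocations(self, shard_type: str, allocations: dict, epoch: int) -> "State":
        # Copy the outer map so the original state is never mutated.
        updated = {**self.shard_allocations, shard_type: dict(allocations)}
        return replace(self, shard_allocations=updated, allocation_epoch=epoch)


s0 = State()
s1 = s0.with_allocations("orders", {0: ShardAllocation("n1", ("n2",))}, epoch=1)
s2 = s1.with_allocations("users", {0: ShardAllocation("n2")}, epoch=2)
print(sorted(s2.shard_allocations), s2.allocation_epoch, s0.shard_allocations)
# ['orders', 'users'] 2 {}
```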
__str__()
diff(previous)
Compute the changes between previous and this state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| previous | ClusterState or None | The previous state to diff against. | required |
Returns:
| Type | Description |
|---|---|
| ClusterChanges | A summary of joined/removed members and leader election. |
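A sketch of what diff computes, under loud assumptions: members are compared as plain address sets, "removed" means absent from the current set, and the leader is the lowest address. Changes is a hypothetical stand-in; the real ClusterChanges fields are not documented here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Changes:  # hypothetical; casty's ClusterChanges fields may differ
    joined: frozenset[str]
    removed: frozenset[str]
    leader_changed: bool
    new_leader: Optional[str]


def diff(previous: Optional[frozenset[str]], current: frozenset[str]) -> Changes:
    before = previous if previous is not None else frozenset()
    old_leader = min(before, default=None)   # assumption: lowest address leads
    new_leader = min(current, default=None)
    return Changes(
        joined=current - before,
        removed=before - current,
        leader_changed=old_leader != new_leader,
        new_leader=new_leader,
    )


change = diff(frozenset({"a", "b"}), frozenset({"b", "c"}))
print(change.joined, change.removed, change.new_leader)
# frozenset({'c'}) frozenset({'a'}) b
```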
__init__(members=frozenset(), unreachable=frozenset(), version=VectorClock(), shard_allocations={}, allocation_epoch=0, seen=frozenset(), registry=frozenset())
casty.Member (dataclass)
A single node's membership record in the cluster.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| address | NodeAddress | Network address of the member. | required |
| status | MemberStatus | Current lifecycle status. | required |
| roles | frozenset[str] | Roles advertised by this member. | required |
| id | NodeId | Human-readable identifier for this node. | required |
casty.MemberStatus
Bases: Enum
Lifecycle status of a cluster member.
Members progress through the statuses in this order:
joining -> up -> leaving -> down -> removed
Only the cluster leader promotes a member from joining to up, and only once gossip has converged.
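The lifecycle rules above can be expressed as two small predicates. This sketch assumes a status may skip ahead (for example up straight to down) but never move backwards; is_forward and can_promote are hypothetical helpers, and the enum is a stand-in mirroring the documented members.

```python
from enum import Enum, auto


class MemberStatus(Enum):  # stand-in mirroring the documented members
    joining = auto()
    up = auto()
    leaving = auto()
    down = auto()
    removed = auto()


def is_forward(current: MemberStatus, target: MemberStatus) -> bool:
    """True if target is not earlier than current in the lifecycle order."""
    # auto() numbers the members 1..5 in declaration order.
    return target.value >= current.value


def can_promote(status: MemberStatus, is_leader: bool, converged: bool) -> bool:
    """Only the leader moves joining -> up, and only after convergence."""
    return status is MemberStatus.joining and is_leader and converged


print(is_forward(MemberStatus.up, MemberStatus.down))  # True
print(is_forward(MemberStatus.down, MemberStatus.up))  # False
print(can_promote(MemberStatus.joining, True, False))  # False
```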
Enum members:
joining = auto()
up = auto()
leaving = auto()
down = auto()
removed = auto()
merge_priority (property)
Priority key for CRDT merge on concurrent vector clocks.
Returns an (alive, value) tuple where alive statuses always win
over terminal ones. This ensures that a rejoining node (joining)
beats stale down/removed gossip.
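A sketch of how an (alive, value) key resolves concurrent views of one member. The enum is a stand-in; it assumes value is the auto() number and that down and removed are the terminal statuses. resolve is a hypothetical helper.

```python
from enum import Enum, auto


class MemberStatus(Enum):  # stand-in; not casty's source
    joining = auto()
    up = auto()
    leaving = auto()
    down = auto()
    removed = auto()

    @property
    def merge_priority(self) -> tuple[bool, int]:
        # Alive statuses sort above terminal ones; ties break on enum value.
        alive = self not in (MemberStatus.down, MemberStatus.removed)
        return (alive, self.value)


def resolve(a: MemberStatus, b: MemberStatus) -> MemberStatus:
    """Pick the winner for two concurrent views of the same member."""
    return max(a, b, key=lambda s: s.merge_priority)


print(resolve(MemberStatus.removed, MemberStatus.joining))  # MemberStatus.joining
print(resolve(MemberStatus.up, MemberStatus.leaving))       # MemberStatus.leaving
```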
casty.NodeAddress (dataclass)
Network address identifying a cluster node.
Ordered lexicographically by (host, port) so that deterministic leader
election can sort members by address.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | Hostname or IP address of the node. | required |
| port | int | TCP port the node listens on. | required |
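Dataclass ordering gives the (host, port) comparison described above. A sketch with a stand-in class:

```python
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class NodeAddress:  # stand-in: order=True compares (host, port) as a tuple
    host: str
    port: int


addresses = [
    NodeAddress("10.0.0.2", 2551),
    NodeAddress("10.0.0.10", 2551),
    NodeAddress("10.0.0.1", 2552),
    NodeAddress("10.0.0.1", 2551),
]
for a in sorted(addresses):
    print(a.host, a.port)
# 10.0.0.1 2551
# 10.0.0.1 2552
# 10.0.0.10 2551
# 10.0.0.2 2551
```

Hosts compare as strings, so 10.0.0.10 sorts before 10.0.0.2. The order is deterministic, which is all leader election needs, but it is not numeric IP order.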
casty.NodeId = str
Type alias for the human-readable node identifiers used in Member.id.
casty.VectorClock (dataclass)
Logical clock for tracking causal ordering of cluster state versions.
Each node increments its own entry on every state mutation. Two clocks can be compared for happens-before or concurrency, and merged to produce a least upper bound.
Examples:
>>> node_a = NodeAddress("a", 1)
>>> node_b = NodeAddress("b", 1)
>>> vc = VectorClock().increment(node_a).increment(node_a)
>>> vc.version_of(node_a)
2
>>> merged = vc.merge(VectorClock().increment(node_b))
>>> merged.version_of(node_a), merged.version_of(node_b)
(2, 1)
version_of(node)
Return the version counter for node, defaulting to 0.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| node | NodeAddress | The node whose version to look up. | required |
Returns:
| Type | Description |
|---|---|
| int | The current counter for node. |
increment(node)
Return a new clock with node's counter incremented by one.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| node | NodeAddress | The node whose counter to increment. | required |
Returns:
| Type | Description |
|---|---|
| VectorClock | A new clock with the updated counter. |
merge(other)
Merge two clocks by taking the element-wise maximum.
This is the least upper bound (join) operation, guaranteeing CRDT convergence.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| other | VectorClock | The clock to merge with. | required |
Returns:
| Type | Description |
|---|---|
| VectorClock | A new clock where each entry is the maximum of the corresponding entries in the two clocks. |
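The element-wise maximum can be sketched with plain dicts standing in for VectorClock (node names as strings, missing entries treated as 0). The prints check the three join properties that make the merge safe to apply in any order and any number of times.

```python
# Sketch: clocks as plain dicts mapping a node name to its counter
# (a hypothetical stand-in for VectorClock's internal map).
Clock = dict[str, int]


def merge(a: Clock, b: Clock) -> Clock:
    """Element-wise maximum; a node missing from one clock counts as 0."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in a.keys() | b.keys()}


x = {"a": 2, "b": 1}
y = {"b": 3, "c": 1}
z = {"a": 5}
print(merge(x, y) == {"a": 2, "b": 3, "c": 1})        # True
print(merge(x, y) == merge(y, x))                      # commutative: True
print(merge(merge(x, y), z) == merge(x, merge(y, z)))  # associative: True
print(merge(x, x) == x)                                # idempotent: True
```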
is_before(other)
Return whether this clock strictly happens-before other.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| other | VectorClock | The clock to compare against. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if this clock strictly happens-before other. |
is_concurrent_with(other)
Return whether this clock is concurrent with other.
Two clocks are concurrent when neither happens-before the other and they are not equal.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| other | VectorClock | The clock to compare against. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the two clocks are concurrent. |
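A sketch of both comparisons over a dict stand-in for VectorClock, treating missing entries as 0 so that {"a": 1} and {"a": 1, "b": 0} count as equal. The zero default matches version_of, but applying it to the comparisons is an assumption; _pairs, is_before and is_concurrent are hypothetical names.

```python
Clock = dict[str, int]  # hypothetical stand-in; missing nodes count as 0


def _pairs(a: Clock, b: Clock):
    # Yield (a_count, b_count) for every node either clock mentions.
    for n in a.keys() | b.keys():
        yield a.get(n, 0), b.get(n, 0)


def is_before(a: Clock, b: Clock) -> bool:
    """a happens-before b: no entry larger, at least one strictly smaller."""
    pairs = list(_pairs(a, b))
    return all(x <= y for x, y in pairs) and any(x < y for x, y in pairs)


def is_concurrent(a: Clock, b: Clock) -> bool:
    """Neither happens-before the other, and the clocks are not equal."""
    equal = all(x == y for x, y in _pairs(a, b))
    return not equal and not is_before(a, b) and not is_before(b, a)


print(is_before({"a": 1}, {"a": 2}))             # True
print(is_concurrent({"a": 1}, {"b": 1}))          # True
print(is_concurrent({"a": 1}, {"a": 1, "b": 0}))  # False: equal clocks
```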
__init__(_versions={})
casty.ShardAllocation (dataclass)
Tracks which node owns a shard and where its replicas live.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| primary | NodeAddress | Node that owns the shard and handles writes. | required |
| replicas | tuple[NodeAddress, ...] | Nodes holding passive replica copies. | () |
casty.ServiceEntry (dataclass)
A registered service in the cluster registry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Service name. | required |
| node | NodeAddress | Node where the actor lives. | required |
| path | str | Actor path on that node. | required |