Clustered

In this guide you'll form a two-node cluster with sharded entities distributed across nodes. Along the way you'll learn how nodes discover each other via seed nodes, how ShardEnvelope routes messages to the right node, and how to query entities from anywhere in the cluster.

Messages and Entity Factory

The example entity is a simple counter. Each entity_id gets its own actor instance, placed on whichever node the shard coordinator assigns its shard to:

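import asyncio
from dataclasses import dataclass
from typing import Any

# ActorRef, Behavior, Behaviors, ClusteredActorSystem, and ShardEnvelope are
# Casty names; see examples/guides/06_clustered.py for the exact import lines.


@dataclass(frozen=True)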
class Increment:
    amount: int


@dataclass(frozen=True)
class GetValue:
    reply_to: ActorRef[int]


type CounterMsg = Increment | GetValue


# ── Entity factory (one instance per entity_id) ─────────────────────


def counter_entity(entity_id: str) -> Behavior[CounterMsg]:
    def active(value: int = 0) -> Behavior[CounterMsg]:
        async def receive(_ctx: Any, msg: CounterMsg) -> Behavior[CounterMsg]:
            match msg:
                case Increment(amount):
                    print(f"  [{entity_id}] {value} + {amount} = {value + amount}")
                    return active(value + amount)
                case GetValue(reply_to):
                    reply_to.tell(value)
                    return Behaviors.same()

        return Behaviors.receive(receive)

    return active()

The factory receives the entity_id and returns a Behavior. Casty calls it lazily — the actor is only created when the first message for that entity arrives.
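Lazy creation can be pictured with a small standard-library sketch. The LazyEntities class below is hypothetical and is not Casty's implementation; it only shows the idea of calling a factory the first time an entity_id is seen and reusing the result afterwards. Here the "entity" is just a list of received amounts.

```python
from typing import Callable


class LazyEntities:
    """Hypothetical stand-in for a shard's entity table (not Casty's API)."""

    def __init__(self, factory: Callable[[str], list[int]]) -> None:
        self._factory = factory
        self._live: dict[str, list[int]] = {}
        self.created: list[str] = []  # creation order, kept for illustration

    def deliver(self, entity_id: str, amount: int) -> None:
        # Create the entity only when its first message arrives.
        if entity_id not in self._live:
            self._live[entity_id] = self._factory(entity_id)
            self.created.append(entity_id)
        self._live[entity_id].append(amount)

    def total(self, entity_id: str) -> int:
        # Entities that never received a message were never created.
        return sum(self._live.get(entity_id, []))


entities = LazyEntities(lambda entity_id: [])
entities.deliver("alice", 10)
entities.deliver("alice", 5)
entities.deliver("bob", 100)
print(entities.created)         # ['alice', 'bob']
print(entities.total("alice"))  # 15
```

The factory runs once per entity_id, so "carol" costs nothing until someone sends her a message.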

Forming a Cluster

Two ClusteredActorSystem instances. Node 1 starts independently; node 2 joins via seed_nodes:

def make_cluster() -> tuple[ClusteredActorSystem, ClusteredActorSystem]:
    node1 = ClusteredActorSystem(
        name="my-cluster",
        host="127.0.0.1",
        port=25520,
        node_id="node-1",
    )
    node2 = ClusteredActorSystem(
        name="my-cluster",
        host="127.0.0.1",
        port=25521,
        node_id="node-2",
        seed_nodes=[("127.0.0.1", 25520)],
    )
    return node1, node2

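Both systems share the same name but listen on different ports. Constructing them does not wait for membership: in main (see Running It), wait_for(2) blocks until both nodes reach MemberStatus.up. In production, each node runs in its own process; here both run in one process for simplicity.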
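To make the "block until n members are up" idea concrete, here is a minimal sketch built on asyncio.Condition. The Membership class and its mark_up method are assumptions made up for illustration; Casty's real membership protocol is not shown here and may work differently.

```python
import asyncio


class Membership:
    """Hypothetical membership view; not Casty's implementation."""

    def __init__(self) -> None:
        self._up: set[str] = set()
        self._changed = asyncio.Condition()

    async def mark_up(self, node_id: str) -> None:
        # Record the node as up and wake every waiter so it can re-check.
        async with self._changed:
            self._up.add(node_id)
            self._changed.notify_all()

    async def wait_for(self, n: int) -> int:
        # Suspend until at least n members are up, then report the count.
        async with self._changed:
            await self._changed.wait_for(lambda: len(self._up) >= n)
            return len(self._up)


async def demo() -> int:
    members = Membership()
    waiter = asyncio.create_task(members.wait_for(2))
    await members.mark_up("node-1")
    await asyncio.sleep(0)   # let the waiter run; one member is not enough
    assert not waiter.done()
    await members.mark_up("node-2")
    return await waiter


print(asyncio.run(demo()))  # 2
```

The waiter does not poll: it is suspended until a membership change notifies it, which is the behavior you want from a startup barrier.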

Spawning Sharded Entities

Both nodes spawn the same shard type. The coordinator distributes shards across the cluster:

def spawn_sharded(
    node1: ClusteredActorSystem, node2: ClusteredActorSystem
) -> tuple[ActorRef[ShardEnvelope[CounterMsg]], ActorRef[ShardEnvelope[CounterMsg]]]:
    proxy1 = node1.spawn(
        Behaviors.sharded(counter_entity, num_shards=10), "counters"
    )
    proxy2 = node2.spawn(
        Behaviors.sharded(counter_entity, num_shards=10), "counters"
    )
    return proxy1, proxy2

num_shards=10 means entity IDs are hashed into 10 buckets. Each bucket is assigned to one node. You send messages through the local proxy — routing to the correct node is transparent.
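The two-step mapping (entity ID to shard, shard to node) can be sketched as follows. Both functions are hypothetical: the source does not say which hash Casty uses or how the coordinator assigns shards, so CRC32 and round-robin are stand-ins chosen only to make the example runnable.

```python
import zlib


def shard_for(entity_id: str, num_shards: int) -> int:
    # CRC32 gives the same value in every process, unlike the built-in
    # hash() for str, which is randomized per interpreter run.
    return zlib.crc32(entity_id.encode("utf-8")) % num_shards


def owner_of(shard: int, nodes: list[str]) -> str:
    # Assumed round-robin assignment; a real coordinator may balance differently.
    return nodes[shard % len(nodes)]


nodes = ["node-1", "node-2"]
for entity_id in ("alice", "bob", "carol"):
    shard = shard_for(entity_id, num_shards=10)
    print(f"{entity_id} -> shard {shard} -> {owner_of(shard, nodes)}")
```

Whatever the real hash is, the property that matters is that every node computes the same shard for the same entity ID, so any proxy can route a message without asking where the entity lives.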

Routing and Querying

Messages are wrapped in ShardEnvelope(entity_id, message). The proxy routes based on entity ID:

async def send_and_query(
    node: ClusteredActorSystem, proxy: ActorRef[ShardEnvelope[CounterMsg]]
) -> None:
    # Route messages by entity_id — shards are distributed across nodes
    print("── Sending increments ──")
    proxy.tell(ShardEnvelope("alice", Increment(10)))
    proxy.tell(ShardEnvelope("alice", Increment(5)))
    proxy.tell(ShardEnvelope("bob", Increment(100)))
    proxy.tell(ShardEnvelope("carol", Increment(42)))
    # tell() does not wait for a reply, so pause briefly to let the increments be processed
    await asyncio.sleep(0.5)

    # Query from either node — routing is transparent
    print("\n── Querying balances ──")
    for name in ("alice", "bob", "carol"):
        value: int = await node.ask(
            proxy,
            lambda r, eid=name: ShardEnvelope(eid, GetValue(reply_to=r)),
            timeout=5.0,
        )
        print(f"  {name}: {value}")

Notice that main passes node1 and proxy1 to this function, so every query goes through the proxy on node 1 even though the entity may live on node 2. The proxy handles the cross-node routing; you never need to know which node owns which shard.
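The request-reply shape used in the query loop (a factory that receives a reply reference and builds the message) can be sketched with a plain asyncio future. ReplyRef, the simplified GetValue, and this ask function are hypothetical illustrations of the pattern, not Casty's implementation, and the send callback stands in for the proxy.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class ReplyRef:
    """Hypothetical one-shot reply target backed by a future."""

    future: asyncio.Future

    def tell(self, value: int) -> None:
        # Only the first reply completes the future; later ones are ignored.
        if not self.future.done():
            self.future.set_result(value)


@dataclass(frozen=True)
class GetValue:
    reply_to: ReplyRef


async def ask(
    send: Callable[[GetValue], None],
    make_msg: Callable[[ReplyRef], GetValue],
    timeout: float,
) -> int:
    # Create a temporary reply target, send the request, await the answer.
    future = asyncio.get_running_loop().create_future()
    send(make_msg(ReplyRef(future)))
    return await asyncio.wait_for(future, timeout)


async def demo() -> int:
    counters = {"alice": 15}

    def send(msg: GetValue) -> None:
        # Stand-in for the entity: answer with the stored value.
        msg.reply_to.tell(counters["alice"])

    return await ask(send, lambda r: GetValue(reply_to=r), timeout=5.0)


print(asyncio.run(demo()))  # 15
```

Because the caller only ever sees the future, it makes no difference to the calling code whether the reply was produced locally or on another node.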

Running It

async def main() -> None:
    node1, node2 = make_cluster()

    async with node1, node2:
        await node1.wait_for(2)
        await node2.wait_for(2)
        print("── Cluster formed (2 nodes) ──\n")

        proxy1, proxy2 = spawn_sharded(node1, node2)
        await asyncio.sleep(1.0)

        await send_and_query(node1, proxy1)


asyncio.run(main())

Output (lines printed by different entities may appear in a different order):

── Cluster formed (2 nodes) ──

── Sending increments ──
  [alice] 0 + 10 = 10
  [alice] 10 + 5 = 15
  [bob] 0 + 100 = 100
  [carol] 0 + 42 = 42

── Querying balances ──
  alice: 15
  bob: 100
  carol: 42

Run the Full Example

git clone https://github.com/gabfssilva/casty.git
cd casty
uv run python examples/guides/06_clustered.py

What you learned:

  • ClusteredActorSystem extends ActorSystem with cluster membership and shard routing.
  • Seed nodes bootstrap cluster formation — node 2 connects to node 1's address to join.
  • Behaviors.sharded(factory, num_shards=N) distributes entities across nodes by hashing the entity ID.
  • ShardEnvelope(entity_id, msg) routes messages to the correct node transparently.
  • wait_for(n) blocks until the cluster has enough members — useful for quorum requirements.