Skip to content

Cluster Broadcast

Sharding routes messages to a single entity via its ID. But some scenarios require sending a message to all nodes — configuration updates, cache invalidation, system-wide announcements. Behaviors.broadcasted() wraps any behavior so that tell() automatically fans out to every cluster member and ask() collects all responses:

@dataclass(frozen=True)
class Announcement:
    text: str
    reply_to: ActorRef[Ack]

@dataclass(frozen=True)
class Ack:
    from_node: str

def listener() -> Behavior[Announcement]:
    node = socket.gethostname()

    async def receive(_ctx, msg: Announcement) -> Behavior[Announcement]:
        msg.reply_to.tell(Ack(from_node=node))
        return Behaviors.same()

    return Behaviors.receive(receive)

async def main() -> None:
    async with ClusteredActorSystem(
        name="demo", host="127.0.0.1", port=25520,
        node_id="node-1",
    ) as system:
        # BroadcastRef — tell/ask fan out to ALL nodes
        ref: BroadcastRef[Announcement] = system.spawn(
            Behaviors.broadcasted(listener()), "listener"
        )

        # ask() returns tuple[Ack, ...] — one per node
        acks: tuple[Ack, ...] = await system.ask(
            ref,
            lambda r: Announcement(text="Hello cluster!", reply_to=r),
            timeout=5.0,
        )
        for ack in acks:
            print(f"Ack from {ack.from_node}")

asyncio.run(main())

Under the hood, each node spawns a local copy of the actor at /_bcast-{name} and a proxy at /{name}. The proxy tracks cluster membership via gossip and fans out messages to all up members — locally or over TCP. The BroadcastRef[M] subclass of ActorRef[M] enables typesafe overloads: ask(BroadcastRef, ...) returns tuple[R, ...] instead of R.


Next: Cluster Singleton