Cluster Client

casty.ClusterClient

External client that routes messages to a Casty cluster.

Connects to cluster contact points via TCP, subscribes to topology updates, and routes ShardEnvelope messages directly to the node that owns each shard. Messages take no intermediate hops, and the client does not join the cluster as a member.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| contact_points | list[tuple[str, int]] | List of (host, port) for cluster nodes to connect to. | required |
| system_name | str | Logical name of the cluster's actor system (must match). | required |
| client_host | str | Advertised hostname for this client (for receiving responses). | '127.0.0.1' |
| client_port | int | Advertised port for this client (0 for auto-assignment). | 0 |
| tls | Config \| None | Optional TlsConfig for TLS connections to cluster nodes. Only client_context is used (outbound connections). When None (default), plain TCP is used. | None |

Examples:

>>> async with ClusterClient(
...     contact_points=[("10.0.1.10", 25520)],
...     system_name="my-cluster",
... ) as client:
...     counter = client.entity_ref("counter", num_shards=100)
...     counter.tell(ShardEnvelope("user-42", Increment(1)))
...     count = await client.ask(
...         counter,
...         lambda r: ShardEnvelope("user-42", GetCount(reply_to=r)),
...     )
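
The direct routing described above can be pictured as a two-step lookup: entity ID to shard, then shard to owning node. The sketch below is a minimal stdlib-only model, not Casty's implementation. The CRC32 hash, the `shard_for` and `owner_of` helpers, and the round-robin shard table are all assumptions made for illustration; the real hash function and topology format may differ.

```python
import zlib


def shard_for(entity_id: str, num_shards: int) -> int:
    """Map an entity ID to a shard index.

    Assumption: a stable hash (CRC32 here) taken modulo num_shards.
    """
    return zlib.crc32(entity_id.encode("utf-8")) % num_shards


def owner_of(shard_id: int, shard_owners: dict) -> tuple:
    """Return the (host, port) of the node that owns the shard."""
    return shard_owners[shard_id]


# Hypothetical topology snapshot: shard index -> (host, port).
nodes = [("10.0.1.10", 25520), ("10.0.1.11", 25520)]
shard_owners = {s: nodes[s % len(nodes)] for s in range(100)}

shard = shard_for("user-42", 100)
target = owner_of(shard, shard_owners)  # node the message is sent to directly
```

Because both steps are local dictionary and hash operations, a client holding a current topology snapshot can pick the destination node without asking any cluster member first. This is also why num_shards has to agree with the cluster: a different modulus would map the same entity ID to a different shard.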

__init__(*, contact_points, system_name, client_host='127.0.0.1', client_port=0, advertised_host=None, advertised_port=None, address_map=None, serializer=None, tls=None)

__aenter__() async

__aexit__(*exc) async

lookup(key)

Look up service instances registered in the cluster.

Reads from the locally cached topology snapshot — no network round-trip required. Returns an empty Listing if no topology has been received yet.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | ServiceKey[M] | Typed service key to search for. | required |

Returns:

| Type | Description |
| --- | --- |
| Listing[M] | Current set of instances matching the key. |
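
The cached-snapshot behavior of lookup can be modeled in a few lines. This is a sketch under stated assumptions, not Casty's code: the `Listing` dataclass, its `instances` field, and the `TopologyCache` class are hypothetical stand-ins, and plain strings replace typed service keys and actor refs.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Listing:
    """Hypothetical stand-in for a lookup result."""
    key: str
    instances: frozenset = frozenset()


class TopologyCache:
    """Holds the most recent topology snapshot pushed by the cluster."""

    def __init__(self) -> None:
        # None means no topology update has arrived yet.
        self._snapshot: Optional[dict] = None

    def update(self, snapshot: dict) -> None:
        """Replace the cached snapshot (called when an update arrives)."""
        self._snapshot = snapshot

    def lookup(self, key: str) -> Listing:
        """Answer from the cache only; never touches the network."""
        if self._snapshot is None:
            return Listing(key)  # empty listing before the first update
        return Listing(key, self._snapshot.get(key, frozenset()))
```

One consequence of this design is that a lookup made right after connecting may return an empty result even though instances exist, so callers that need an instance may want to retry after the first topology update.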

spawn(behavior, name)

Spawn a local actor on this client's internal actor system.

The actor is remotely addressable — cluster nodes can send messages back to it via TCP.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| behavior | Behavior[M] | The behavior for the new actor. | required |
| name | str | Unique name for the actor. | required |

Returns:

| Type | Description |
| --- | --- |
| ActorRef[M] | A ref to the newly spawned actor. |

entity_ref(shard_type, *, num_shards)

Get a ref that routes ShardEnvelope messages to the cluster.

Creates a local proxy actor for the given shard type on first call. Subsequent calls with the same shard_type return the cached proxy.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| shard_type | str | Name of the sharded entity type (must match the name used on the cluster). | required |
| num_shards | int | Number of shards (must match the cluster's configuration). | required |

Returns:

| Type | Description |
| --- | --- |
| ActorRef[ShardEnvelope[Any]] | A ref that accepts ShardEnvelope messages and routes them to the correct cluster node. |

Examples:

>>> counter = client.entity_ref("counter", num_shards=100)
>>> counter.tell(ShardEnvelope("entity-1", Increment(1)))
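
The create-on-first-call behavior of entity_ref amounts to a cache keyed by shard_type. The following is an illustrative model only: `ProxyCache` and `make_proxy` are hypothetical names, and a string stands in for the proxy actor's ref. It also assumes that num_shards on later calls is ignored once a proxy exists, which the text above implies but does not state.

```python
from typing import Any, Callable


class ProxyCache:
    """Creates one proxy per shard type and reuses it afterwards."""

    def __init__(self, make_proxy: Callable[[str, int], Any]) -> None:
        self._make_proxy = make_proxy
        self._proxies: dict = {}

    def entity_ref(self, shard_type: str, *, num_shards: int) -> Any:
        # Only the first call for a given shard_type creates a proxy.
        # Assumption: later calls return the cached one unchanged.
        if shard_type not in self._proxies:
            self._proxies[shard_type] = self._make_proxy(shard_type, num_shards)
        return self._proxies[shard_type]


calls = []


def make_proxy(shard_type: str, num_shards: int) -> str:
    """Hypothetical factory; records each creation for demonstration."""
    calls.append(shard_type)
    return f"proxy:{shard_type}:{num_shards}"


refs = ProxyCache(make_proxy)
first = refs.entity_ref("counter", num_shards=100)
second = refs.entity_ref("counter", num_shards=100)  # served from the cache
```

Under this model, calling entity_ref repeatedly is cheap, so there is little need to hold on to the returned ref just to avoid the call.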

ask(ref, msg_factory, *, timeout=5.0) async

Send a message and wait for a reply.

Creates a temporary remotely-addressable ref so the cluster can respond directly to this client via TCP.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| ref | ActorRef[M] | Target actor (typically from entity_ref). | required |
| msg_factory | Callable[[ActorRef[R]], M] | Factory that receives a reply-to ref and returns the message. | required |
| timeout | float | Maximum seconds to wait. | 5.0 |

Returns:

| Type | Description |
| --- | --- |
| R | The reply from the cluster actor. |

Raises:

| Type | Description |
| --- | --- |
| TimeoutError | If no reply is received within timeout. |

Examples:

>>> count = await client.ask(
...     counter_ref,
...     lambda r: ShardEnvelope("user-42", GetCount(reply_to=r)),
... )
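
The ask pattern (a temporary reply target plus a timeout) can be sketched with plain asyncio. This is a simplified local model, not Casty's implementation: the reply-to ref is reduced to a callback, `send` is a hypothetical function standing in for delivery to the target actor, and no network is involved.

```python
import asyncio
from typing import Any, Callable


async def ask(send: Callable[[Any], None],
              msg_factory: Callable[[Callable[[Any], None]], Any],
              *, timeout: float = 5.0) -> Any:
    """Send one message and wait for a single reply."""
    reply = asyncio.get_running_loop().create_future()

    def reply_to(value: Any) -> None:
        # Temporary reply target: the first reply wins, later ones are dropped.
        if not reply.done():
            reply.set_result(value)

    send(msg_factory(reply_to))
    try:
        return await asyncio.wait_for(reply, timeout)
    except asyncio.TimeoutError:
        raise TimeoutError(f"no reply within {timeout}s") from None


async def demo() -> tuple:
    def doubling_actor(msg):      # replies with twice the payload
        target, payload = msg
        target(payload * 2)

    def silent_actor(msg):        # never replies
        pass

    answered = await ask(doubling_actor, lambda r: (r, 21))
    try:
        await ask(silent_actor, lambda r: (r, 0), timeout=0.05)
        timed_out = False
    except TimeoutError:
        timed_out = True
    return answered, timed_out


result = asyncio.run(demo())
```

The factory receives the reply target so the message can carry it, which mirrors the `GetCount(reply_to=r)` shape in the example above. A timeout tells the caller only that no reply arrived in time; the message may still have been processed.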

region_ref(key, factory, *, num_shards)

Return a ref that routes ShardEnvelope messages to a cluster region.

Delegates to entity_ref; factory is ignored because the entities already exist on the cluster nodes.

entity_ask(ref, msg_factory, *, timeout) async

Send a message to a sharded entity and wait for a reply.

distributed()

Create a Distributed facade for this client.

Returns:

Type Description
Distributed

Examples:

>>> d = client.distributed()
>>> counter = d.counter("hits")