Skip to content

Distributed

casty.Distributed

Facade for creating distributed data structures.

Spawns sharded actor regions lazily, per structure name.

Parameters:

Name Type Description Default
gateway EntityGateway

Backend that provides access to sharded entities. Both ClusteredActorSystem and ClusterClient satisfy this.

required
journal EventJournal | None

If provided, structures use event sourcing for durability.

None

Examples:

>>> d = Distributed(system)
>>> counter = d.counter("hits", shards=50)
>>> users = d.map[str, User]("users", shards=10)
>>> tags = d.set[str]("tags")
>>> jobs = d.queue[Task]("jobs")

map property

Create a distributed map via d.map[K, V]("name", shards=10).

Returns:

Type Description
MapAccessor

A subscriptable accessor parameterized by key and value types.

Examples:

>>> users = d.map[str, dict]("users")
>>> await users.put("alice", {"age": 30})

set property

Create a distributed set via d.set[V]("name", shards=10).

Returns:

Type Description
SetAccessor

A subscriptable accessor parameterized by the element type.

Examples:

>>> tags = d.set[str]("tags")
>>> await tags.add("python")
True

queue property

Create a distributed queue via d.queue[V]("name", shards=10).

Returns:

Type Description
QueueAccessor

A subscriptable accessor parameterized by the element type.

Examples:

>>> jobs = d.queue[str]("jobs")
>>> await jobs.enqueue("task-1")

__init__(gateway, *, journal=None)

counter(name, *, shards=100, timeout=5.0)

Create a distributed counter.

Parameters:

Name Type Description Default
name str

Logical counter name (used as entity ID).

required
shards int

Number of shards for the backing region.

100
timeout float

Default timeout for counter operations.

5.0

Returns:

Type Description
Counter

Examples:

>>> counter = d.counter("page-views")
>>> await counter.increment()
1

lock(name, *, shards=100, timeout=5.0)

Create a distributed lock.

Parameters:

Name Type Description Default
name str

Logical lock name (used as entity ID).

required
shards int

Number of shards for the backing region.

100
timeout float

Default timeout for lock operations.

5.0

Returns:

Type Description
Lock

Examples:

>>> lock = d.lock("my-resource")
>>> await lock.acquire()

semaphore(name, permits, *, shards=100, timeout=5.0)

Create a distributed semaphore with permits concurrent holders.

Parameters:

Name Type Description Default
name str

Logical semaphore name (used as entity ID).

required
permits int

Maximum number of concurrent holders.

required
shards int

Number of shards for the backing region.

100
timeout float

Default timeout for semaphore operations.

5.0

Returns:

Type Description
Semaphore

Examples:

>>> sem = d.semaphore("pool", permits=5)
>>> await sem.acquire()

barrier(name, *, node_id=None, shards=10, timeout=60.0)

Create a distributed barrier.

Parameters:

Name Type Description Default
name str

Logical barrier name (used as entity ID).

required
node_id str | None

Identifier for this node (defaults to host:port). Required when using ClusterClient as the gateway.

None
shards int

Number of shards for the backing region.

10
timeout float

Default timeout for the barrier wait.

60.0

Returns:

Type Description
Barrier

Examples:

>>> barrier = d.barrier("init-sync")
>>> await barrier.arrive(expected=3)

casty.EntityGateway

Bases: Protocol

Protocol for backends that provide access to sharded entities.

Both ClusteredActorSystem and ClusterClient satisfy this protocol structurally, allowing Distributed to work with either.

region_ref(key, factory, *, num_shards)

Return a ref that routes ShardEnvelope to the named region.

entity_ask(ref, msg_factory, *, timeout) async

Send a message and wait for a reply.

casty.Barrier

Client for a distributed barrier backed by a sharded actor.

Multiple nodes call arrive(expected) and all block until expected nodes have arrived.

Parameters:

Name Type Description Default
system ActorSystem

The actor system for sending messages.

required
region_ref ActorRef[ShardEnvelope[BarrierMsg]]

Reference to the shard proxy / region.

required
name str

Barrier name (used as entity ID).

required
node_id str

Identifier for this node (typically host:port).

required
timeout float

Default timeout for the barrier wait.

60.0

Examples:

>>> barrier = d.barrier("init-sync")
>>> await barrier.arrive(expected=3)

__init__(*, gateway, region_ref, name, node_id, timeout=60.0)

destroy() async

Destroy this barrier, stopping the backing entity actor.

Returns:

Type Description
bool

True if destroyed.

arrive(expected) async

Block until expected nodes have reached this barrier.

Parameters:

Name Type Description Default
expected int

Number of nodes that must arrive before all are released.

required

Examples:

>>> await barrier.arrive(expected=3)

casty.Counter

Client for a distributed counter backed by a sharded actor.

Provides increment, decrement, and get operations that route through the shard proxy to the correct entity.

Parameters:

Name Type Description Default
system ActorSystem

The actor system for sending messages.

required
region_ref ActorRef[ShardEnvelope[CounterMsg]]

Reference to the shard proxy / region.

required
name str

Counter name (used as entity ID).

required
timeout float

Default timeout for each operation.

5.0

Examples:

>>> counter = d.counter("hits")
>>> await counter.increment(5)
5
>>> await counter.get()
5

__init__(*, gateway, region_ref, name, timeout=5.0)

increment(amount=1) async

Increment the counter and return the new value.

Parameters:

Name Type Description Default
amount int

Value to add (default 1).

1

Returns:

Type Description
int

The counter value after incrementing.

Examples:

>>> await counter.increment()
1

decrement(amount=1) async

Decrement the counter and return the new value.

Parameters:

Name Type Description Default
amount int

Value to subtract (default 1).

1

Returns:

Type Description
int

The counter value after decrementing.

Examples:

>>> await counter.decrement()
-1

destroy() async

Destroy this counter, stopping the backing entity actor.

Returns:

Type Description
bool

True if destroyed.

get() async

Get the current counter value.

Returns:

Type Description
int

Examples:

>>> await counter.get()
0

casty.Dict

Client for a distributed key-value map backed by sharded actors.

Each key maps to a separate entity actor (entity-per-key pattern).

Parameters:

Name Type Description Default
system ActorSystem

The actor system for sending messages.

required
region_ref ActorRef[ShardEnvelope[MapEntryMsg]]

Reference to the shard proxy / region.

required
name str

Map name prefix (combined with key for entity ID).

required
timeout float

Default timeout for each operation.

5.0

Examples:

>>> users = d.map[str, dict]("users")
>>> await users.put("alice", {"age": 30})
>>> await users.get("alice")
{'age': 30}

__init__(*, gateway, region_ref, name, timeout=5.0)

put(key, value) async

Store a value for the key.

Returns:

Type Description
V | None

The previous value, or None if the key was new.

Examples:

>>> await users.put("alice", {"age": 30})

get(key, *, local=False) async

Get the value for the key, or None if not set.

Parameters:

Name Type Description Default
key K

The key to look up.

required
local bool

Reserved for future local-read optimization (currently unused).

False

Returns:

Type Description
V | None

Examples:

>>> await users.get("alice")
{'age': 30}

delete(key) async

Delete the key.

Returns:

Type Description
bool

True if the key existed, False otherwise.

Examples:

>>> await users.delete("alice")
True

contains(key) async

Check whether the key exists.

Returns:

Type Description
bool

Examples:

>>> await users.contains("alice")
True

casty.Lock

Client for a distributed mutual-exclusion lock.

Each Lock instance has a unique owner ID. acquire blocks until the lock is granted; try_acquire returns immediately.

Parameters:

Name Type Description Default
system ActorSystem

The actor system for sending messages.

required
region_ref ActorRef[ShardEnvelope[LockMsg]]

Reference to the shard proxy / region.

required
name str

Lock name (used as entity ID).

required
timeout float

Default timeout for each operation.

5.0

Examples:

>>> lock = d.lock("my-resource")
>>> await lock.acquire()
>>> await lock.release()
True

__init__(*, gateway, region_ref, name, timeout=5.0)

destroy() async

Destroy this lock, stopping the backing entity actor.

Returns:

Type Description
bool

True if destroyed.

acquire() async

Block until the lock is acquired.

Examples:

>>> await lock.acquire()

try_acquire() async

Try to acquire the lock without blocking.

Returns:

Type Description
bool

True if the lock was acquired, False if already held.

Examples:

>>> await lock.try_acquire()
True

release() async

Release the lock.

Returns:

Type Description
bool

True if released, False if this instance was not the holder.

Examples:

>>> await lock.release()
True

casty.Queue

Client for a distributed FIFO queue backed by a sharded actor.

Parameters:

Name Type Description Default
system ActorSystem

The actor system for sending messages.

required
region_ref ActorRef[ShardEnvelope[QueueMsg]]

Reference to the shard proxy / region.

required
name str

Queue name (used as entity ID).

required
timeout float

Default timeout for each operation.

5.0

Examples:

>>> jobs = d.queue[str]("jobs")
>>> await jobs.enqueue("task-1")
>>> await jobs.dequeue()
'task-1'

__init__(*, gateway, region_ref, name, timeout=5.0)

destroy() async

Destroy this queue, stopping the backing entity actor.

Returns:

Type Description
bool

True if destroyed.

enqueue(value) async

Append a value to the back of the queue.

Examples:

>>> await jobs.enqueue("task-1")

dequeue() async

Remove and return the front value, or None if empty.

Returns:

Type Description
V | None

Examples:

>>> await jobs.dequeue()
'task-1'

peek() async

Return the front value without removing, or None if empty.

Returns:

Type Description
V | None

Examples:

>>> await jobs.peek()
'task-1'

size() async

Return the number of items in the queue.

Returns:

Type Description
int

Examples:

>>> await jobs.size()
0

casty.Semaphore

Client for a distributed counting semaphore.

Each Semaphore instance has a unique owner ID. acquire blocks until a permit is available; try_acquire returns immediately.

Parameters:

Name Type Description Default
system ActorSystem

The actor system for sending messages.

required
region_ref ActorRef[ShardEnvelope[SemaphoreMsg]]

Reference to the shard proxy / region.

required
name str

Semaphore name (used as entity ID).

required
timeout float

Default timeout for each operation.

5.0

Examples:

>>> sem = d.semaphore("pool", permits=3)
>>> await sem.acquire()
>>> await sem.release()
True

__init__(*, gateway, region_ref, name, timeout=5.0)

destroy() async

Destroy this semaphore, stopping the backing entity actor.

Returns:

Type Description
bool

True if destroyed.

acquire() async

Block until a permit is acquired.

Examples:

>>> await sem.acquire()

try_acquire() async

Try to acquire a permit without blocking.

Returns:

Type Description
bool

True if a permit was acquired, False if none available.

Examples:

>>> await sem.try_acquire()
True

release() async

Release a permit.

Returns:

Type Description
bool

True if released, False if this instance was not a holder.

Examples:

>>> await sem.release()
True

casty.Set

Client for a distributed set backed by a sharded actor.

Parameters:

Name Type Description Default
system ActorSystem

The actor system for sending messages.

required
region_ref ActorRef[ShardEnvelope[SetMsg]]

Reference to the shard proxy / region.

required
name str

Set name (used as entity ID).

required
timeout float

Default timeout for each operation.

5.0

Examples:

>>> tags = d.set[str]("tags")
>>> await tags.add("python")
True
>>> await tags.contains("python")
True

__init__(*, gateway, region_ref, name, timeout=5.0)

destroy() async

Destroy this set, stopping the backing entity actor.

Returns:

Type Description
bool

True if destroyed.

add(value) async

Add a value to the set.

Returns:

Type Description
bool

True if added, False if already present.

Examples:

>>> await tags.add("python")
True

remove(value) async

Remove a value from the set.

Returns:

Type Description
bool

True if removed, False if not present.

Examples:

>>> await tags.remove("python")
True

contains(value) async

Check if a value is in the set.

Returns:

Type Description
bool

Examples:

>>> await tags.contains("python")
True

size() async

Get the number of elements in the set.

Returns:

Type Description
int

Examples:

>>> await tags.size()
1