Job Executor
In this guide you'll build a distributed job executor that runs arbitrary Python functions across a 10-node Docker cluster. Along the way you'll learn how workers register via the receptionist, how CloudPickleSerializer sends lambdas over the wire, and how runtime pip install provisions dependencies on remote nodes.
The end result: 10 eigenvalue computations on 200x200 matrices, each running on a different node, all completing in roughly the wall-clock time of a single job.
Messages
The job protocol has two request-reply pairs — one for installing dependencies, one for running functions:
JOB_WORKER_KEY: ServiceKey[Any] = ServiceKey("job-worker")

@dataclass(frozen=True)
class InstallDeps:
    packages: tuple[str, ...]
    reply_to: ActorRef[DepsInstalled | DepsFailed]

@dataclass(frozen=True)
class DepsInstalled:
    node: str

@dataclass(frozen=True)
class DepsFailed:
    node: str
    error: str

@dataclass(frozen=True)
class RunJob:
    fn: Callable[[], Any]
    reply_to: ActorRef[JobResult]

@dataclass(frozen=True)
class JobResult:
    node: str
    value: Any | None = None
    error: str | None = None
Every response carries node: str so the client can print which worker handled each job.
Worker Actor
The worker handles two message types. InstallDeps shells out to uv pip install in a subprocess. RunJob executes the received function in a thread so it doesn't block the event loop:
type WorkerMsg = InstallDeps | RunJob | Shutdown

def job_worker(node_name: str, done: asyncio.Event) -> Behavior[WorkerMsg]:
    async def receive(ctx: Any, msg: WorkerMsg) -> Behavior[WorkerMsg]:
        match msg:
            case InstallDeps(packages=packages, reply_to=reply_to):
                log.info("[%s] installing %s", node_name, ", ".join(packages))
                proc = await asyncio.create_subprocess_exec(
                    "uv", "pip", "install", "--quiet", *packages,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
                _, stderr = await proc.communicate()
                if proc.returncode == 0:
                    reply_to.tell(DepsInstalled(node=node_name))
                else:
                    reply_to.tell(DepsFailed(node=node_name, error=stderr.decode()))
                return Behaviors.same()
            case RunJob(fn=fn, reply_to=reply_to):
                try:
                    result = await asyncio.to_thread(fn)
                    reply_to.tell(JobResult(node=node_name, value=result))
                except Exception as exc:
                    # Report the failure to the client instead of crashing the actor.
                    reply_to.tell(JobResult(node=node_name, error=repr(exc)))
                return Behaviors.same()
The function fn arrives as a cloudpickle-serialized closure — it can reference any libraries installed on the node, even if they weren't present when the cluster started.
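To see why a cloudpickle-style serializer is required here, note that stdlib pickle serializes functions by reference (module plus qualified name) rather than by value, so a locally defined closure cannot be pickled at all. A stdlib-only demonstration, with no casty APIs involved:

```python
import pickle

def make_closure(seed: int):
    # A locally defined closure, like the jobs submitted in this guide.
    def compute():
        return seed * 2
    return compute

fn = make_closure(21)

# Stdlib pickle looks the function up by its qualified name; a nested
# closure has no importable name, so serialization fails outright.
try:
    pickle.dumps(fn)
    picklable = True
except Exception:
    picklable = False

print("stdlib pickle can serialize the closure:", picklable)
```

cloudpickle instead serializes the code object and captured variables by value, which is what lets RunJob carry arbitrary closures across the wire.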
Node Entrypoint
Each node runs the same script. CLI args control port, seed node, and expected cluster size. The worker is wrapped with Behaviors.discoverable so the receptionist advertises it to the cluster:
seed_nodes = (
    tuple(
        (s.split(":")[0], int(s.split(":")[1]))
        for s in args.seed.split(",")
    )
    if args.seed
    else ()
)
node_name = f"{host}:{args.port}"
done = asyncio.Event()

async with ClusteredActorSystem(
    name="job-cluster",
    host=host,
    port=args.port,
    node_id=node_name,
    seed_nodes=seed_nodes,
    bind_host=args.bind_host,
    serializer=CloudPickleSerializer(),
) as system:
    # Wrapped in Behaviors.discoverable so the receptionist advertises
    # the worker cluster-wide under JOB_WORKER_KEY.
    system.spawn(
        Behaviors.discoverable(JOB_WORKER_KEY, job_worker(node_name, done)),
        "job-worker",
    )
    await done.wait()
wait_for(args.nodes) blocks until all 10 nodes are up before the node is ready.
Client
The client connects from outside the cluster via ClusterClient. It discovers workers through the receptionist, installs numpy on all of them, then fans out 10 jobs in parallel:
async with ClusterClient(
    contact_points=contact_points,
    system_name="job-cluster",
    serializer=CloudPickleSerializer(),
) as client:
    # 1. Discover workers
    print(f"\n{BOLD}Waiting for {args.workers} workers...{RESET}")
    workers: list[ServiceInstance[Any]] = []
    while len(workers) < args.workers:
        listing: Listing[Any] = client.lookup(JOB_WORKER_KEY)
        workers = sorted(listing.instances, key=lambda w: str(w.node))
        if len(workers) < args.workers:
            await asyncio.sleep(0.5)
Runtime dependency installation — client.ask sends InstallDeps to each worker and waits for confirmation:
# 2. Install numpy on all workers
print(f"{BOLD}Installing numpy on {len(workers)} workers...{RESET}")
install_tasks = [
    client.ask(
        w.ref,
        lambda reply_to, _w=w: InstallDeps(
            packages=("numpy",), reply_to=reply_to
        ),
        timeout=120.0,
    )
    for w in workers
]
install_results = await asyncio.gather(*install_tasks)
for result in install_results:
    match result:
        case DepsInstalled(node=node):
            print(f"  {GREEN}✓{RESET} {node}")
        case DepsFailed(node=node, error=error):
            print(f"  {RED}✗{RESET} {node}: {error}")
The job factory creates a closure that imports numpy inside the function. This is key — the import happens on the remote node after pip install, not on the client:
# 3. Submit jobs in parallel
def make_job(job_id: int):
    def compute():
        import numpy as np

        rng = np.random.default_rng(seed=job_id)
        matrix = rng.standard_normal((200, 200))
        eigenvalues = np.linalg.eigvals(matrix)
        top3 = sorted(np.abs(eigenvalues), reverse=True)[:3]
        return {
            "job_id": job_id,
            "matrix_shape": (200, 200),
            "top_eigenvalues": [round(float(v), 2) for v in top3],
        }

    return compute
Fan-out: 10 jobs dispatched simultaneously via asyncio.gather, one per worker:
print(f"\n{BOLD}Submitting {len(workers)} jobs in parallel...{RESET}")
t0 = time.monotonic()
job_tasks = [
    client.ask(
        workers[i].ref,
        lambda reply_to, _i=i: RunJob(fn=make_job(_i), reply_to=reply_to),
        timeout=120.0,
    )
    for i in range(len(workers))
]
job_results: list[JobResult] = await asyncio.gather(*job_tasks)
elapsed = time.monotonic() - t0

for r in job_results:
    match r:
        case JobResult(node=node, value=value) if value is not None:
            v = value
            print(
                f"  {CYAN}job-{v['job_id']}{RESET} → {node} "
                f"eigenvalues: {v['top_eigenvalues']}"
            )
        case JobResult(node=node, error=error):
            print(f"  {RED}job{RESET} → {node} ERROR: {error}")
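The parallelism claim can be checked in miniature with stdlib asyncio alone, no cluster involved: ten simulated 0.1-second jobs dispatched through asyncio.gather finish in roughly the time of one.

```python
import asyncio
import time

async def fake_job(job_id: int) -> int:
    # Stand-in for client.ask(...): each "job" takes ~0.1s.
    await asyncio.sleep(0.1)
    return job_id

async def run_all() -> tuple[int, float]:
    t0 = time.monotonic()
    results = await asyncio.gather(*(fake_job(i) for i in range(10)))
    return len(results), time.monotonic() - t0

count, elapsed = asyncio.run(run_all())
# Concurrent dispatch: total time is close to one job, not 10 * 0.1s.
print(f"{count} jobs in {elapsed:.2f}s")
```

In the real example each ask is network I/O plus remote compute, so the same pattern overlaps all ten jobs instead of serializing them.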
Docker Setup
The docker-compose.yml defines 11 services: one seed node, nine workers, and one client. All worker nodes share the same image and entrypoint — only the hostname differs:
x-node: &node
  build:
    context: ../..
    dockerfile: examples/18_job_executor/Dockerfile.node

services:
  seed:
    <<: *node
    hostname: seed
    command:
      [
        "--port", "25520",
        "--host", "auto",
        "--bind-host", "0.0.0.0",
        "--seed", "seed:25520",
        "--nodes", "10",
      ]
  worker-1:
    <<: *node
    hostname: worker-1
The client depends on all workers and connects to the seed:
  client:
    hostname: client
    command: ["--contact", "seed:25520", "--workers", "10"]
    depends_on:
      - seed
      - worker-1
      - worker-2
      - worker-3
      - worker-4
      - worker-5
      - worker-6
      - worker-7
      - worker-8
      - worker-9
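Since the nine worker services differ only by hostname, their compose blocks can be generated rather than hand-written. A small stdlib-only sketch; it assumes the workers take the same command flags as the seed service above:

```python
# Generate the nine near-identical worker service blocks for docker-compose.yml.
def worker_service(n: int) -> str:
    name = f"worker-{n}"
    # Assumption: workers reuse the seed's flags, varying only the hostname.
    return (
        f"  {name}:\n"
        f"    <<: *node\n"
        f"    hostname: {name}\n"
        f'    command: ["--port", "25520", "--host", "auto", '
        f'"--bind-host", "0.0.0.0", "--seed", "seed:25520", "--nodes", "10"]\n'
    )

blocks = "".join(worker_service(n) for n in range(1, 10))
print(blocks.count("hostname:"))  # 9
```

The YAML merge key (<<: *node) pulls in the shared build block, so each generated service stays three lines of real content.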
Running It
git clone https://github.com/gabfssilva/casty.git
cd casty/examples/18_job_executor
docker compose up --build
Expected output:
Waiting for 10 workers...
found 10 workers
Installing numpy on 10 workers...
✓ seed:25520
✓ worker-1:25520
✓ worker-2:25520
...
Submitting 10 jobs in parallel...
job-0 → seed:25520 eigenvalues: [14.23, 14.18, 14.05]
job-1 → worker-1:25520 eigenvalues: [14.21, 14.15, 14.09]
...
All 10 jobs completed in 0.32s
What you learned:

- Behaviors.discoverable auto-registers actors with the receptionist for cluster-wide service discovery.
- CloudPickleSerializer serializes lambdas and closures over the wire — standard pickle can't do this.
- Runtime pip install via subprocess lets workers provision dependencies on demand, without baking them into the Docker image.
- asyncio.gather + client.ask fans out work across nodes in parallel — total time approaches single-job time.
- ClusterClient connects from outside the cluster, discovers services, and sends messages without cluster membership.