Job Executor

In this guide you'll build a distributed job executor that runs arbitrary Python functions across a 10-node Docker cluster. Along the way you'll learn how workers register via the receptionist, how CloudPickleSerializer sends lambdas over the wire, and how runtime pip install provisions dependencies on remote nodes.

The end result: 10 eigenvalue computations on 200x200 matrices, each on a different node, completing in roughly the wall-clock time of a single job.

Messages

The job protocol has two request-reply pairs — one for installing dependencies, one for running functions:

JOB_WORKER_KEY: ServiceKey[Any] = ServiceKey("job-worker")


@dataclass(frozen=True)
class InstallDeps:
    packages: tuple[str, ...]
    reply_to: ActorRef[DepsInstalled | DepsFailed]


@dataclass(frozen=True)
class DepsInstalled:
    node: str


@dataclass(frozen=True)
class DepsFailed:
    node: str
    error: str


@dataclass(frozen=True)
class RunJob:
    fn: Callable[[], Any]
    reply_to: ActorRef[JobResult]


@dataclass(frozen=True)
class JobResult:
    node: str
    value: Any | None = None
    error: str | None = None

Every response carries node: str so the client can print which worker handled each job.
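
To make the role of reply_to concrete, here is a minimal sketch of the request-reply pattern using an asyncio.Future in place of a real ActorRef. ReplyRef, ask, and fake_worker are hypothetical stand-ins written for this illustration; they are not Casty's API, and the real client.ask may work differently internally.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable


class ReplyRef:
    """Hypothetical stand-in for ActorRef: tell() resolves a future."""

    def __init__(self, future: asyncio.Future) -> None:
        self._future = future

    def tell(self, msg: Any) -> None:
        if not self._future.done():
            self._future.set_result(msg)


@dataclass(frozen=True)
class InstallDeps:
    packages: tuple[str, ...]
    reply_to: ReplyRef


@dataclass(frozen=True)
class DepsInstalled:
    node: str


async def ask(
    handler: Callable[[Any], None],
    make_msg: Callable[[ReplyRef], Any],
    timeout: float,
) -> Any:
    """Build a request around a fresh reply ref, deliver it, await the reply."""
    future = asyncio.get_running_loop().create_future()
    handler(make_msg(ReplyRef(future)))
    return await asyncio.wait_for(future, timeout)


def fake_worker(msg: Any) -> None:
    # A real worker would install packages here; this one replies immediately.
    if isinstance(msg, InstallDeps):
        msg.reply_to.tell(DepsInstalled(node="worker-1:25520"))


async def main() -> Any:
    return await ask(
        fake_worker,
        lambda reply_to: InstallDeps(packages=("numpy",), reply_to=reply_to),
        timeout=1.0,
    )


reply = asyncio.run(main())
print(reply)
```

The message factory takes reply_to as its only argument, which is the same shape as the lambdas passed to client.ask later in this guide.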

Worker Actor

The worker handles two job messages, plus Shutdown. InstallDeps shells out to uv pip install through an asyncio subprocess. RunJob executes the received function in a thread via asyncio.to_thread to avoid blocking the event loop:

type WorkerMsg = InstallDeps | RunJob | Shutdown


def job_worker(node_name: str, done: asyncio.Event) -> Behavior[WorkerMsg]:
    async def receive(ctx: Any, msg: WorkerMsg) -> Behavior[WorkerMsg]:
        match msg:
            case InstallDeps(packages=packages, reply_to=reply_to):
                log.info("[%s] installing %s", node_name, ", ".join(packages))
                proc = await asyncio.create_subprocess_exec(
                    "uv", "pip", "install", "--quiet", *packages,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
                _, stderr = await proc.communicate()
                if proc.returncode == 0:
                    reply_to.tell(DepsInstalled(node=node_name))
                else:
                    reply_to.tell(DepsFailed(node=node_name, error=stderr.decode()))
                return Behaviors.same()

            case RunJob(fn=fn, reply_to=reply_to):
                try:
                    result = await asyncio.to_thread(fn)
                    reply_to.tell(JobResult(node=node_name, value=result))
                except Exception:

The function fn arrives as a cloudpickle-serialized closure — it can reference any libraries installed on the node, even if they weren't present when the cluster started.
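
The sketch below shows why a by-value serializer is needed for closures. It uses only the standard pickle module for the failing case and falls back gracefully if the cloudpickle package is not installed. The make_job here is a simplified stand-in for the job factory used later, not the example's actual code.

```python
import pickle


def make_job(job_id: int):
    def compute():
        return job_id * 2

    return compute


job = make_job(21)

# The standard library pickles functions by qualified name, so a nested
# function (or a lambda) that can't be looked up at module level is rejected.
try:
    pickle.dumps(job)
    stdlib_ok = True
except (pickle.PicklingError, AttributeError):
    stdlib_ok = False
print("stdlib pickle:", "ok" if stdlib_ok else "failed")

# cloudpickle serializes the function's code and captured variables by value.
try:
    import cloudpickle
except ImportError:
    cloudpickle = None

if cloudpickle is not None:
    payload = cloudpickle.dumps(job)
    restored = pickle.loads(payload)  # loading needs cloudpickle importable
    print("cloudpickle round trip:", restored())
```

The exact exception type raised by the standard pickler varies between Python versions, which is why the sketch catches both.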

Node Entrypoint

Each node runs the same script. CLI args control port, seed node, and expected cluster size. The worker is wrapped with Behaviors.discoverable so the receptionist advertises it to the cluster:

    tuple(
        (s.split(":")[0], int(s.split(":")[1]))
        for s in args.seed.split(",")
    )
    if args.seed
    else ()
)
node_name = f"{host}:{args.port}"

done = asyncio.Event()

async with ClusteredActorSystem(
    name="job-cluster",
    host=host,
    port=args.port,
    node_id=node_name,
    seed_nodes=seed_nodes,
    bind_host=args.bind_host,
    serializer=CloudPickleSerializer(),
) as system:
    system.spawn(

wait_for(args.nodes) blocks until the expected number of nodes (10 here) have joined the cluster; only then is the node considered ready.
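
The seed parsing at the top of the entrypoint snippet turns a comma-separated host:port string into a tuple of pairs. One possible way to factor that into a helper with basic validation is sketched below. The name parse_seed_nodes is hypothetical, and it uses rpartition rather than the split calls in the original, so treat it as a variation rather than the example's own code.

```python
from typing import Optional


def parse_seed_nodes(seed: Optional[str]) -> tuple[tuple[str, int], ...]:
    """Parse "host:port,host:port" into ((host, port), ...)."""
    if not seed:
        return ()
    nodes = []
    for entry in seed.split(","):
        # rpartition splits on the last colon only.
        host, _, port = entry.strip().rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        nodes.append((host, int(port)))
    return tuple(nodes)


print(parse_seed_nodes("seed:25520,worker-1:25520"))
print(parse_seed_nodes(""))
```

Rejecting malformed entries early gives a clearer error than an IndexError from indexing the result of split.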

Client

The client connects from outside the cluster via ClusterClient. It discovers workers through the receptionist, installs numpy on all of them, then fans out 10 jobs in parallel:

async with ClusterClient(
    contact_points=contact_points,
    system_name="job-cluster",
    serializer=CloudPickleSerializer(),
) as client:
    # 1. Discover workers
    print(f"\n{BOLD}Waiting for {args.workers} workers...{RESET}")
    workers: list[ServiceInstance[Any]] = []
    while len(workers) < args.workers:
        listing: Listing[Any] = client.lookup(JOB_WORKER_KEY)
        workers = sorted(listing.instances, key=lambda w: str(w.node))
        if len(workers) < args.workers:
            await asyncio.sleep(0.5)

Runtime dependency installation — client.ask sends InstallDeps to each worker and waits for confirmation:

# 2. Install numpy on all workers
print(f"{BOLD}Installing numpy on {len(workers)} workers...{RESET}")
install_tasks = [
    client.ask(
        w.ref,
        lambda reply_to, _w=w: InstallDeps(
            packages=("numpy",), reply_to=reply_to
        ),
        timeout=120.0,
    )
    for w in workers
]
install_results = await asyncio.gather(*install_tasks)
for result in install_results:
    match result:
        case DepsInstalled(node=node):
            print(f"  {GREEN}✓{RESET} {node}")
        case DepsFailed(node=node, error=error):

The job factory creates a closure that imports numpy inside the function. This is key — the import happens on the remote node after pip install, not on the client:

# 3. Submit jobs in parallel
def make_job(job_id: int):
    def compute():
        import numpy as np

        rng = np.random.default_rng(seed=job_id)
        matrix = rng.standard_normal((200, 200))
        eigenvalues = np.linalg.eigvals(matrix)
        top3 = sorted(np.abs(eigenvalues), reverse=True)[:3]
        return {
            "job_id": job_id,
            "matrix_shape": (200, 200),
            "top_eigenvalues": [round(float(v), 2) for v in top3],
        }

    return compute
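
The effect of placing the import inside the function can be seen in isolation. In this sketch the module name package_that_is_not_installed_here is deliberately made up, standing in for a dependency the client does not have. Creating the job succeeds, and the import is attempted only when the function is called.

```python
def make_job(job_id: int):
    def compute():
        # Resolved only when compute() runs, which in the cluster is on the worker.
        import package_that_is_not_installed_here  # hypothetical module name

        return job_id

    return compute


job = make_job(7)  # succeeds: nothing has been imported yet

try:
    job()
    import_error = None
except ModuleNotFoundError as exc:
    import_error = exc

print("raised on call:", type(import_error).__name__)
```

A module-level import would fail on the client before any job could be built, which is why the guide keeps numpy inside compute.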

Fan-out: 10 jobs dispatched simultaneously via asyncio.gather, one per worker:

print(f"\n{BOLD}Submitting {len(workers)} jobs in parallel...{RESET}")
t0 = time.monotonic()

job_tasks = [
    client.ask(
        workers[i].ref,
        lambda reply_to, _i=i: RunJob(fn=make_job(_i), reply_to=reply_to),
        timeout=120.0,
    )
    for i in range(len(workers))
]
job_results: list[JobResult] = await asyncio.gather(*job_tasks)

elapsed = time.monotonic() - t0
for r in job_results:
    match r:
        case JobResult(node=node, value=value) if value is not None:
            v = value
            print(
                f"  {CYAN}job-{v['job_id']}{RESET} → {node}  "
                f"eigenvalues: {v['top_eigenvalues']}"
            )
        case JobResult(node=node, error=error):
            print(f"  {RED}job{RESET} → {node}  ERROR: {error}")
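
The timing behaviour of the fan-out can be reproduced without a cluster. In this sketch fake_remote_job is a hypothetical stand-in for client.ask, with asyncio.sleep simulating the remote work. Ten 0.05-second jobs awaited together should finish in about the time of one, and asyncio.gather returns results in the order the awaitables were passed.

```python
import asyncio
import time


async def fake_remote_job(job_id: int, delay: float = 0.05) -> dict:
    # Stand-in for client.ask(...): the "work" is just a sleep.
    await asyncio.sleep(delay)
    return {"job_id": job_id}


async def main():
    t0 = time.monotonic()
    results = await asyncio.gather(*(fake_remote_job(i) for i in range(10)))
    return results, time.monotonic() - t0


results, elapsed = asyncio.run(main())
print([r["job_id"] for r in results])
print(f"elapsed: {elapsed:.2f}s (sequential would be about 0.50s)")
```

In the real cluster the jobs are CPU-bound, so the speedup comes from each one running on a separate node rather than from overlapping waits on a single machine.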

Docker Setup

The docker-compose.yml defines 11 services: one seed node, nine workers, and one client. All worker nodes share the same image and entrypoint; only the hostname differs:

x-node: &node
  build:
    context: ../..
    dockerfile: examples/18_job_executor/Dockerfile.node

services:
  seed:
    <<: *node
    hostname: seed
    command:
      [
        "--port", "25520",
        "--host", "auto",
        "--bind-host", "0.0.0.0",
        "--seed", "seed:25520",
        "--nodes", "10",
      ]

  worker-1:

The client depends on all workers and connects to the seed:

hostname: client
command: ["--contact", "seed:25520", "--workers", "10"]
depends_on:
  - seed
  - worker-1
  - worker-2
  - worker-3
  - worker-4
  - worker-5
  - worker-6
  - worker-7
  - worker-8
  - worker-9

Running It

git clone https://github.com/gabfssilva/casty.git
cd casty/examples/18_job_executor
docker compose up --build

Expected output:

Waiting for 10 workers...
  found 10 workers

Installing numpy on 10 workers...
  ✓ seed:25520
  ✓ worker-1:25520
  ✓ worker-2:25520
  ...

Submitting 10 jobs in parallel...
  job-0 → seed:25520       eigenvalues: [14.23, 14.18, 14.05]
  job-1 → worker-1:25520   eigenvalues: [14.21, 14.15, 14.09]
  ...

All 10 jobs completed in 0.32s

What you learned:

  • Behaviors.discoverable auto-registers actors with the receptionist for cluster-wide service discovery.
  • CloudPickleSerializer serializes lambdas and closures over the wire; standard pickle stores functions by reference to an importable name, so it can't.
  • Runtime pip install via subprocess lets workers provision dependencies on demand, without baking them into the Docker image.
  • asyncio.gather + client.ask fans out work across nodes in parallel — total time approaches single-job time.
  • ClusterClient connects from outside the cluster, discovers services, and sends messages without cluster membership.