Supervision
In this guide you'll build supervised workers that recover from failures automatically. Along the way you'll learn how supervision strategies decide what happens on crash, how lifecycle hooks observe the recovery process, and how supervision trees protect entire hierarchies of actors.
Messages
A worker processes tasks. Some tasks are "bad" and cause it to crash:
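from dataclasses import dataclass

# Import path assumed here; examples/guides/03_supervision.py has the exact imports.
from casty import ActorContext, ActorRef, ActorSystem, Behavior, Behaviors
from casty import Directive, OneForOneStrategy

@dataclass(frozen=True)
class Process:
    task: str

@dataclass(frozen=True)
class GetProcessed:
    reply_to: ActorRef[tuple[str, ...]]

@dataclass(frozen=True)
class CrashOnPurpose:
    pass

type WorkerMsg = Process | GetProcessed | CrashOnPurpose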
Process carries the task payload. GetProcessed asks the worker to send the tasks it has processed so far to reply_to. CrashOnPurpose forces a RuntimeError; we'll use it to test how different strategies respond to different exception types.
The Worker
The worker raises on bad input and accumulates results via behavior recursion:
def worker(processed: tuple[str, ...] = ()) -> Behavior[WorkerMsg]:
    async def receive(
        ctx: ActorContext[WorkerMsg], msg: WorkerMsg
    ) -> Behavior[WorkerMsg]:
        match msg:
            case Process(task) if task == "bad":
                raise ValueError(f"Cannot process: {task}")
            case Process(task):
                print(f" [{ctx.self.address.path}] Processed: {task}")
                # State lives in the closure: return a new behavior with the task appended.
                return worker((*processed, task))
            case GetProcessed(reply_to):
                reply_to.tell(processed)
                return Behaviors.same()
            case CrashOnPurpose():
                raise RuntimeError("Intentional crash!")

    return Behaviors.receive(receive)
No try/except anywhere — the worker doesn't know it's supervised. Supervision is a concern of the parent, not the actor itself.
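To make the idea of behavior recursion concrete, here is a library-free sketch in which each message produces a new immutable state and the error handling lives entirely in the caller. `Collecting` and `run` are hypothetical names, and the reset-to-initial-state restart is an assumption made for this sketch; the guide does not state what the library does to accumulated state on restart.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Collecting:
    """Hypothetical stand-in for a behavior: immutable state plus a handler."""

    processed: tuple[str, ...] = ()

    def receive(self, task: str) -> "Collecting":
        # No try/except here: failure handling is the caller's concern.
        if task == "bad":
            raise ValueError(f"Cannot process: {task}")
        # "Recursion": return a fresh behavior carrying the extended state.
        return Collecting((*self.processed, task))


def run(initial: Collecting, tasks: list[str]) -> Collecting:
    """Play the parent's role: feed tasks and recover from failures."""
    current = initial
    for task in tasks:
        try:
            current = current.receive(task)
        except ValueError:
            # ASSUMPTION: a restart re-creates the initial behavior,
            # so previously accumulated state is discarded.
            current = initial
    return current
```

Under that assumption, running the tasks hello, bad, world leaves only world in the final state, because the failure on bad resets the accumulator.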
Supervision Strategies
A OneForOneStrategy decides what to do when a child fails. The simplest form restarts on every error:
def always_restart() -> OneForOneStrategy:
    return OneForOneStrategy(
        max_restarts=3,
        within=60.0,
        decider=lambda _exc: Directive.restart,
    )
max_restarts=3 with within=60.0 bounds recovery: once the child has used up 3 restarts inside a 60-second sliding window, the supervisor stops restarting it and lets the actor die. This prevents infinite restart loops.
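One common way to implement such a limit is to keep the timestamps of recent restarts and discard those older than the window. The sketch below illustrates that technique only; it is not the library's implementation. `RestartWindow`, `allow`, and the explicit `now` argument are hypothetical, and the boundary behavior (a restart exactly `within` seconds old no longer counts) is an assumption.

```python
from collections import deque


class RestartWindow:
    """Hypothetical helper: permit at most max_restarts per sliding window."""

    def __init__(self, max_restarts: int, within: float) -> None:
        self.max_restarts = max_restarts
        self.within = within
        self._stamps: deque[float] = deque()

    def allow(self, now: float) -> bool:
        """Return True and record the restart if the budget permits it."""
        # Forget restarts that have slid out of the window.
        while self._stamps and now - self._stamps[0] >= self.within:
            self._stamps.popleft()
        if len(self._stamps) >= self.max_restarts:
            return False  # budget exhausted: the caller should stop the child
        self._stamps.append(now)
        return True
```

Passing the clock value in as `now` keeps the helper deterministic, so the window logic can be checked without sleeping.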
For more control, pass a decider that inspects the exception:
def selective_strategy() -> OneForOneStrategy:
    def decide(exc: Exception) -> Directive:
        match exc:
            case ValueError():
                print(f" ValueError — restarting: {exc}")
                return Directive.restart
            case RuntimeError():
                print(f" RuntimeError — stopping: {exc}")
                return Directive.stop
            case _:
                return Directive.escalate

    return OneForOneStrategy(max_restarts=5, within=60.0, decider=decide)
ValueError → restart (transient). RuntimeError → stop (fatal). Anything else → escalate to the parent.
Lifecycle Hooks
Behaviors.with_lifecycle() wraps a behavior with hooks that fire at key moments:
def resilient_worker() -> Behavior[WorkerMsg]:
    async def on_start(ctx: ActorContext[WorkerMsg]) -> None:
        print(f" [{ctx.self.address.path}] Started")

    async def on_stop(ctx: ActorContext[WorkerMsg]) -> None:
        print(f" [{ctx.self.address.path}] Stopped")

    async def on_restart(ctx: ActorContext[WorkerMsg], exc: Exception) -> None:
        print(f" [{ctx.self.address.path}] Restarting after: {exc}")

    return Behaviors.with_lifecycle(
        Behaviors.supervise(worker(), selective_strategy()),
        pre_start=on_start,
        post_stop=on_stop,
        pre_restart=on_restart,
    )
Notice the composition: worker() → Behaviors.supervise() → Behaviors.with_lifecycle(). Each layer adds a concern without modifying the inner behavior. pre_restart receives the exception that caused the failure — useful for logging or cleanup.
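The layering can be pictured with ordinary function wrappers. This is an analogy only, not how the library composes behaviors: `core`, `supervised`, and `observed` are hypothetical names, and the real hooks fire on lifecycle events rather than on every message. What carries over is that each wrapper adds one concern while the inner function stays untouched.

```python
from typing import Callable

Handler = Callable[[str], str]


def core(task: str) -> str:
    # Inner layer: business logic only, no error handling.
    if task == "bad":
        raise ValueError(f"Cannot process: {task}")
    return f"processed {task}"


def supervised(inner: Handler, log: list[str]) -> Handler:
    # Middle layer (hypothetical): turns a failure into a recovery step.
    def handle(task: str) -> str:
        try:
            return inner(task)
        except ValueError as exc:
            log.append(f"restart: {exc}")
            return "recovered"

    return handle


def observed(inner: Handler, log: list[str]) -> Handler:
    # Outer layer (hypothetical): records activity without changing results.
    def handle(task: str) -> str:
        log.append(f"received {task}")
        return inner(task)

    return handle
```

Composing them as observed(supervised(core, log), log) mirrors the nesting order in resilient_worker(), and core can still be called and tested on its own.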
Supervision Trees
A supervisor spawns children and wraps each with a strategy. If a child crashes, only that child is affected:
@dataclass(frozen=True)
class DelegateWork:
    task: str
    worker_index: int

@dataclass(frozen=True)
class GetReport:
    reply_to: ActorRef[str]

type SupervisorMsg = DelegateWork | GetReport

def team_supervisor(num_workers: int = 3) -> Behavior[SupervisorMsg]:
    async def setup(ctx: ActorContext[SupervisorMsg]) -> Behavior[SupervisorMsg]:
        # Spawn each worker as a child, wrapped in its own restart strategy.
        workers = tuple(
            ctx.spawn(
                Behaviors.supervise(worker(), always_restart()),
                f"worker-{i}",
            )
            for i in range(num_workers)
        )
        return supervising(workers)

    return Behaviors.setup(setup)

def supervising(
    workers: tuple[ActorRef[WorkerMsg], ...],
) -> Behavior[SupervisorMsg]:
    async def receive(
        ctx: ActorContext[SupervisorMsg], msg: SupervisorMsg
    ) -> Behavior[SupervisorMsg]:
        match msg:
            case DelegateWork(task, worker_index):
                # Wrap out-of-range indices back into the worker tuple.
                idx = worker_index % len(workers)
                workers[idx].tell(Process(task))
                return Behaviors.same()
            case GetReport(reply_to):
                reply_to.tell(f"Team has {len(workers)} workers")
                return Behaviors.same()

    return Behaviors.receive(receive)
team_supervisor uses Behaviors.setup() to spawn the workers as children when the supervisor starts. Each child is independently supervised with always_restart(). The supervisor delegates work by index, wrapping out-of-range indices with a modulo. When worker-2 crashes on a bad task, it restarts silently (always_restart() prints nothing) and handles the next task.
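The index arithmetic in supervising() has two edge cases worth seeing in isolation: Python's modulo wraps negative and oversized indices into range, and an empty worker tuple makes the modulo divide by zero. The sketch below reproduces the selection logic on plain strings. `pick_worker` and its explicit empty-team check are hypothetical additions, not part of the example above.

```python
def pick_worker(worker_index: int, names: tuple[str, ...]) -> str:
    """Hypothetical helper: choose a worker the way supervising() does."""
    if not names:
        # Without this guard, worker_index % 0 raises ZeroDivisionError.
        raise ValueError("no workers to delegate to")
    # For a positive divisor, % always yields a value in range(len(names)).
    return names[worker_index % len(names)]
```

With three workers, indices 2, 5, and -1 all select worker-2. In the actor version, calling team_supervisor(0) and then sending DelegateWork would raise inside the supervisor itself, so validating num_workers up front is a reasonable precaution.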
Running It
import asyncio

async def main() -> None:
    async with ActorSystem() as system:
        # 1. Basic supervision: restart on failure
        print("── Basic supervision ──")
        ref: ActorRef[WorkerMsg] = system.spawn(
            Behaviors.supervise(worker(), always_restart()),
            "basic-worker",
        )
        ref.tell(Process("hello"))
        ref.tell(Process("bad"))  # crashes → restarts
        ref.tell(Process("world"))  # continues after restart
        await asyncio.sleep(0.2)

        # 2. Selective strategy + lifecycle hooks
        print("\n── Selective strategy + lifecycle hooks ──")
        ref2: ActorRef[WorkerMsg] = system.spawn(
            resilient_worker(), "selective-worker"
        )
        ref2.tell(Process("good"))
        await asyncio.sleep(0.1)
        ref2.tell(Process("bad"))  # ValueError → restart
        await asyncio.sleep(0.1)
        ref2.tell(CrashOnPurpose())  # RuntimeError → stop
        await asyncio.sleep(0.2)

        # 3. Supervision tree: parent manages children
        print("\n── Supervision tree ──")
        supervisor: ActorRef[SupervisorMsg] = system.spawn(
            team_supervisor(3), "team"
        )
        supervisor.tell(DelegateWork("task-a", 0))
        supervisor.tell(DelegateWork("task-b", 1))
        supervisor.tell(DelegateWork("bad", 2))  # worker-2 crashes, restarts
        supervisor.tell(DelegateWork("task-c", 2))  # worker-2 handles after restart
        await asyncio.sleep(0.3)
        report: str = await system.ask(
            supervisor, lambda r: GetReport(reply_to=r), timeout=5.0
        )
        print(f" {report}")

asyncio.run(main())
Output:
── Basic supervision ──
[/basic-worker] Processed: hello
[/basic-worker] Processed: world
── Selective strategy + lifecycle hooks ──
[/selective-worker] Started
[/selective-worker] Processed: good
ValueError — restarting: Cannot process: bad
[/selective-worker] Restarting after: Cannot process: bad
[/selective-worker] Started
RuntimeError — stopping: Intentional crash!
[/selective-worker] Stopped
── Supervision tree ──
[/team/worker-0] Processed: task-a
[/team/worker-1] Processed: task-b
[/team/worker-2] Processed: task-c
Team has 3 workers
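The second run above suggests an ordering for the hooks: pre_start once at spawn, pre_restart followed by pre_start on each restart, and post_stop when the actor stops. The toy model below encodes that ordering. It is inferred only from the printed output, not from the library's source, and `hook_calls` is a hypothetical function.

```python
def hook_calls(directives: list[str]) -> list[str]:
    """Return the hook names fired for a sequence of supervisor decisions."""
    calls = ["pre_start"]  # fired once when the actor is spawned
    for directive in directives:
        if directive == "restart":
            # The failing instance is torn down, then a fresh one starts.
            calls += ["pre_restart", "pre_start"]
        elif directive == "stop":
            calls.append("post_stop")
            break  # a stopped actor handles nothing further
        else:
            raise ValueError(f"unknown directive: {directive}")
    return calls
```

Feeding it a restart followed by a stop yields pre_start, pre_restart, pre_start, post_stop, which lines up with the Started, Restarting after, Started, Stopped lines printed for selective-worker.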
Run the Full Example
git clone https://github.com/gabfssilva/casty.git
cd casty
uv run python examples/guides/03_supervision.py
What you learned:
- OneForOneStrategy decides per-child recovery: restart, stop, or escalate.
- Custom deciders inspect the exception to choose different directives for different failure modes.
- Lifecycle hooks (pre_start, post_stop, pre_restart) observe the actor lifecycle without modifying behavior.
- Supervision trees protect hierarchies: a crashing child is restarted without affecting siblings or the parent.