Pipe to Self
In this guide you'll build a weather station that fetches forecasts from an external API without blocking its mailbox. Along the way you'll learn how pipe_to_self turns async operations into messages, how the actor stays responsive during slow I/O, and how to handle both success and failure.
The Problem
Actors process one message at a time. If your handler awaits a slow network call directly, the mailbox blocks — no other messages get through until the I/O completes. pipe_to_self solves this by running the coroutine in the background and delivering the result as a regular message.
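The difference can be reproduced without any actor library. The sketch below is a toy mailbox loop built on plain asyncio, not Casty's implementation; slow_call, ping_latency, and the string messages are hypothetical names chosen for illustration. It measures how long a ping waits behind a fetch when the handler awaits inline versus when the work is started in the background.

```python
import asyncio
import time


async def slow_call() -> str:
    """Hypothetical stand-in for a slow network request (0.2 s)."""
    await asyncio.sleep(0.2)
    return "forecast"


async def ping_latency(background: bool) -> float:
    """Queue 'fetch' then 'ping'; return seconds until the ping is handled."""
    mailbox: asyncio.Queue[str] = asyncio.Queue()
    ping_handled = asyncio.Event()
    pending: set[asyncio.Task[None]] = set()

    async def deliver_result() -> None:
        # The finished result re-enters the mailbox as an ordinary message.
        mailbox.put_nowait(await slow_call())

    async def actor_loop() -> None:
        while True:
            msg = await mailbox.get()
            if msg == "fetch" and background:
                # Start the work and go straight back to the mailbox.
                pending.add(asyncio.create_task(deliver_result()))
            elif msg == "fetch":
                # Awaiting inline: nothing else is handled until this returns.
                await slow_call()
            elif msg == "ping":
                ping_handled.set()

    loop_task = asyncio.create_task(actor_loop())
    start = time.perf_counter()
    mailbox.put_nowait("fetch")
    mailbox.put_nowait("ping")
    await ping_handled.wait()
    elapsed = time.perf_counter() - start

    # Tidy up the toy actor and any fetch still in flight.
    for task in (loop_task, *pending):
        task.cancel()
    return elapsed
```

Under these assumptions, asyncio.run(ping_latency(False)) should take roughly the full 0.2 s, while asyncio.run(ping_latency(True)) should return almost immediately.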
External API
A simulated weather service with a 500ms delay:
import asyncio


async def fetch_weather(city: str) -> dict[str, str | float]:
    """Simulate a slow network call to a weather API."""
    await asyncio.sleep(0.5)
    forecasts: dict[str, dict[str, str | float]] = {
        "london": {"city": "London", "temp": 12.5, "sky": "cloudy"},
        "tokyo": {"city": "Tokyo", "temp": 28.0, "sky": "sunny"},
        "paris": {"city": "Paris", "temp": 18.3, "sky": "rainy"},
    }
    if city.lower() not in forecasts:
        raise ValueError(f"Unknown city: {city}")
    return forecasts[city.lower()]
Messages
The actor needs messages for fetch requests, results, and errors, plus a ping to prove responsiveness and a query to read back the collected forecasts:
@dataclass(frozen=True)
class FetchWeather:
    city: str


@dataclass(frozen=True)
class WeatherResult:
    city: str
    temp: float
    sky: str


@dataclass(frozen=True)
class WeatherFailed:
    city: str
    error: str


@dataclass(frozen=True)
class Ping:
    reply_to: ActorRef[str]


@dataclass(frozen=True)
class GetForecasts:
    reply_to: ActorRef[tuple[WeatherResult, ...]]


type WeatherMsg = FetchWeather | WeatherResult | WeatherFailed | Ping | GetForecasts
Notice WeatherResult and WeatherFailed — these are the messages the actor sends to itself when the background fetch completes or fails. The caller never sees them.
The Behavior
def weather_station(
    forecasts: tuple[WeatherResult, ...] = (),
) -> Behavior[WeatherMsg]:
    async def receive(
        ctx: ActorContext[WeatherMsg], msg: WeatherMsg
    ) -> Behavior[WeatherMsg]:
        match msg:
            case FetchWeather(city):
                ctx.pipe_to_self(
                    fetch_weather(city),
                    lambda data: WeatherResult(
                        city=str(data["city"]),
                        temp=float(data["temp"]),
                        sky=str(data["sky"]),
                    ),
                    on_failure=lambda exc: WeatherFailed(
                        city=city, error=str(exc)
                    ),
                )
                return Behaviors.same()
            case WeatherResult() as result:
                print(f" Weather in {result.city}: {result.temp}C, {result.sky}")
                return weather_station((*forecasts, result))
            case WeatherFailed(city, error):
                print(f" Failed to fetch {city}: {error}")
                return Behaviors.same()
            case Ping(reply_to):
                reply_to.tell("pong")
                return Behaviors.same()
            case GetForecasts(reply_to):
                reply_to.tell(forecasts)
                return Behaviors.same()

    return Behaviors.receive(receive)
Walking through each arm:
- FetchWeather: calls ctx.pipe_to_self() with three arguments (the coroutine, a mapper that converts the raw API response into a WeatherResult message, and an on_failure handler that wraps exceptions into WeatherFailed). It returns Behaviors.same() immediately, so the mailbox is free.
- WeatherResult: the piped result arrives as a normal message. The actor appends it to its state via behavior recursion, returning weather_station(...) with the extended tuple.
- WeatherFailed: the error handler fired. Log it and move on.
- Ping: proves the mailbox is responsive even while fetches are in flight.
- GetForecasts: replies with the forecasts collected so far.
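To make the mechanism concrete, here is a minimal sketch of how a pipe-style helper could be built on plain asyncio. It is an illustration under stated assumptions, not Casty's actual implementation: pipe_to_mailbox, the asyncio.Queue mailbox, and the tuple messages are all hypothetical, and silently dropping errors when no on_failure is supplied is a choice made for this sketch only.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

# Strong references keep background tasks alive until they finish.
_background: set = set()


def pipe_to_mailbox(
    mailbox: asyncio.Queue,
    awaitable: Awaitable[Any],
    mapper: Callable[[Any], Any],
    on_failure: Optional[Callable[[Exception], Any]] = None,
) -> None:
    """Hypothetical helper: run `awaitable` and enqueue the mapped outcome."""

    async def run() -> None:
        try:
            result = await awaitable
        except Exception as exc:
            # Assumption for this sketch: without a handler the error is dropped.
            if on_failure is not None:
                mailbox.put_nowait(on_failure(exc))
            return
        mailbox.put_nowait(mapper(result))

    task = asyncio.create_task(run())
    _background.add(task)
    task.add_done_callback(_background.discard)


async def fetch(city: str) -> dict:
    """Hypothetical slow call that only knows about London."""
    await asyncio.sleep(0.05)
    if city != "london":
        raise ValueError(f"Unknown city: {city}")
    return {"city": "London", "temp": 12.5}


async def demo() -> list:
    mailbox: asyncio.Queue = asyncio.Queue()
    for city in ("london", "narnia"):
        pipe_to_mailbox(
            mailbox,
            fetch(city),
            lambda data: ("result", data["city"], data["temp"]),
            # Bind `city` now; a bare closure would see the loop's last value.
            on_failure=lambda exc, city=city: ("failed", city, str(exc)),
        )
    # Both calls return at once; the outcomes arrive later as messages.
    return sorted([await mailbox.get(), await mailbox.get()])
```

The caller never awaits the fetch itself. Success and failure both come back through the same queue, which is why the behavior above can treat them as ordinary match arms.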
Running It
async def main() -> None:
    async with ActorSystem() as system:
        station: ActorRef[WeatherMsg] = system.spawn(
            weather_station(), "weather-station"
        )

        # Fire off three fetch requests; all run concurrently in the background
        print("── Requesting forecasts (non-blocking) ──")
        station.tell(FetchWeather("london"))
        station.tell(FetchWeather("tokyo"))
        station.tell(FetchWeather("narnia"))  # will fail

        # The mailbox is NOT blocked: Ping is processed immediately
        reply = await system.ask(
            station, lambda r: Ping(reply_to=r), timeout=1.0
        )
        print(f" Ping reply: {reply} (mailbox is responsive!)")

        # Wait for all fetches to complete (each takes about 500ms)
        await asyncio.sleep(1.0)

        # Read the collected results
        results = await system.ask(
            station, lambda r: GetForecasts(reply_to=r), timeout=1.0
        )
        print(f"\n── Collected {len(results)} forecasts ──")
        for r in results:
            print(f" {r.city}: {r.temp}C, {r.sky}")


asyncio.run(main())
Output:
── Requesting forecasts (non-blocking) ──
Ping reply: pong (mailbox is responsive!)
Weather in London: 12.5C, cloudy
Weather in Tokyo: 28.0C, sunny
Failed to fetch narnia: Unknown city: narnia
── Collected 2 forecasts ──
London: 12.5C, cloudy
Tokyo: 28.0C, sunny
Notice: the Ping reply arrives before any weather results. The three fetches are running concurrently in the background, but the mailbox keeps processing other messages.
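The concurrency claim is easy to check with plain asyncio. The sketch below is independent of Casty, and fake_fetch and timed are hypothetical names. It times three simulated 0.1 s fetches run one after another and then all at once.

```python
import asyncio
import time


async def fake_fetch(city: str, delay: float = 0.1) -> str:
    """Hypothetical fetch that only sleeps for `delay` seconds."""
    await asyncio.sleep(delay)
    return city


async def timed(concurrent: bool) -> float:
    """Return the wall-clock seconds needed to complete three fetches."""
    cities = ("london", "tokyo", "paris")
    start = time.perf_counter()
    if concurrent:
        # All three sleeps overlap, so the total is about one delay.
        await asyncio.gather(*(fake_fetch(c) for c in cities))
    else:
        # Each fetch waits for the previous one, so the delays add up.
        for c in cities:
            await fake_fetch(c)
    return time.perf_counter() - start
```

Run sequentially, the three fetches take about 0.3 s in total. Run concurrently, they take about 0.1 s, which matches the way the three background fetches in the weather station finish at roughly the same time.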
Run the Full Example
git clone https://github.com/gabfssilva/casty.git
cd casty
uv run python examples/guides/04_pipe_to_self.py
What you learned:
- pipe_to_self runs a coroutine in the background and delivers the result as a message, with no mailbox blocking.
- Mappers convert raw results into typed messages: lambda data: WeatherResult(...).
- on_failure converts exceptions into messages so the actor can handle errors as part of its normal message loop.
- Multiple pipe_to_self calls run concurrently, and the actor stays responsive to other messages.