Pipe to Self

In this guide you'll build a weather station that fetches forecasts from an external API without blocking its mailbox. Along the way you'll learn how pipe_to_self turns async operations into messages, how the actor stays responsive during slow I/O, and how to handle both success and failure.

The Problem

Actors process one message at a time. If your handler awaits a slow network call directly, the mailbox blocks — no other messages get through until the I/O completes. pipe_to_self solves this by running the coroutine in the background and delivering the result as a regular message.
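The difference can be reproduced with plain asyncio. The sketch below does not use casty and is not how casty implements pipe_to_self. `pipe_to_queue`, `run_actor`, and `slow_io` are hypothetical names invented for illustration. The "actor" is a loop over an asyncio.Queue that either awaits the slow call inline or hands it to a background task that posts the result back to the same queue.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any


async def slow_io() -> str:
    """Stand-in for a slow network call."""
    await asyncio.sleep(0.2)
    return "forecast"


def pipe_to_queue(
    mailbox: asyncio.Queue[Any],
    aw: Awaitable[Any],
    mapper: Callable[[Any], Any],
    tasks: set[asyncio.Task[None]],
) -> None:
    """Hypothetical helper: run `aw` in the background, then enqueue mapper(result)."""

    async def runner() -> None:
        mailbox.put_nowait(mapper(await aw))

    task = asyncio.create_task(runner())
    tasks.add(task)  # keep a strong reference until the task finishes
    task.add_done_callback(tasks.discard)


async def run_actor(blocking: bool) -> list[str]:
    """Process 'fetch' then 'ping'; return the order in which work completed."""
    mailbox: asyncio.Queue[Any] = asyncio.Queue()
    tasks: set[asyncio.Task[None]] = set()
    log: list[str] = []
    mailbox.put_nowait("fetch")
    mailbox.put_nowait("ping")

    while len(log) < 2:
        msg = await mailbox.get()
        if msg == "fetch" and blocking:
            # Awaiting inline: no other message is handled until this returns.
            log.append(f"result:{await slow_io()}")
        elif msg == "fetch":
            # Hand the work off; the loop moves straight on to the next message.
            pipe_to_queue(mailbox, slow_io(), lambda r: ("result", r), tasks)
        elif msg == "ping":
            log.append("pong")
        else:
            # ("result", value) delivered back by the background task.
            log.append(f"result:{msg[1]}")
    return log
```

With `blocking=True` the ping is answered only after the slow call finishes. With `blocking=False` the ping is answered first and the result arrives later as an ordinary message.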

External API

A simulated weather service with a 500ms delay:

import asyncio


async def fetch_weather(city: str) -> dict[str, str | float]:
    """Simulate a slow network call to a weather API."""
    await asyncio.sleep(0.5)
    forecasts: dict[str, dict[str, str | float]] = {
        "london": {"city": "London", "temp": 12.5, "sky": "cloudy"},
        "tokyo": {"city": "Tokyo", "temp": 28.0, "sky": "sunny"},
        "paris": {"city": "Paris", "temp": 18.3, "sky": "rainy"},
    }
    if city.lower() not in forecasts:
        raise ValueError(f"Unknown city: {city}")
    return forecasts[city.lower()]

Messages

The actor needs messages for requests, results, and errors, plus a ping to prove responsiveness and a query to read back the collected forecasts:

from dataclasses import dataclass


@dataclass(frozen=True)
class FetchWeather:
    city: str


@dataclass(frozen=True)
class WeatherResult:
    city: str
    temp: float
    sky: str


@dataclass(frozen=True)
class WeatherFailed:
    city: str
    error: str


@dataclass(frozen=True)
class Ping:
    reply_to: ActorRef[str]


@dataclass(frozen=True)
class GetForecasts:
    reply_to: ActorRef[tuple[WeatherResult, ...]]


type WeatherMsg = FetchWeather | WeatherResult | WeatherFailed | Ping | GetForecasts

Notice WeatherResult and WeatherFailed — these are the messages the actor sends to itself when the background fetch completes or fails. The caller never sees them.

The Behavior

def weather_station(
    forecasts: tuple[WeatherResult, ...] = (),
) -> Behavior[WeatherMsg]:
    async def receive(
        ctx: ActorContext[WeatherMsg], msg: WeatherMsg
    ) -> Behavior[WeatherMsg]:
        match msg:
            case FetchWeather(city):
                ctx.pipe_to_self(
                    fetch_weather(city),
                    lambda data: WeatherResult(
                        city=str(data["city"]),
                        temp=float(data["temp"]),
                        sky=str(data["sky"]),
                    ),
                    on_failure=lambda exc: WeatherFailed(
                        city=city, error=str(exc)
                    ),
                )
                return Behaviors.same()

            case WeatherResult() as result:
                print(f"  Weather in {result.city}: {result.temp}C, {result.sky}")
                return weather_station((*forecasts, result))

            case WeatherFailed(city, error):
                print(f"  Failed to fetch {city}: {error}")
                return Behaviors.same()

            case Ping(reply_to):
                reply_to.tell("pong")
                return Behaviors.same()

            case GetForecasts(reply_to):
                reply_to.tell(forecasts)
                return Behaviors.same()

    return Behaviors.receive(receive)

Walking through each arm:

  • FetchWeather: calls ctx.pipe_to_self() with three arguments: the coroutine, a mapper that converts the raw API response into a WeatherResult message, and an on_failure handler that wraps exceptions into WeatherFailed. It returns Behaviors.same() immediately, so the mailbox is free to process the next message.
  • WeatherResult: the piped result arrives as a normal message. The actor appends it to its state by returning a new weather_station behavior that holds the extended tuple (behavior recursion).
  • WeatherFailed: the error handler fired. The actor logs the failure and keeps its current state.
  • Ping: replies "pong" right away, which proves the mailbox is responsive even while fetches are in flight.
  • GetForecasts: replies with the tuple of forecasts collected so far.

Running It

async def main() -> None:
    async with ActorSystem() as system:
        station: ActorRef[WeatherMsg] = system.spawn(
            weather_station(), "weather-station"
        )

        # Fire off three fetch requests — all run concurrently in the background
        print("── Requesting forecasts (non-blocking) ──")
        station.tell(FetchWeather("london"))
        station.tell(FetchWeather("tokyo"))
        station.tell(FetchWeather("narnia"))  # will fail

        # The mailbox is NOT blocked — Ping is processed immediately
        reply = await system.ask(
            station, lambda r: Ping(reply_to=r), timeout=1.0
        )
        print(f"  Ping reply: {reply} (mailbox is responsive!)")

        # Wait for all fetches to complete
        await asyncio.sleep(1.0)

        # Read the collected results
        results = await system.ask(
            station, lambda r: GetForecasts(reply_to=r), timeout=1.0
        )
        print(f"\n── Collected {len(results)} forecasts ──")
        for r in results:
            print(f"  {r.city}: {r.temp}C, {r.sky}")


asyncio.run(main())

Output:

── Requesting forecasts (non-blocking) ──
  Ping reply: pong (mailbox is responsive!)
  Weather in London: 12.5C, cloudy
  Weather in Tokyo: 28.0C, sunny
  Failed to fetch narnia: Unknown city: narnia

── Collected 2 forecasts ──
  London: 12.5C, cloudy
  Tokyo: 28.0C, sunny

Notice that the Ping reply arrives before any weather results: each fetch sleeps for 500 ms, while Ping is handled as soon as the actor dequeues it. The three fetches run concurrently in the background, and the mailbox keeps processing other messages in the meantime.
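The effect of running the fetches concurrently can be measured with plain asyncio, independent of casty. In the sketch below, `slow_call` and `timed` are hypothetical helpers. Three 0.2 s calls are timed once back to back and once via `asyncio.gather`.

```python
import asyncio
import time


async def slow_call(delay: float) -> float:
    """Stand-in for a slow network call."""
    await asyncio.sleep(delay)
    return delay


async def timed(concurrent: bool, delay: float = 0.2, n: int = 3) -> float:
    """Return the wall-clock seconds needed to complete n slow calls."""
    start = time.perf_counter()
    if concurrent:
        # All calls sleep at the same time, so the total is about one delay.
        await asyncio.gather(*(slow_call(delay) for _ in range(n)))
    else:
        # Each call waits for the previous one, so the total is about n delays.
        for _ in range(n):
            await slow_call(delay)
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {asyncio.run(timed(concurrent=False)):.2f}s")
    print(f"concurrent: {asyncio.run(timed(concurrent=True)):.2f}s")
```

The sequential run should take roughly three times as long as the concurrent one. By the same reasoning, the fixed one-second wait in main() is enough for all three 500 ms fetches.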

Run the Full Example

git clone https://github.com/gabfssilva/casty.git
cd casty
uv run python examples/guides/04_pipe_to_self.py

What you learned:

  • pipe_to_self runs a coroutine in the background and delivers the result as a message — no mailbox blocking.
  • Mappers convert raw results into typed messages: lambda data: WeatherResult(...).
  • on_failure converts exceptions into messages so the actor can handle errors as part of its normal message loop.
  • Multiple pipe_to_self calls run concurrently — the actor stays responsive to other messages.