Skip to content

Pipe to Self

Actors process one message at a time. When an actor needs to call an async function — an HTTP request, a database query, file I/O — awaiting it inside the handler blocks the mailbox. No other messages can be processed until the coroutine completes.

Request-reply solves actor-to-actor communication, but what about plain async functions that aren't actors?

ctx.pipe_to_self() dispatches a coroutine as a background asyncio.Task and sends the mapped result back to the actor's mailbox, preserving the sequential message-processing guarantee without blocking.

@dataclass(frozen=True)
class FetchUser:
    user_id: str

@dataclass(frozen=True)
class UserFound:
    name: str

@dataclass(frozen=True)
class UserFetchFailed:
    error: str

@dataclass(frozen=True)
class GetResult:
    reply_to: ActorRef[str]

type FetcherMsg = FetchUser | UserFound | UserFetchFailed | GetResult


async def fetch_user_from_api(user_id: str) -> str:
    """Simulate an async API call."""
    await asyncio.sleep(0.1)
    return f"User-{user_id}"


def fetcher(result: str = "") -> Behavior[FetcherMsg]:
    async def receive(ctx: ActorContext[FetcherMsg], msg: FetcherMsg) -> Behavior[FetcherMsg]:
        match msg:
            case FetchUser(user_id=uid):
                ctx.pipe_to_self(
                    fetch_user_from_api(uid),
                    lambda name: UserFound(name=name),
                    on_failure=lambda exc: UserFetchFailed(error=str(exc)),
                )
                return Behaviors.same()
            case UserFound(name=name):
                return fetcher(result=name)
            case UserFetchFailed(error=error):
                return fetcher(result=f"error: {error}")
            case GetResult(reply_to=reply_to):
                reply_to.tell(result)
                return Behaviors.same()

    return Behaviors.receive(receive)


async def main() -> None:
    async with ActorSystem("pipe-demo") as system:
        ref = system.spawn(fetcher(), "fetcher")

        ref.tell(FetchUser(user_id="42"))
        await asyncio.sleep(0.2)

        result = await system.ask(ref, lambda r: GetResult(reply_to=r), timeout=2.0)
        print(f"Result: {result}")  # Result: User-42


asyncio.run(main())

The actor receives FetchUser, kicks off the async operation in the background, and immediately returns Behaviors.same() — the mailbox stays open. When the coroutine completes, the mapper wraps the result into a UserFound message that arrives through the normal mailbox, processed in order with everything else.

Error handling

The optional on_failure parameter maps exceptions to messages:

ctx.pipe_to_self(
    fetch_user_from_api(user_id),
    lambda name: UserFound(name=name),
    on_failure=lambda exc: UserFetchFailed(error=str(exc)),
)

If on_failure is omitted and the coroutine raises, a warning is logged and the exception is discarded. The actor continues processing other messages normally.

Signature

def pipe_to_self[T](
    self,
    coro: Awaitable[T],
    mapper: Callable[[T], M],
    on_failure: Callable[[Exception], M] | None = None,
) -> None
Parameter Type Description
coro Awaitable[T] The async operation to run in background
mapper Callable[[T], M] Maps the successful result to a message for this actor
on_failure Callable[[Exception], M] \| None Maps a failure to a message. If None, failures are logged and discarded

Warning

The coroutine runs outside the actor's sequential message processing. Do not capture mutable state from the closure — by the time the coroutine completes, the actor may have moved to a different behavior with different state. The result should arrive as a message and be handled like any other.


Next: Actor Hierarchies