
Reactive Streams over the Network

In this guide you'll build a sensor pipeline that streams data across a cluster and consumes it from an external client. Along the way you'll learn how stream_producer and stream_consumer work across nodes via service discovery, and how a ClusterClient can both consume and produce streams.

The Problem

The Reactive Streams guide showed how stream_producer and stream_consumer wire up a backpressured pipeline inside a single actor system. But in a distributed setting, the producer and consumer live on different machines. A sensor node pushes readings into a producer — how does a monitoring client on another machine iterate over that stream?

The answer: the stream protocol is just actor messages. StreamElement, StreamDemand, StreamCompleted — they all flow through ActorRef, which is network-transparent. If two actors can exchange messages over TCP, they can run a stream between them. Nothing changes in the API.
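To make "just actor messages" concrete, here is a toy, self-contained sketch of the demand/element exchange, using plain asyncio queues as stand-ins for actor mailboxes. The message shapes and field names are illustrative, not the library's actual definitions:

```python
import asyncio
from dataclasses import dataclass

# Toy stand-ins for the protocol messages; illustrative only, not the
# library's real definitions.
@dataclass(frozen=True)
class StreamDemand:
    n: int  # how many elements the consumer is ready to receive

@dataclass(frozen=True)
class StreamElement:
    value: int

@dataclass(frozen=True)
class StreamCompleted:
    pass

async def producer(inbox: asyncio.Queue, consumer_inbox: asyncio.Queue, data):
    it = iter(data)
    while True:
        demand = await inbox.get()          # wait for StreamDemand
        for _ in range(demand.n):
            try:
                consumer_inbox.put_nowait(StreamElement(next(it)))
            except StopIteration:
                consumer_inbox.put_nowait(StreamCompleted())
                return

async def consumer(inbox: asyncio.Queue, producer_inbox: asyncio.Queue):
    received = []
    producer_inbox.put_nowait(StreamDemand(2))  # initial demand
    pending = 2
    while True:
        msg = await inbox.get()
        if isinstance(msg, StreamCompleted):
            return received
        received.append(msg.value)
        pending -= 1
        if pending == 0:                    # re-demand: this is the backpressure
            producer_inbox.put_nowait(StreamDemand(2))
            pending = 2

async def main():
    p_inbox, c_inbox = asyncio.Queue(), asyncio.Queue()
    prod = asyncio.create_task(producer(p_inbox, c_inbox, range(5)))
    got = await consumer(c_inbox, p_inbox)
    await prod
    return got

print(asyncio.run(main()))  # [0, 1, 2, 3, 4]
```

Because nothing here depends on both parties sharing memory, swapping the queues for a network transport changes nothing about the protocol, which is exactly the point the real library exploits.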

The Data

Sensor readings and anomaly alerts, shared between all nodes and the client:

@dataclass(frozen=True)
class SensorReading:
    sensor: str
    value: float
    timestamp: float
    node: str


@dataclass(frozen=True)
class Alert:
    sensor: str
    value: float
    threshold: float
    timestamp: float


@dataclass(frozen=True)
class ConsumeAlerts:
    producer: ActorRef[StreamProducerMsg[Alert]]

Producing on a Cluster Node

A cluster node spawns a stream_producer, registers it with a ServiceKey for discovery, and obtains a SinkRef — all in one place:

SENSOR_KEY: ServiceKey[StreamProducerMsg[SensorReading]] = ServiceKey("sensor")

producer_ref = system.spawn(
    Behaviors.discoverable(stream_producer(), key=SENSOR_KEY),
    f"sensor-{sensor}",
)
sink: SinkRef[SensorReading] = await system.ask(
    producer_ref, lambda r: GetSink(reply_to=r), timeout=5.0
)

Behaviors.discoverable wraps the producer so the cluster receptionist knows about it. Any node or ClusterClient can now discover this producer via lookup(SENSOR_KEY). The SinkRef wraps an in-process asyncio.Queue — it never leaves this node.

The node pushes readings into the sink:

for i in range(20):
    value = round(random.uniform(lo, hi), 2)
    reading = SensorReading(
        sensor=sensor,
        value=value,
        timestamp=time.time(),
        node=node_name,
    )
    await sink.put(reading)

After all readings are pushed, sink.complete() signals the end of the stream.
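The sink's lifecycle can be pictured with a toy model. In this sketch the names ToySink and ToySource are invented; the real SinkRef/SourceRef have a producer actor and demand handling in between. It shows only the queue-plus-sentinel mechanics behind put() and complete():

```python
import asyncio

_COMPLETE = object()  # sentinel marking end-of-stream

class ToySink:
    """Toy stand-in for SinkRef: a thin wrapper over a local asyncio.Queue."""
    def __init__(self, queue: asyncio.Queue):
        self._queue = queue

    async def put(self, item):
        await self._queue.put(item)

    def complete(self):
        # Completion is just one more item flowing through the queue.
        self._queue.put_nowait(_COMPLETE)

class ToySource:
    """Toy stand-in for SourceRef: async-iterates the queue until completion."""
    def __init__(self, queue: asyncio.Queue):
        self._queue = queue

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is _COMPLETE:
            raise StopAsyncIteration
        return item

async def main():
    q = asyncio.Queue()
    sink, source = ToySink(q), ToySource(q)
    for i in range(3):
        await sink.put(i)
    sink.complete()  # without this, iteration below would wait forever
    return [x async for x in source]

print(asyncio.run(main()))  # [0, 1, 2]
```

This also shows why forgetting complete() matters: the consumer's async-for has no other way to learn that the stream is finished.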

Consuming from Another Cluster Node

A second cluster node can discover the producer via lookup and spawn a local stream_consumer wired to the remote producer's ref. The code is identical to the single-node case — stream_consumer sends Subscribe to the remote producer, StreamElement messages travel over TCP, and StreamDemand messages travel back. SourceRef still wraps a local queue, so iterating it feels exactly the same.

Consuming from a ClusterClient

A ClusterClient connects from outside the cluster without joining it. It discovers producers via lookup and spawns a local stream_consumer for each one:

for i, instance in enumerate(producers):
    producer_ref = instance.ref
    consumer_name = f"sensor-consumer-{i}"

    consumer_ref = client.spawn(
        stream_consumer(producer_ref, timeout=10.0),
        consumer_name,
    )
    source: SourceRef[SensorReading] = await client.ask(
        consumer_ref, lambda r: GetSource(reply_to=r), timeout=5.0
    )

    node_host = instance.node.host if instance.node else "unknown"
    node_port = instance.node.port if instance.node else 0
    sensor_label = f"sensor@{node_host}:{node_port}"

    task = asyncio.create_task(
        consume_sensor(source, alert_sink, sensor_label)
    )
    tasks.append(task)

client.spawn() creates a local actor inside the client's internal actor system. That actor has a full ActorRef addressable over TCP — the cluster-side producer can send StreamElement messages to it. The pattern is identical to consuming from within the cluster.
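The consume_sensor coroutine referenced above is not shown in this guide; the following is a plausible reconstruction under stated assumptions. The threshold value is a guess, and the real example may log or label differently. The dataclasses are repeated from "The Data" so the sketch runs standalone:

```python
import asyncio
from dataclasses import dataclass

# Repeated from "The Data" so this sketch is self-contained.
@dataclass(frozen=True)
class SensorReading:
    sensor: str
    value: float
    timestamp: float
    node: str

@dataclass(frozen=True)
class Alert:
    sensor: str
    value: float
    threshold: float
    timestamp: float

THRESHOLD = 90.0  # assumption: the real example's threshold is not shown

async def consume_sensor(source, alert_sink, label: str) -> int:
    """Iterate a remote-fed source; push an Alert for each anomalous reading.

    `source` is anything async-iterable over SensorReading (a SourceRef in
    the real example); `alert_sink` needs only an async put(), like SinkRef.
    """
    count = 0
    async for reading in source:  # StreamElements arrive over TCP; iteration is local
        count += 1
        if reading.value > THRESHOLD:
            await alert_sink.put(Alert(
                sensor=reading.sensor,
                value=reading.value,
                threshold=THRESHOLD,
                timestamp=reading.timestamp,
            ))
    return count
```

Keeping the function generic over "anything async-iterable" is what lets the same loop consume a local stream or a remote one unchanged.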

Streaming Back: Client to Cluster

The client can also be the producer. It spawns a local stream_producer, obtains a SinkRef, and tells a cluster-side actor to subscribe. The alert monitor is discoverable under its own key:

ALERT_MONITOR_KEY: ServiceKey[ConsumeAlerts] = ServiceKey("alert-monitor")

# -- Spawn local alert producer and get its SinkRef --
alert_producer = client.spawn(stream_producer(), "alert-producer")
alert_sink: SinkRef[Alert] = await client.ask(
    alert_producer, lambda r: GetSink(reply_to=r), timeout=5.0
)

# -- Discover alert monitor (with retries) --
for attempt in range(15):
    monitor_listing = client.lookup(ALERT_MONITOR_KEY)
    if monitor_listing and monitor_listing.instances:
        monitor_ref = next(iter(monitor_listing.instances)).ref
        monitor_ref.tell(ConsumeAlerts(producer=alert_producer))
        log.info("%sAlert monitor%s connected", MAGENTA, RESET)
        break
    log.info(
        "Alert monitor not found yet (attempt %d/15), retrying ...",
        attempt + 1,
    )
    await asyncio.sleep(2.0)
else:
    log.warning("No alert monitor found after 15 attempts")
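The lookup-until-found loop is an instance of a generic poll-and-retry pattern. A minimal sketch follows; retry_until is a hypothetical helper invented here, not part of the library:

```python
import asyncio
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

async def retry_until(
    attempt: Callable[[], Optional[T]],
    attempts: int = 15,
    delay: float = 2.0,
) -> Optional[T]:
    """Call `attempt` until it returns a non-None value or attempts run out.

    Hypothetical helper mirroring the lookup loop above: `attempt` plays the
    role of client.lookup(...) returning a listing (or None), and `delay`
    matches the sleep between tries.
    """
    for _ in range(attempts):
        result = attempt()
        if result is not None:
            return result
        await asyncio.sleep(delay)
    return None  # caller logs a warning, as the example does
```

With a helper like this, the for/else block collapses to a single awaited call followed by a None check.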

The client discovers the alert monitor via lookup(ALERT_MONITOR_KEY) and sends ConsumeAlerts(producer=alert_producer). The cluster-side node receives the client's producer ref and spawns a stream_consumer wired to it — the same stream_consumer + SourceRef + async for pattern as every other direction:

consumer = system.spawn(
    stream_consumer(producer_ref, timeout=30.0), "alert-consumer"
)
source: SourceRef[Alert] = await system.ask(
    consumer, lambda r: GetSource(reply_to=r), timeout=5.0
)

count = 0
async for alert in source:
    count += 1
    log.warning(
        "%sALERT #%d%s  sensor=%s%s%s  value=%.1f  threshold=%.1f",
        RED,
        count,
        RESET,
        YELLOW,
        alert.sensor,
        RESET,
        alert.value,
        alert.threshold,
    )

What Stays Local, What Crosses the Wire

SinkRef and SourceRef wrap asyncio.Queues — they never cross the wire. Each one lives on the same node as its actor:

  • SinkRef: lives on the same node as the producer; never serialized (wraps a local asyncio.Queue)
  • SourceRef: lives on the same node as the consumer; never serialized (wraps a local asyncio.Queue)
  • ActorRef: lives anywhere, addressable by URI; serialized as casty://system@host:port/path
  • StreamElement, StreamDemand, ...: actor messages; serialized via the configured serializer

This is why you always obtain SinkRef and SourceRef locally via GetSink / GetSource on the node where the actor lives. The stream protocol messages handle the rest.
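The asymmetry comes down to what each object actually is: an ActorRef is essentially an address, so it survives serialization, while a queue of in-process objects cannot. A toy model (ToyActorRef is invented for illustration; the real class carries more than an address):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ToyActorRef:
    """Illustrative only: an actor reference reduced to its address parts."""
    system: str
    host: str
    port: int
    path: str

    def to_uri(self) -> str:
        # Matches the URI shape in the table above: casty://system@host:port/path
        return f"casty://{self.system}@{self.host}:{self.port}{self.path}"

    @classmethod
    def from_uri(cls, uri: str) -> "ToyActorRef":
        rest = uri.removeprefix("casty://")
        system, addr = rest.split("@", 1)
        hostport, _, path = addr.partition("/")
        host, port = hostport.split(":")
        return cls(system, host, int(port), "/" + path)

ref = ToyActorRef("sensors", "node-1", 25520, "/user/sensor-temp")
uri = ref.to_uri()
print(uri)                               # casty://sensors@node-1:25520/user/sensor-temp
assert ToyActorRef.from_uri(uri) == ref  # an address round-trips the wire intact
```

A SinkRef has no such textual form: its payload is a live asyncio.Queue bound to one event loop, which is why it must be obtained locally rather than shipped.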

Docker Compose

The examples/17_stream_pipeline/ directory contains the full example — 3 cluster nodes producing sensor readings and 1 external client consuming all streams, detecting anomalies, and pushing alerts back:

x-node: &node
  build:
    context: ../..
    dockerfile: examples/17_stream_pipeline/Dockerfile.node

services:
  node-1:
    <<: *node
    hostname: node-1
    command:
      [
        "--port", "25520",
        "--host", "auto",
        "--bind-host", "0.0.0.0",
        "--seed", "node-1:25520",
        "--nodes", "3",
        "--sensor", "temp",
      ]
  node-2:
    <<: *node
    hostname: node-2
    command:
      [
        "--port", "25520",
        "--host", "auto",
        "--bind-host", "0.0.0.0",
        "--seed", "node-1:25520",
        "--nodes", "3",
        "--sensor", "humidity",
      ]
  node-3:
    <<: *node
    hostname: node-3
    command:
      [
        "--port", "25520",
        "--host", "auto",
        "--bind-host", "0.0.0.0",
        "--seed", "node-1:25520",
        "--nodes", "3",
        "--sensor", "pressure",
        "--alert-monitor",
      ]
  client:
    build:
      context: ../..
      dockerfile: examples/17_stream_pipeline/Dockerfile.client
    hostname: client
    command: ["--contact", "node-1:25520", "--producers", "3"]
    depends_on:
      - node-1
      - node-2
      - node-3

Run the Full Example

git clone https://github.com/gabfssilva/casty.git
cd casty/examples/17_stream_pipeline
docker compose up --build

What you learned:

  • Streams work across nodes transparently — the same Subscribe/StreamDemand/StreamElement protocol flows over TCP with no extra configuration.
  • SinkRef and SourceRef stay local — they wrap asyncio.Queues and are never serialized. Only actor messages, with any ActorRefs inside them serialized as URIs, cross the wire.
  • Service discovery (Behaviors.discoverable + lookup) is how consumers find remote producers in the cluster.
  • ClusterClient.spawn() creates locally-addressable actors that cluster nodes can send messages to — enabling bidirectional streaming from outside the cluster.