Reactive Streams over the Network¶
In this guide you'll build a sensor pipeline that streams data across a cluster and consumes it from an external client. Along the way you'll learn how stream_producer and stream_consumer work across nodes via service discovery, and how a ClusterClient can both consume and produce streams.
The Problem¶
The Reactive Streams guide showed how stream_producer and stream_consumer wire up a backpressured pipeline inside a single actor system. But in a distributed setting, the producer and consumer live on different machines. A sensor node pushes readings into a producer — how does a monitoring client on another machine iterate that stream?
The answer: the stream protocol is just actor messages. StreamElement, StreamDemand, StreamCompleted — they all flow through ActorRef, which is network-transparent. If two actors can exchange messages over TCP, they can run a stream between them. Nothing changes in the API.
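To make the demand-driven protocol concrete, here is a self-contained sketch using plain `asyncio` queues in place of actor mailboxes. The message classes are simplified stand-ins, not casty's actual definitions (which carry more fields), but the flow is the same: the consumer signals demand, the producer replies with elements, and completion is an explicit message.

```python
import asyncio
from dataclasses import dataclass

# Simplified stand-ins for the protocol messages; casty's real
# classes carry additional fields (stream ids, reply refs, ...).
@dataclass
class StreamElement:
    value: object

@dataclass
class StreamDemand:
    n: int

class StreamCompleted:
    pass

async def producer(inbox: asyncio.Queue, consumer_inbox: asyncio.Queue, items):
    it = iter(items)
    while True:
        msg = await inbox.get()                   # wait for demand from the consumer
        if isinstance(msg, StreamDemand):
            for _ in range(msg.n):
                try:
                    consumer_inbox.put_nowait(StreamElement(next(it)))
                except StopIteration:
                    consumer_inbox.put_nowait(StreamCompleted())
                    return

async def consumer(inbox: asyncio.Queue, producer_inbox: asyncio.Queue, out: list):
    producer_inbox.put_nowait(StreamDemand(2))    # initial demand
    while True:
        msg = await inbox.get()
        if isinstance(msg, StreamCompleted):
            return
        out.append(msg.value)
        producer_inbox.put_nowait(StreamDemand(1))  # one-for-one replenishment

async def main():
    p_in, c_in, out = asyncio.Queue(), asyncio.Queue(), []
    await asyncio.gather(
        producer(p_in, c_in, range(5)),
        consumer(c_in, p_in, out),
    )
    return out

print(asyncio.run(main()))  # → [0, 1, 2, 3, 4]
```

In the real system the two queues are actor mailboxes behind `ActorRef`s, so the exact same exchange works whether the refs are local or remote.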
The Data¶
Sensor readings and anomaly alerts, shared between all nodes and the client:
```python
@dataclass(frozen=True)
class SensorReading:
    sensor: str
    value: float
    timestamp: float
    node: str

@dataclass(frozen=True)
class Alert:
    sensor: str
    value: float
    threshold: float
    timestamp: float

@dataclass(frozen=True)
class ConsumeAlerts:
    producer: ActorRef[StreamProducerMsg[Alert]]
```
Producing on a Cluster Node¶
A cluster node spawns a stream_producer, registers it with a ServiceKey for discovery, and obtains a SinkRef — all in one place:
```python
SENSOR_KEY: ServiceKey[StreamProducerMsg[SensorReading]] = ServiceKey("sensor")

producer_ref = system.spawn(
    Behaviors.discoverable(stream_producer(), key=SENSOR_KEY),
    f"sensor-{sensor}",
)
sink: SinkRef[SensorReading] = await system.ask(
    producer_ref, lambda r: GetSink(reply_to=r), timeout=5.0
)
```
Behaviors.discoverable wraps the producer so the cluster receptionist knows about it. Any node or ClusterClient can now discover this producer via lookup(SENSOR_KEY). The SinkRef wraps an in-process asyncio.Queue — it never leaves this node.
The node pushes readings into the sink:
```python
for i in range(20):
    value = round(random.uniform(lo, hi), 2)
    reading = SensorReading(
        sensor=sensor,
        value=value,
        timestamp=time.time(),
        node=node_name,
    )
    await sink.put(reading)
```
After all readings are pushed, sink.complete() signals the end of the stream.
Consuming from Another Cluster Node¶
A second cluster node can discover the producer via lookup and spawn a local stream_consumer wired to the remote producer's ref. The code is identical to a single-node stream — stream_consumer sends Subscribe to the remote producer, StreamElement messages travel over TCP, and StreamDemand messages travel back. SourceRef wraps a local queue — iterating it feels the same as a single-node stream.
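The paragraph above can be sketched as follows. This is a hedged sketch, not verbatim example code: it assumes the node-side discovery call mirrors `ClusterClient.lookup` (returning a listing with `.instances`, each carrying a `.ref`); adapt the lookup to your system's actual discovery API. The stream wiring itself uses only calls shown elsewhere in this guide.

```python
# Sketch only -- the node-side lookup call is an assumption.
listing = system.lookup(SENSOR_KEY)                 # discover remote producers
producer_ref = next(iter(listing.instances)).ref    # pick one producer's ActorRef

# Spawn a local consumer wired to the remote producer.
consumer = system.spawn(
    stream_consumer(producer_ref, timeout=10.0), "sensor-consumer"
)
source: SourceRef[SensorReading] = await system.ask(
    consumer, lambda r: GetSource(reply_to=r), timeout=5.0
)

# Subscribe / StreamDemand / StreamElement now flow over TCP transparently.
async for reading in source:
    print(reading.sensor, reading.value)
```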
Consuming from a ClusterClient¶
A ClusterClient connects from outside the cluster without joining it. It discovers producers via lookup and spawns a local stream_consumer for each one:
```python
for i, instance in enumerate(producers):
    producer_ref = instance.ref
    consumer_name = f"sensor-consumer-{i}"
    consumer_ref = client.spawn(
        stream_consumer(producer_ref, timeout=10.0),
        consumer_name,
    )
    source: SourceRef[SensorReading] = await client.ask(
        consumer_ref, lambda r: GetSource(reply_to=r), timeout=5.0
    )
    node_host = instance.node.host if instance.node else "unknown"
    node_port = instance.node.port if instance.node else 0
    sensor_label = f"sensor@{node_host}:{node_port}"
    task = asyncio.create_task(
        consume_sensor(source, alert_sink, sensor_label)
    )
    tasks.append(task)
```
client.spawn() creates a local actor inside the client's internal actor system. That actor has a full ActorRef addressable over TCP — the cluster-side producer can send StreamElement messages to it. The pattern is identical to consuming from within the cluster.
Streaming Back: Client to Cluster¶
The client can also be the producer. It spawns a local stream_producer, obtains a SinkRef, and tells a cluster-side actor to subscribe. The alert monitor is discoverable under its own key:
```python
ALERT_MONITOR_KEY: ServiceKey[ConsumeAlerts] = ServiceKey("alert-monitor")

# -- Spawn local alert producer and get its SinkRef --
alert_producer = client.spawn(stream_producer(), "alert-producer")
alert_sink: SinkRef[Alert] = await client.ask(
    alert_producer, lambda r: GetSink(reply_to=r), timeout=5.0
)

# -- Discover alert monitor (with retries) --
for attempt in range(15):
    monitor_listing = client.lookup(ALERT_MONITOR_KEY)
    if monitor_listing and monitor_listing.instances:
        monitor_ref = next(iter(monitor_listing.instances)).ref
        monitor_ref.tell(ConsumeAlerts(producer=alert_producer))
        log.info("%sAlert monitor%s connected", MAGENTA, RESET)
        break
    log.info(
        "Alert monitor not found yet (attempt %d/15), retrying ...",
        attempt + 1,
    )
    await asyncio.sleep(2.0)
else:
    log.warning("No alert monitor found after 15 attempts")
```
The client discovers the alert monitor via lookup(ALERT_MONITOR_KEY) and sends ConsumeAlerts(producer=alert_producer). The cluster-side node receives the client's producer ref and spawns a stream_consumer wired to it — the same stream_consumer + SourceRef + async for pattern as every other direction:
```python
consumer = system.spawn(
    stream_consumer(producer_ref, timeout=30.0), "alert-consumer"
)
source: SourceRef[Alert] = await system.ask(
    consumer, lambda r: GetSource(reply_to=r), timeout=5.0
)

count = 0
async for alert in source:
    count += 1
    log.warning(
        "%sALERT #%d%s sensor=%s%s%s value=%.1f threshold=%.1f",
        RED,
        count,
        RESET,
        YELLOW,
        alert.sensor,
        RESET,
        alert.value,
        alert.threshold,
    )
```
What Stays Local, What Crosses the Wire¶
SinkRef and SourceRef wrap asyncio.Queues — they never cross the wire. Each one lives on the same node as its actor:
| Object | Where it lives | Serialized? |
|---|---|---|
| `SinkRef` | Same node as the producer | No — wraps a local `asyncio.Queue` |
| `SourceRef` | Same node as the consumer | No — wraps a local `asyncio.Queue` |
| `ActorRef` | Anywhere — addressable by URI | Yes — `casty://system@host:port/path` |
| `StreamElement`, `StreamDemand`, ... | Actor messages | Yes — via the configured serializer |
This is why you always obtain SinkRef and SourceRef locally via GetSink / GetSource on the node where the actor lives. The stream protocol messages handle the rest.
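To see why a `SourceRef` never needs to be serialized, here is a minimal stand-in (not casty's actual implementation, just an illustration): it is nothing more than an async iterator over a local `asyncio.Queue`, terminated by a sentinel. An object like this only makes sense on the node that owns the queue.

```python
import asyncio

_DONE = object()  # sentinel marking stream completion

class LocalSourceRef:
    """Minimal SourceRef-like wrapper: async-iterates a local queue."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def put(self, item) -> None:
        self._queue.put_nowait(item)

    def complete(self) -> None:
        self._queue.put_nowait(_DONE)

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is _DONE:
            raise StopAsyncIteration
        return item

async def demo():
    source = LocalSourceRef()
    for v in (1, 2, 3):
        source.put(v)
    source.complete()
    return [x async for x in source]

print(asyncio.run(demo()))  # → [1, 2, 3]
```

In casty the consumer actor fills the queue from incoming `StreamElement` messages; the `async for` loop on the same node drains it.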
Docker Compose¶
The examples/17_stream_pipeline/ directory contains the full example — 3 cluster nodes producing sensor readings and 1 external client consuming all streams, detecting anomalies, and pushing alerts back:
```yaml
x-node: &node
  build:
    context: ../..
    dockerfile: examples/17_stream_pipeline/Dockerfile.node

services:
  node-1:
    <<: *node
    hostname: node-1
    command:
      [
        "--port", "25520",
        "--host", "auto",
        "--bind-host", "0.0.0.0",
        "--seed", "node-1:25520",
        "--nodes", "3",
        "--sensor", "temp",
      ]

  node-2:
    <<: *node
    hostname: node-2
    command:
      [
        "--port", "25520",
        "--host", "auto",
        "--bind-host", "0.0.0.0",
        "--seed", "node-1:25520",
        "--nodes", "3",
        "--sensor", "humidity",
      ]

  node-3:
    <<: *node
    hostname: node-3
    command:
      [
        "--port", "25520",
        "--host", "auto",
        "--bind-host", "0.0.0.0",
        "--seed", "node-1:25520",
        "--nodes", "3",
        "--sensor", "pressure",
        "--alert-monitor",
      ]

  client:
    build:
      context: ../..
      dockerfile: examples/17_stream_pipeline/Dockerfile.client
    hostname: client
    command: ["--contact", "node-1:25520", "--producers", "3"]
    depends_on:
      - node-1
      - node-2
      - node-3
```
Run the Full Example¶
```bash
git clone https://github.com/gabfssilva/casty.git
cd casty/examples/17_stream_pipeline
docker compose up --build
```
What you learned:
- Streams work across nodes transparently — the same `Subscribe`/`StreamDemand`/`StreamElement` protocol flows over TCP with no extra configuration.
- `SinkRef` and `SourceRef` stay local — they wrap `asyncio.Queue`s and are never serialized. Only `ActorRef` messages cross the wire.
- Service discovery (`Behaviors.discoverable` + `lookup`) is how consumers find remote producers in the cluster.
- `ClusterClient.spawn()` creates locally-addressable actors that cluster nodes can send messages to — enabling bidirectional streaming from outside the cluster.