
Streams

casty.stream_producer(*, buffer_size=0)

Buffered, demand-gated element source.

Parameters:

Name | Type | Description | Default
buffer_size | int | Maximum number of buffered elements. 0 means unbounded (backward compatible with the original tuple-based buffer). | 0
Story
required
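The idea of a buffered, demand-gated source can be illustrated without the library. The sketch below is not casty's implementation: `DemandGatedBuffer` and its methods are hypothetical names, and the only behavior taken from the description above is that `buffer_size=0` means unbounded and that elements leave the buffer only when demand is outstanding.

```python
from collections import deque


class DemandGatedBuffer:
    """Hypothetical stand-in: holds elements until a consumer signals demand."""

    def __init__(self, buffer_size: int = 0) -> None:
        # buffer_size == 0 means unbounded, mirroring the documented default.
        self.buffer_size = buffer_size
        self.buffer: deque = deque()
        self.demand = 0          # elements the consumer has asked for but not received
        self.delivered: list = []

    def push(self, element) -> bool:
        """Buffer an element; return False if a bounded buffer is already full."""
        if self.buffer_size > 0 and len(self.buffer) >= self.buffer_size:
            return False
        self.buffer.append(element)
        self._drain()
        return True

    def request(self, n: int) -> None:
        """Add n to the outstanding demand and deliver what the buffer allows."""
        self.demand += n
        self._drain()

    def _drain(self) -> None:
        # Deliver only while there is both demand and buffered data.
        while self.demand > 0 and self.buffer:
            self.delivered.append(self.buffer.popleft())
            self.demand -= 1


gate = DemandGatedBuffer(buffer_size=2)
accepted = [gate.push(x) for x in ("a", "b", "c")]  # third push finds the buffer full
gate.request(1)   # delivers "a", freeing one slot
gate.push("c")    # now accepted
gate.request(5)   # delivers "b" and "c"; 3 units of demand remain
```

In this sketch a full buffer rejects the push; an asynchronous producer would more likely make the caller wait instead.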

casty.stream_consumer(producer, *, timeout=30.0, initial_demand=16)

Subscription mediator between a stream_producer and a SourceRef.

Story: waiting → active → stopped.

casty.SinkRef

Input-side handle for pushing elements into a stream_producer.

Obtained via GetSink. Backed by the producer's internal asyncio.Queue; put blocks when the buffer is full, providing input-side backpressure.

Parameters:

Name | Type | Description | Default
queue | Queue[E] | The producer's internal bounded queue. | required
producer | ActorRef[StreamProducerMsg[E]] | Ref to the producer actor, used to nudge it after each put. | required

__init__(queue, producer, closed)

put(element) async

Push an element, blocking if the buffer is full.

Returns silently if the producer stops while waiting.

complete() async

Signal that no more elements will be pushed.
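The input-side backpressure described for put can be seen with a plain bounded asyncio.Queue. This is a minimal sketch using only the standard library, not SinkRef itself; `demo_backpressure`, `producer`, and `consumer` are illustrative names.

```python
import asyncio


async def demo_backpressure() -> list[str]:
    events: list[str] = []
    # A bounded queue of size 1: a second put must wait for a get.
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)

    async def producer() -> None:
        for i in range(3):
            await queue.put(i)          # suspends while the queue is full
            events.append(f"put {i}")

    async def consumer() -> None:
        for _ in range(3):
            await asyncio.sleep(0.01)   # deliberately slow consumer
            item = await queue.get()
            events.append(f"got {item}")

    await asyncio.gather(producer(), consumer())
    return events


events = asyncio.run(demo_backpressure())
print(events)
```

Each put after the first completes only once the consumer has taken the previous element, so the producer runs at the consumer's pace. With `maxsize=0`, asyncio.Queue is unbounded and put never waits.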

casty.SourceRef

Async iterator over a stream with automatic demand replenishment.

Parameters:

Name | Type | Description | Default
queue | Queue[StreamElement[E] | StreamCompleted] | Internal queue fed by the consumer actor's receive handler. | required
cancel | Callable[[], None] | Callback to send StreamCancel to the consumer actor. | required
timeout | float | Inactivity timeout in seconds before the stream ends. | required
request_demand | Callable[[int], None] | Callback to send StreamDemand to the consumer actor. | required

__init__(queue, cancel, timeout, request_demand)

__aiter__() async
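One way to picture an async iterator with the four constructor arguments listed above is the standard-library sketch below. It is not the SourceRef source code: `SketchSource`, `Element`, `Completed`, and the `batch` parameter are hypothetical. Replenishing demand every `batch` elements and calling cancel on timeout are both assumptions made for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Element:        # local stand-in for StreamElement
    element: Any


@dataclass
class Completed:      # local stand-in for StreamCompleted
    pass


class SketchSource:
    """Hypothetical iterator that re-requests demand as elements are consumed."""

    def __init__(self, queue: asyncio.Queue, cancel: Callable[[], None],
                 timeout: float, request_demand: Callable[[int], None],
                 batch: int = 2) -> None:
        self.queue = queue
        self.cancel = cancel
        self.timeout = timeout
        self.request_demand = request_demand
        self.batch = batch  # assumption: top up demand every `batch` elements

    async def __aiter__(self):
        consumed = 0
        while True:
            try:
                msg = await asyncio.wait_for(self.queue.get(), self.timeout)
            except asyncio.TimeoutError:
                self.cancel()   # assumption: inactivity cancels the upstream
                return
            if isinstance(msg, Completed):
                return
            yield msg.element
            consumed += 1
            if consumed % self.batch == 0:
                self.request_demand(self.batch)


async def demo():
    queue: asyncio.Queue = asyncio.Queue()
    demands: list = []
    cancelled: list = []
    for x in ("a", "b", "c"):
        queue.put_nowait(Element(x))
    queue.put_nowait(Completed())
    source = SketchSource(queue, cancel=lambda: cancelled.append(True),
                          timeout=0.1, request_demand=demands.append)
    items = [x async for x in source]
    return items, demands, cancelled


items, demands, cancelled = asyncio.run(demo())
```

The caller only writes `async for`; demand signalling and the timeout stay inside the iterator.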

casty.GetSink dataclass

reply_to instance-attribute

__init__(reply_to)

casty.GetSource dataclass

reply_to instance-attribute

__init__(reply_to)

casty.CompleteStream dataclass

__init__()

casty.StreamSubscribe dataclass

consumer instance-attribute

demand = 0 class-attribute instance-attribute

__init__(consumer, demand=0)

casty.StreamDemand dataclass

n instance-attribute

__init__(n)

casty.StreamCancel dataclass

__init__()

casty.StreamElement dataclass

element instance-attribute

__init__(element)

casty.StreamCompleted dataclass

__init__()

casty.StreamProducerMsg = Push[E] | CompleteStream | Subscribe[E] | StreamDemand | StreamCancel | GetSink[E] | InputReady

casty.StreamConsumerMsg = GetSource[E] | StreamDemand | StreamCancel | StreamElement[E] | StreamCompleted