Streaming Internals
ProxAI treats streaming as a byte-carrier problem plus protocol-specific event observation. The upstream SSE bytes are preserved, while provider observers scan events, maintain state, and emit compact diagnostics.
Layering
1. `pipeline/upstream_response.rs`: Detects successful SSE responses and dispatches them to provider streaming handling.
2. `provider/<protocol>/response/streaming.rs`: Constructs the protocol-specific `BodyObserver` and returns an outbound streaming body.
3. `upstream/streaming.rs`: Wraps reqwest's `bytes_stream`, records metrics, invokes observer hooks, and enforces the idle timeout.
translation/ does not perform provider-local stream observation. Provider streaming observation stays inside provider/; cross-protocol stream translation stays in translation/streaming.
BodyObserver lifecycle
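```rust
use std::task::Context;

// `BodyAction`, `UpstreamResponseHead`, and `UpstreamBodyStreamStats` are crate-local types.
pub(crate) trait BodyObserver: Send + Unpin + 'static {
    /// Called for every upstream chunk; the returned action decides whether streaming continues.
    fn on_chunk(&mut self, _chunk: &[u8]) -> BodyAction;
    /// Called when the upstream byte stream yields a transport error.
    fn on_stream_error(&mut self, error: &reqwest::Error);
    /// Polled by the carrier for any action the observer has pending.
    fn poll_pending_action(&mut self, _cx: &mut Context<'_>) -> BodyAction;
    /// Called when the stream ends (EOF, error, idle timeout, or drop) to emit the outcome snapshot.
    fn on_stream_finished(&self, head: &UpstreamResponseHead, stats: UpstreamBodyStreamStats);
}
```

1. Pull bytes: Read the next chunk from the reqwest response's `bytes_stream()`.
2. Record carrier metrics: Update the common upstream stream byte/chunk stats.
3. Scan SSE events: The provider observer runs `SseEventScanner` once per chunk.
4. Update protocol state: Feed parsed events into the provider state machine.
5. Maybe intervene: Most chunks continue unchanged; rare semantic failures inject an SSE error and close the stream.
6. Finish snapshot: On EOF, error, timeout, or drop, emit a compact stream outcome snapshot.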
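To make the six steps concrete, here is a minimal synchronous sketch of a carrier loop. It is not ProxAI's implementation: the real carrier wraps reqwest's async `bytes_stream()`, polls `poll_pending_action`, and enforces an idle timeout, none of which is modelled here. `Observer`, `Stats`, and `drive` are hypothetical names, a plain iterator of `Result<Vec<u8>, String>` stands in for the upstream stream, and `Vec<u8>` replaces `Bytes`. The sketch also assumes the chunk that triggers an intervention is forwarded before the injected bytes.

```rust
/// Simplified stand-in for BodyAction (Vec<u8> instead of Bytes).
#[derive(Debug, PartialEq)]
pub enum BodyAction {
    Continue,
    InjectAndClose(Vec<u8>),
}

/// Hypothetical carrier metrics (step 2).
#[derive(Debug, Default, PartialEq)]
pub struct Stats {
    pub chunks: usize,
    pub bytes: usize,
}

/// Hypothetical, synchronous reduction of the observer hooks.
pub trait Observer {
    fn on_chunk(&mut self, chunk: &[u8]) -> BodyAction;
    fn on_stream_error(&mut self, error: &str);
    fn on_stream_finished(&self, stats: &Stats);
}

/// Drives one upstream body through an observer and returns the bytes
/// forwarded to the client. Chunks pass through unchanged unless the
/// observer asks to inject a final payload and close.
pub fn drive<O: Observer>(
    upstream: impl IntoIterator<Item = Result<Vec<u8>, String>>,
    observer: &mut O,
) -> (Vec<u8>, Stats) {
    let mut out = Vec::new();
    let mut stats = Stats::default();
    for item in upstream {
        match item {
            // Step 1: pull the next chunk.
            Ok(chunk) => {
                // Step 2: record carrier metrics before the provider sees it.
                stats.chunks += 1;
                stats.bytes += chunk.len();
                // Steps 3-5: the observer scans events and may intervene.
                let action = observer.on_chunk(&chunk);
                out.extend_from_slice(&chunk);
                if let BodyAction::InjectAndClose(extra) = action {
                    out.extend_from_slice(&extra);
                    break;
                }
            }
            Err(e) => {
                observer.on_stream_error(&e);
                break;
            }
        }
    }
    // Step 6: every loop exit (EOF, error, injection) reaches the snapshot.
    observer.on_stream_finished(&stats);
    (out, stats)
}
```

Putting `on_stream_finished` after the loop, rather than inside each exit branch, is what guarantees a snapshot on every path the sketch models.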
Common provider observer shape
The three provider protocols now follow the same structure:
| Component | Responsibility |
|---|---|
| `streaming.rs` | Carrier hook implementation and lifecycle checks |
| `state.rs` | Protocol event state and summary/outcome projection |
| `SseEventScanner` | Chunk-to-event scanning, held by the observer |
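The scanner's job is to turn arbitrary chunk boundaries into whole events. The sketch below shows the idea under simplifying assumptions: `SseScanner` and `SseEvent` are hypothetical names, only `\n` line endings are handled (the SSE format also allows `\r\n` and `\r`), and only the `event:` and `data:` fields are kept.

```rust
/// One parsed SSE event: optional `event:` name plus `data:` lines joined by '\n'.
#[derive(Debug, Clone, PartialEq)]
pub struct SseEvent {
    pub event: Option<String>,
    pub data: String,
}

/// Hypothetical chunk-to-event scanner. Chunk boundaries are arbitrary, so
/// bytes are buffered until a blank line ("\n\n") terminates an event.
#[derive(Default)]
pub struct SseScanner {
    buf: Vec<u8>,
}

/// Index of the first blank-line terminator in `buf`, if any.
fn event_end(buf: &[u8]) -> Option<usize> {
    buf.windows(2).position(|w| w == &b"\n\n"[..])
}

impl SseScanner {
    /// Appends one chunk and returns every event it completed, in order.
    pub fn scan(&mut self, chunk: &[u8]) -> Vec<SseEvent> {
        self.buf.extend_from_slice(chunk);
        let mut events = Vec::new();
        while let Some(pos) = event_end(&self.buf) {
            // Remove the event and its terminator from the buffer.
            let raw: Vec<u8> = self.buf.drain(..pos + 2).collect();
            let text = String::from_utf8_lossy(&raw[..pos]);
            let mut event = None;
            let mut data = Vec::new();
            for line in text.lines() {
                // Per SSE, one space after the colon is not part of the value.
                if let Some(v) = line.strip_prefix("event:") {
                    event = Some(v.strip_prefix(' ').unwrap_or(v).to_string());
                } else if let Some(v) = line.strip_prefix("data:") {
                    data.push(v.strip_prefix(' ').unwrap_or(v).to_string());
                }
                // Other lines (comments such as ": ping", id:, retry:) are ignored.
            }
            if event.is_some() || !data.is_empty() {
                events.push(SseEvent { event, data: data.join("\n") });
            }
        }
        events
    }
}
```

An event split across two chunks yields nothing on the first call and is returned whole on the second, which is why the scanner must live in the observer rather than being recreated per chunk.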
Protocol event timelines
OpenAI Responses
1. `response.created`: Start of a Responses stream and initial response metadata.
2. `response.output_item.added`: A new output item such as message, reasoning, function_call, or MCP item.
3. `response.output_text.delta` / `response.function_call_arguments.delta`: Incremental text or tool-call argument bytes.
4. `response.output_text.done` / `response.function_call_arguments.done`: Semantic completion of a content part or tool-call arguments.
5. `response.output_item.done`: The output item is complete.
6. `response.completed`: Terminal event for a complete stream.
OpenAI Responses has extra tool-call semantics. If tool-call arguments start but never finish, ProxAI can inject a Responses-style SSE error event and close the stream.
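One way to detect the "started but never finished" case is to track tool-call item ids between their first `response.function_call_arguments.delta` and the matching `.done`. This is a hypothetical sketch: `ToolArgsTracker` is an illustrative name, the item id is assumed to be extracted from the event payload by the caller, and the injected frame's JSON shape is illustrative rather than ProxAI's actual error wording.

```rust
use std::collections::BTreeSet;

/// Hypothetical tracker for OpenAI Responses tool-call arguments.
/// An item id enters the set on an arguments delta and leaves it on `.done`.
#[derive(Default)]
pub struct ToolArgsTracker {
    open: BTreeSet<String>, // BTreeSet keeps iteration order deterministic
}

impl ToolArgsTracker {
    pub fn on_event(&mut self, event_type: &str, item_id: &str) {
        match event_type {
            "response.function_call_arguments.delta" => {
                self.open.insert(item_id.to_string());
            }
            "response.function_call_arguments.done" => {
                self.open.remove(item_id);
            }
            _ => {}
        }
    }

    /// If any arguments started but never finished, returns an SSE error
    /// frame to inject before closing. The payload shape is illustrative.
    pub fn unfinished_error(&self) -> Option<Vec<u8>> {
        let first = self.open.iter().next()?;
        let frame = format!(
            "event: error\ndata: {{\"type\":\"error\",\"message\":\"tool call {first} arguments did not complete\"}}\n\n"
        );
        Some(frame.into_bytes())
    }
}
```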
OpenAI Chat Completions
1. `chat.completion.chunk`: Each `data:` payload is a delta chunk containing role/content/tool_calls/finish_reason updates.
2. `[DONE]`: Terminal sentinel (`data: [DONE]`). EOF without this sentinel is treated as a closed/incomplete stream.
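The Chat rule can be sketched as a tiny state machine that records whether the `[DONE]` sentinel arrived before EOF. `ChatState`, `ChatEvent`, `ChatOutcome`, and `is_done_sentinel` are hypothetical names; the caller is assumed to have already parsed each `data:` payload and extracted `finish_reason`.

```rust
/// Returns true when an SSE `data:` payload is the Chat terminal sentinel.
pub fn is_done_sentinel(data: &str) -> bool {
    data.trim() == "[DONE]"
}

/// Pre-parsed Chat stream input (hypothetical).
pub enum ChatEvent<'a> {
    Chunk { finish_reason: Option<&'a str> },
    Done,
}

/// How the stream ended, as seen at EOF.
#[derive(Debug, PartialEq)]
pub enum ChatOutcome {
    Done,
    /// EOF without [DONE]; finish_reason alone does not make it complete.
    Incomplete { saw_finish_reason: bool },
}

#[derive(Default)]
pub struct ChatState {
    done: bool,
    saw_finish_reason: bool,
}

impl ChatState {
    pub fn on_event(&mut self, ev: ChatEvent<'_>) {
        match ev {
            ChatEvent::Chunk { finish_reason } => {
                if finish_reason.is_some() {
                    self.saw_finish_reason = true;
                }
            }
            ChatEvent::Done => self.done = true,
        }
    }

    pub fn outcome_at_eof(&self) -> ChatOutcome {
        if self.done {
            ChatOutcome::Done
        } else {
            ChatOutcome::Incomplete { saw_finish_reason: self.saw_finish_reason }
        }
    }
}
```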
Anthropic Messages
1. `message_start`: Starts an Anthropic message stream.
2. `content_block_start`: Starts a text, thinking, tool_use, or other content block.
3. `content_block_delta`: Incremental content for the current block.
4. `content_block_stop`: Completes a content block.
5. `message_delta`: Carries stop_reason, stop_sequence, usage, and other message-level deltas.
6. `message_stop`: Terminal event for a complete Anthropic stream.
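The Anthropic timeline implies an ordering that an observer can check event by event. The sketch below is a hypothetical `advance` function over event type names only; it assumes content blocks are sequential (one open at a time), lets `ping` through anywhere, and does not model block indices or `error` events.

```rust
/// Hypothetical coarse phase of an Anthropic Messages stream.
#[derive(Debug, Default, PartialEq)]
pub enum AnthropicPhase {
    #[default]
    BeforeStart,
    InMessage { open_block: bool },
    Stopped,
}

/// Applies one event type to the phase, rejecting out-of-order events.
pub fn advance(phase: AnthropicPhase, event: &str) -> Result<AnthropicPhase, String> {
    use AnthropicPhase as P;
    match (phase, event) {
        (P::BeforeStart, "message_start") => Ok(P::InMessage { open_block: false }),
        (P::InMessage { open_block: false }, "content_block_start") => Ok(P::InMessage { open_block: true }),
        (P::InMessage { open_block: true }, "content_block_delta") => Ok(P::InMessage { open_block: true }),
        (P::InMessage { open_block: true }, "content_block_stop") => Ok(P::InMessage { open_block: false }),
        // message_delta and message_stop are only valid with no block open.
        (P::InMessage { open_block: false }, "message_delta") => Ok(P::InMessage { open_block: false }),
        (P::InMessage { open_block: false }, "message_stop") => Ok(P::Stopped),
        // ping carries no state in this sketch.
        (p, "ping") => Ok(p),
        (p, e) => Err(format!("unexpected {e} in {p:?}")),
    }
}
```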
Translation stream lifecycle
Cross-protocol streaming translation uses a protocol-neutral four-phase carrier in `translation/streaming.rs`:
| Phase | Meaning |
|---|---|
| `Waiting` | No semantic source message/chunk has initialized the translated stream yet. |
| `Streaming(StreamingPhase<S>)` | Source deltas are active; pair-private state and target-representable output tracking live in `StreamingPhase<S>`. |
| `Terminal(T)` | The source protocol has emitted its semantic terminal signal, but the carrier/source stop has not been fully consumed. |
| `Stopped` | The translator has emitted its final target stream output. |
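The four phases map naturally onto a generic enum whose transitions consume the previous phase. This is a simplified sketch, not the code in `translation/streaming.rs`: `Phase` and its methods are hypothetical names, and `Streaming` holds a bare `S` instead of `StreamingPhase<S>`.

```rust
/// Hypothetical reduction of the four-phase translation carrier.
#[derive(Debug, PartialEq)]
pub enum Phase<S, T> {
    Waiting,
    Streaming(S),
    Terminal(T),
    Stopped,
}

impl<S, T> Phase<S, T> {
    /// First semantic source event: Waiting -> Streaming.
    pub fn start(self, init: S) -> Result<Self, &'static str> {
        match self {
            Phase::Waiting => Ok(Phase::Streaming(init)),
            _ => Err("stream already started"),
        }
    }

    /// Source terminal signal: Streaming -> Terminal, projecting S into T.
    pub fn terminate(self, project: impl FnOnce(S) -> T) -> Result<Self, &'static str> {
        match self {
            Phase::Streaming(s) => Ok(Phase::Terminal(project(s))),
            _ => Err("terminal signal outside Streaming"),
        }
    }

    /// Carrier/source stop consumed: Terminal -> Stopped, handing T to the final flush.
    pub fn stop(self) -> Result<(Self, T), &'static str> {
        match self {
            Phase::Terminal(t) => Ok((Phase::Stopped, t)),
            _ => Err("stop before terminal"),
        }
    }
}
```

With `terminate(|s| s)`, `T` is simply the full streaming state, which matches how an Anthropic Messages source keeps the whole phase until `message_stop`; a Chat source would instead project into a smaller pair-specific `T`.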
Terminal(T) is intentionally generic because source protocols terminate differently:
| Inbound source | Terminal payload | Reason |
|---|---|---|
| Anthropic Messages | `StreamingPhase<S>` | message_delta carries terminal stop/usage semantics, but message_stop is still required. The translator keeps the full phase until message_stop consumes it. |
| OpenAI Chat Completions | pair-specific `T` | finish_reason ends semantic deltas. Later [DONE], EOF, or usage-only chunks need only a target-specific terminal summary, such as ChatTerminalState for Anthropic output or `()` for Responses output. |
Protocol wrappers such as anthropic_messages::streaming and openai_chat_completions::streaming keep source-specific event ordering and error wording. Pair translators remain responsible for target representability checks, output item/block emission, and terminal flush behavior.
Stream identity ownership
Streaming identity (`StreamIdentity`) belongs to the protocol-neutral inbound lifecycle carrier, while source-protocol wrappers decide when to initialize or validate it.
| Inbound source | Where identity comes from | Why it is handled this way |
|---|---|---|
| Anthropic Messages | message_start.message.id and message_start.message.model | Anthropic emits identity once at the explicit message_start semantic boundary. Later content_block_*, message_delta, and message_stop events do not repeat id/model, so the Anthropic wrapper initializes lifecycle identity at message_start and then relies on lifecycle ordering to prevent duplicate starts. |
| OpenAI Chat Completions | Every chat.completion.chunk repeats id and model | Chat has no separate message_start event. The first semantic chunk initializes lifecycle identity, and later chunks are checked against it so pieces from different upstream responses are never merged into one translated message/response. |
InboundStreamLifecycle stores identity outside the phase enum because identity is stream-envelope metadata that remains stable across Streaming, Terminal, and Stopped. Pair-local stream state should keep only target-conversion state or derived identifiers, not another copy of the full source identity.
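The two identity policies can be sketched side by side. `StreamIdentity` here is a hypothetical two-field stand-in, `Lifecycle` omits the phase field entirely, and `init` / `init_or_check` are illustrative method names rather than ProxAI's API.

```rust
/// Hypothetical stand-in for StreamIdentity.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamIdentity {
    pub id: String,
    pub model: String,
}

/// Hypothetical lifecycle envelope. Identity sits beside the phase (omitted
/// here), not inside it, so it survives every phase transition.
#[derive(Default)]
pub struct Lifecycle {
    identity: Option<StreamIdentity>,
}

impl Lifecycle {
    /// Anthropic-style: initialize exactly once, at message_start.
    pub fn init(&mut self, ident: StreamIdentity) -> Result<(), String> {
        if self.identity.is_some() {
            return Err("duplicate message_start".to_string());
        }
        self.identity = Some(ident);
        Ok(())
    }

    /// Chat-style: the first chunk initializes; later chunks must match.
    pub fn init_or_check(&mut self, ident: StreamIdentity) -> Result<(), String> {
        if let Some(cur) = &self.identity {
            if *cur != ident {
                return Err(format!("chunk id {} does not match stream id {}", ident.id, cur.id));
            }
            return Ok(());
        }
        self.identity = Some(ident);
        Ok(())
    }
}
```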
BodyAction
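```rust
use bytes::Bytes;

pub(crate) enum BodyAction {
    /// Forward the chunk unchanged and keep streaming.
    Continue,
    /// Send these bytes (for example an SSE error event) to the client, then close.
    InjectAndClose(Bytes),
}
```

Most observers return `Continue`. `InjectAndClose` is reserved for cases where ProxAI can produce a better client-facing stream failure than silently hanging or closing without context.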
Translation pair internals
Each translation direction's streaming implementation (e.g. anthropic_messages -> openai_responses) is split into three submodules under a `streaming/` directory:

```
streaming/
├── mod.rs     # translator: drives state + calls output, owns lifecycle and timing
├── state.rs   # accumulation state machine: fields + register/append/stop/snapshot
└── output.rs  # pure constructors: take data, return events; hold no streaming state
```

Responsibility boundaries
- `state` only accumulates and queries state; it does not know about the event loop.
- `output` only constructs events/items; it holds no streaming state (no `&mut State`).
- `mod` drives `state`, calls `output`, and manages the lifecycle.
Rule of thumb: if a function needs access to `State` (`&mut State` to advance a counter or push items, or `&State` to read accumulated fields), it is state behavior and stays in `state.rs`; if it only takes data and returns an event, it is output construction and belongs in `output.rs`.
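A minimal sketch of the three-way split, with all three modules collapsed into one block for brevity. `State`, `OutEvent`, `Translator`, and their methods are hypothetical; the point is that only `State` methods touch accumulated data, the output functions are pure, and the translator sequences the two.

```rust
// --- state.rs (hypothetical): accumulation; methods need &mut self or &self ---
#[derive(Default)]
pub struct State {
    next_index: usize,
    text: String,
}

impl State {
    /// Advances the block counter: state behavior.
    pub fn open_block(&mut self) -> usize {
        let i = self.next_index;
        self.next_index += 1;
        i
    }
    pub fn append(&mut self, delta: &str) {
        self.text.push_str(delta);
    }
    pub fn text(&self) -> &str {
        &self.text
    }
}

// --- output.rs (hypothetical): pure constructors, no State access ---
#[derive(Debug, PartialEq)]
pub enum OutEvent {
    BlockStart { index: usize },
    TextDelta { index: usize, text: String },
}

pub fn block_start(index: usize) -> OutEvent {
    OutEvent::BlockStart { index }
}

pub fn text_delta(index: usize, text: &str) -> OutEvent {
    OutEvent::TextDelta { index, text: text.to_string() }
}

// --- mod.rs (hypothetical): drives state, calls output, owns lifecycle ---
#[derive(Default)]
pub struct Translator {
    state: State,
    current: Option<usize>,
}

impl Translator {
    pub fn on_text_delta(&mut self, delta: &str) -> Vec<OutEvent> {
        let mut out = Vec::new();
        // Open a block on the first delta; reuse it afterwards.
        let index = match self.current {
            Some(i) => i,
            None => {
                let i = self.state.open_block();
                self.current = Some(i);
                out.push(block_start(i));
                i
            }
        };
        self.state.append(delta);
        out.push(text_delta(index, delta));
        out
    }

    pub fn accumulated(&self) -> &str {
        self.state.text()
    }
}
```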
When types.rs is needed
When state and output share the same internal data structure, `streaming/types.rs` is the neutral home that avoids a state ↔ output circular dependency.
Whether it is needed depends on the accumulation model:
| Pair | Accumulation model | Finalize model | Needs types.rs? |
|---|---|---|---|
| ant → responses | each content_block accumulates independently | stop_block returns owned data; output consumes it | No (StreamBlock flows through output once, move semantics) |
| chat → responses | whole stream accumulates text/tool items | finish_completed_stream is a state method that reads state fields multiple times | Yes (StreamTextItem/StreamToolItem are held by state and read repeatedly by output) |
Root cause: Anthropic’s accumulation unit is the content_block (explicit open/close lifecycle, moved away on stop), while Chat’s is the whole stream (finalized all at once at stream end, so state must retain data for repeated reads and terminal snapshot). The latter “state holds + output reads repeatedly” pattern naturally requires a neutral home for shared types.
Terminal state type choice
`InboundStreamLifecycle<S, T>` has two type parameters: `S` is the streaming-phase state type and `T` is the terminal-phase state type. They may be the same type or different types, depending on how much data the terminal events need.
| Pair | S (streaming) | T (terminal) | Why |
|---|---|---|---|
| chat → ant | ChatStreamingState | ChatTerminalState (lightweight snapshot) | Terminal events are constructed on the spot by the translator; only finish_reason + refusal are needed. All blocks were closed during streaming. |
| chat → responses | StreamingState | StreamingState (same type) | Terminal events (`*.done` × N + `response.completed`) need the full accumulated data (text/tool items/usage), constructed by the state method finish_completed_stream. |
Root cause is the timing and data requirements of terminal event construction:
- Anthropic target has an explicit block lifecycle (`content_block_stop`); each block is closed and emitted during streaming, so by `finish_reason` only a few snapshot values remain, and a lightweight terminal type suffices.
- Responses target has no block-close equivalent; text deltas and tool arguments stream until `finish_reason`, when `*.done` + `response.completed` are emitted all at once. Data must stay in state until the end, so the terminal type must be the full streaming state.
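The two choices of `T` can be sketched as follows. All types here are simplified, hypothetical stand-ins: `ChatTerminalState` keeps only `finish_reason` and `refusal`, as the table describes, and `finish_events` is an illustrative substitute for `finish_completed_stream` that returns event type names instead of full events.

```rust
// chat -> ant: S and T differ.
pub struct ChatStreamingState {
    pub emitted_blocks: usize, // streaming-only bookkeeping (others elided)
    pub refusal: bool,
}

#[derive(Debug, PartialEq)]
pub struct ChatTerminalState {
    pub finish_reason: String,
    pub refusal: bool,
}

impl ChatStreamingState {
    /// Blocks were already closed while streaming, so the terminal phase
    /// keeps only what message_delta / message_stop still need.
    pub fn into_terminal(self, finish_reason: &str) -> ChatTerminalState {
        ChatTerminalState { finish_reason: finish_reason.to_string(), refusal: self.refusal }
    }
}

// chat -> responses: T is the same type as S.
pub struct StreamingState {
    pub text: String,
    pub tool_items: Vec<(String, String)>, // (call id, accumulated arguments)
}

impl StreamingState {
    /// Every *.done event is produced at the end by reading accumulated fields.
    pub fn finish_events(&self) -> Vec<String> {
        let mut events = Vec::new();
        if !self.text.is_empty() {
            events.push("response.output_text.done".to_string());
        }
        for _ in &self.tool_items {
            events.push("response.function_call_arguments.done".to_string());
        }
        events.push("response.completed".to_string());
        events
    }
}
```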