流式内部实现
流式内部实现
Section titled “流式内部实现”本文说明 ProxAI 中三个 provider protocol 的 streaming response 处理流程:
协议处理范围
Section titled “协议处理范围”OpenAI Responses — src/provider/openai/responses:output item 生命周期 + sequence_number + tool argument stall 诊断
OpenAI Chat Completions — src/provider/openai/chat_completions:chunk delta + finish_reason + usage
Anthropic Messages — src/provider/anthropic_messages:content_block_start/delta/stop + message_delta 终态
重点:provider response 层如何观察上游 SSE 字节流、维护协议状态、判断流结束/异常,并生成日志与诊断信息。
- 1pipeline/upstream_response.rs判断成功响应是否为 SSE,并分发到 provider streaming 处理。
- 2provider/<protocol>/response/streaming.rs构造协议专属 BodyObserver,返回 outbound streaming body。
- 3upstream/streaming.rs包装 reqwest bytes_stream,记录指标,调用 observer hooks,并执行 idle timeout。
典型事件时间线
Section titled “典型事件时间线”OpenAI Responses 事件
Section titled “OpenAI Responses 事件”- 1
response.createdResponses stream 起点和初始 response metadata。
- 2
response.output_item.added新增 message、reasoning、function_call 或 MCP output item。
- 3
response.output_text.delta / response.function_call_arguments.delta文本或工具调用参数增量。
- 4
response.output_text.done / response.function_call_arguments.donecontent part 或工具调用 arguments 语义完成。
- 5
response.output_item.doneoutput item 完成。
- 6
response.completed完整流的 terminal event。
OpenAI Chat Completions 事件
Section titled “OpenAI Chat Completions 事件”- 1
chat.completion.chunk包含 role/content/tool_calls/finish_reason 的增量 chunk。
- 2
[DONE]终止 sentinel;没有它的 EOF 会被视为 closed/incomplete。
Anthropic Messages 事件
Section titled “Anthropic Messages 事件”- 1
message_startAnthropic message stream 开始。
- 2
content_block_starttext、thinking、tool_use 等 content block 开始。
- 3
content_block_delta当前 block 的增量内容。
- 4
content_block_stopcontent block 完成。
- 5
message_delta携带 stop_reason、stop_sequence、usage 等 message-level delta。
- 6
message_stop完整 Anthropic stream 的 terminal event。
streaming response 的处理大致分为三层:
pipeline/upstream_response.rs 判断上游 2xx response 是否为 SSE -> provider::handle_streaming_success_response(...)
provider/<protocol>/response/streaming.rs 构造协议专属 BodyObserver 调用 upstream::prepare_response_stream(...) 返回保留原始响应语义的 outbound streaming body
upstream/streaming.rs 包装 reqwest bytes_stream 记录通用 stream 指标 调用 BodyObserver 生命周期 hook其中 translation/ 不参与 provider response 观察和 provider-local normalization。provider streaming 处理仍然位于 provider/ 边界内。
通用 streaming carrier
Section titled “通用 streaming carrier”核心通用入口在:
src/upstream/streaming.rs主要类型:
pub(crate) trait BodyObserver: Send + Unpin + 'static { fn on_chunk(&mut self, _chunk: &[u8]) -> BodyAction; fn on_stream_error(&mut self, error: &reqwest::Error); fn poll_pending_action(&mut self, _cx: &mut Context<'_>) -> BodyAction; fn on_stream_finished(&self, head: &UpstreamResponseHead, stats: UpstreamBodyStreamStats);}MonitoredUpstreamBodyStream 负责通用 stream carrier 行为:
- 从
reqwest::Response::bytes_stream()拉取 chunk。 - 记录通用 upstream stream 指标。
- 观察通用 upstream chunk 日志/捕获点。
- 调用协议专属
BodyObserver::on_chunk(...)。 - 处理 read idle timeout。
- 在 EOF、错误、超时、注入错误 chunk 或 drop 时调用
on_stream_finished(...)。
BodyAction 表达协议 observer 是否需要干预输出:
pub(crate) enum BodyAction { Continue, InjectAndClose(Bytes),}大多数协议返回 Continue,只有 OpenAI Responses 的工具参数超时/异常会注入一条 SSE error 并关闭流。
三协议共同结构
Section titled “三协议共同结构”三个协议的 streaming observer 现在基本一致:
streaming.rs - 持有协议 State - 持有 SseEventScanner - on_chunk: 1. scan chunk -> Vec<SseEvent> 2. state.observe_events(&events) 3. 做协议特有生命周期检查 - on_stream_error: 记录 UpstreamStreamError - on_stream_finished: 生成 snapshot 并 emit outcome
state.rs - 持有协议状态 - observe_events(&[SseEvent]) 将事件落入状态 - 生成 summary / error / terminal 状态也就是说:
streaming.rs负责 stream 生命周期和 carrier hook。state.rs负责协议事件到状态的归纳。SseEventScanner只在 observer 中持有,chunk 只扫描一次。- 不再有额外的
tracker.rswrapper。
Translation stream 生命周期
Section titled “Translation stream 生命周期”跨协议 streaming translation 在 translation/streaming.rs 中使用协议无关的四阶段 carrier:
| 阶段 | 含义 |
|---|---|
`Waiting` | 还没有语义 source message/chunk 初始化 translated stream。 |
`Streaming(StreamingPhase<S>)` | source delta 正在进行;pair-private state 和目标协议可表达 output 的 emitted tracker 都在 StreamingPhase<S> 中。 |
`Terminal(T)` | source protocol 已经发出语义终止信号,但 carrier/source stop 还没有完全消费。 |
`Stopped` | translator 已经发出最终的目标协议 stream 输出。 |
Terminal(T) 有意保持泛型,因为不同 source protocol 的终止方式不同:
| Inbound source | Terminal payload | 原因 |
|---|---|---|
Anthropic Messages | StreamingPhase<S> | message_delta 携带 stop/usage 终态语义,但后面仍然必须出现 message_stop。translator 需要保留完整 phase,直到 message_stop 消费它。 |
OpenAI Chat Completions | pair-specific T | finish_reason 已经结束语义 delta。之后的 [DONE]、EOF 或 usage-only chunk 只需要目标协议专属的 terminal summary,例如输出 Anthropic 时的 ChatTerminalState,或输出 Responses 时的 ()。 |
anthropic_messages::streaming 和 openai_chat_completions::streaming 这类协议 wrapper 负责保留 source-specific event ordering 和错误文案。pair translator 仍然负责目标协议是否可表达、output item/block emission,以及 terminal flush 行为。
Stream identity 归属
Section titled “Stream identity 归属”Streaming identity(StreamIdentity)属于协议无关的 inbound lifecycle carrier;source-protocol wrapper 负责决定何时初始化或校验它。
| Inbound source | identity 来源 | 处理原因 |
|---|---|---|
Anthropic Messages | message_start.message.id 和 message_start.message.model | Anthropic 在显式的 message_start 语义边界一次性给出 identity。后续 content_block_*、message_delta、message_stop 不再重复 id/model,所以 Anthropic wrapper 在 message_start 时初始化 lifecycle identity,并依赖 lifecycle ordering 防止重复 start。 |
OpenAI Chat Completions | 每个 chat.completion.chunk 都重复携带 id 和 model | Chat 没有单独的 message_start 事件。第一条语义 chunk 初始化 lifecycle identity;后续 chunk 必须与该 identity 校验一致,避免把不同 upstream response 的片段合并到同一个 translated message/response。 |
InboundStreamLifecycle 把 identity 放在 phase enum 外层,因为 identity 是整条 stream 的 envelope metadata,会稳定横跨 Streaming、Terminal 和 Stopped。pair-local stream state 应只保存目标转换状态或派生 id,不再保存完整 source identity 的副本。
OpenAI Responses observer
Section titled “OpenAI Responses observer”主要文件:
src/provider/openai/responses/response/streaming.rssrc/provider/openai/responses/response/state.rsOpenAI Responses on_chunk 流程
Section titled “OpenAI Responses on_chunk 流程”observer 结构:
struct OpenaiResponsesUpstreamBodyObserver { state: ResponsesUpstreamState, recent_tail: Vec<u8>, saw_terminal_event: bool, stream_error: Option<UpstreamStreamError>, tool_arguments: ToolArgumentStreamState, timeout: Option<Duration>, sse_scanner: SseEventScanner, obs: ObserveContext,}OpenAI Responses chunk 处理步骤
Section titled “OpenAI Responses chunk 处理步骤”chunk -> 维护 recent_tail,最多保留 16 KiB -> SseEventScanner::scan(chunk) -> ResponsesUpstreamState::observe_events(&events) -> 检查 terminal event -> 检查 tool call arguments 是否异常/超时相关ResponsesUpstreamState::observe_events(...) 位于 state_events.rs,负责:
- 解析 OpenAI Responses stream event。
- 记录最新
sequence_number。 - 记录
response.created/response.completed等 snapshot。 - 记录 output item / function call / MCP 等增量观察状态。
- 识别某些 provider 返回的 nested generic error event。
tool argument stall 处理
Section titled “tool argument stall 处理”OpenAI Responses 有额外语义:工具调用参数可能开始后长时间没有完成。ToolArgumentStreamState 负责追踪这类状态。
如果检测到异常或 timeout:
- observer 记录
UpstreamStreamError::Stream。 - 构造一条 OpenAI Responses 风格 SSE error event。
- 返回:
BodyAction::InjectAndClose(error_sse_chunk(...))上游 stream carrier 会把该 error chunk 发给客户端并关闭流。
unfinished tool diagnostics
Section titled “unfinished tool diagnostics”OpenAI Responses observer 维护:
recent_tail: Vec<u8>最多保留最近 16 KiB stream bytes。stream snapshot 中包含该 tail:
ResponsesUpstreamStreamSnapshot { head, metrics, state, recent_tail, metadata,}当 outcome 是 UnfinishedTool 时,diagnostics 会用 snapshot.recent_tail 生成本地诊断 JSON,用于分析最后的 SSE 尾部是否缺失 terminal event 或 arguments done event。
OpenAI Chat Completions observer
Section titled “OpenAI Chat Completions observer”主要文件:
src/provider/openai/chat_completions/response/streaming.rssrc/provider/openai/chat_completions/response/state.rsChat Completions on_chunk 流程
Section titled “Chat Completions on_chunk 流程”observer 结构:
struct ChatUpstreamBodyObserver { state: ChatUpstreamResponseState, sse_scanner: SseEventScanner, stream_error: Option<UpstreamStreamError>, obs: ObserveContext,}Chat Completions chunk 处理步骤
Section titled “Chat Completions chunk 处理步骤”chunk -> SseEventScanner::scan(chunk) -> ChatUpstreamResponseState::observe_events(&events)ChatUpstreamResponseState::observe_events(...) 负责:
- 识别
[DONE]sentinel。 - 解析
CreateChatCompletionStreamResponse。 - 应用增量 observed updates。
- 记录 partial / terminal stream chunk projection。
Chat Completions 的 EOF 完整性判断很简单:
state.stream_done只有看到 [DONE] sentinel 才认为 stream 完整完成。否则 EOF 会被记录为 Closed。
Anthropic Messages observer
Section titled “Anthropic Messages observer”主要文件:
src/provider/anthropic_messages/response/streaming.rssrc/provider/anthropic_messages/response/state.rsAnthropic Messages on_chunk 流程
Section titled “Anthropic Messages on_chunk 流程”observer 结构:
struct AnthropicSseObserver { state: AnthropicResponseState, stream_error: Option<UpstreamStreamError>, sse_scanner: SseEventScanner, obs: ObserveContext,}Anthropic Messages chunk 处理步骤
Section titled “Anthropic Messages chunk 处理步骤”chunk -> SseEventScanner::scan(chunk) -> AnthropicResponseState::observe_events(&events)AnthropicResponseState::observe_events(...) 负责:
- 解析 SSE data JSON。
- 对 Anthropic-compatible provider 的 stream event payload 做 provider-local normalization。
- 解析为
MessageStreamEvent。 - 记录 message id / model / token usage / stop reason / stream_done / summary。
Anthropic stream 完整性判断:
state.stream_done()也就是是否看到 message_stop。
compatibility normalization
Section titled “compatibility normalization”Anthropic-compatible provider 可能返回非严格 Anthropic Messages 形状。相关 normalization 位于:
src/provider/anthropic_messages/response/normalize/streaming response 中,如果 provider compatibility 是 AnthropicCompatible,outbound SSE body 会经过:
normalize::normalize_sse_stream(body_stream)这属于 provider-local response normalization,不进入 translation/。
outcome 语义
Section titled “outcome 语义”provider stream outcome 统一通过:
ProviderStreamOutcomeObserved { snapshot, outcome,}常见 outcome:
ProviderStreamOutcome::CompletedProviderStreamOutcome::ClosedProviderStreamOutcome::Error(...)ProviderStreamOutcome::UnfinishedTool(...)三协议判断方式:
| 协议 | Completed 条件 | 特殊错误 |
|---|---|---|
| OpenAI Responses | 看到 Responses terminal event | tool arguments stall / unfinished tool |
| Chat Completions | 看到 [DONE] | 无协议特有注入 |
| Anthropic Messages | 看到 message_stop | 无协议特有注入 |
Translation pair 内部结构
Section titled “Translation pair 内部结构”每个翻译方向(例如 anthropic_messages -> openai_responses)的 streaming 实现统一拆为 streaming/ 目录下的三个子模块:
streaming/├── mod.rs # translator:驱动 state + 调 output,唯一拥有 lifecycle 和时序知识├── state.rs # 累积状态机:字段 + register/append/stop/snapshot 方法└── output.rs # 纯构造器:接收数据返回事件,不持有 streaming 状态- state 只做累积和状态查询,不关心事件循环。
- output 只做事件/Item 构造,不持有 streaming 状态(不接
&mut State)。 - mod 驱动 state、调 output、管理 lifecycle。
判据:如果一个函数需要 &mut State(advance counter、push items、读累积字段),它是 state 行为,留在 state.rs;如果只接收数据返回事件,它是 output 构造,放 output.rs。
何时需要 types.rs
Section titled “何时需要 types.rs”当 state 和 output 共享同一个内部数据结构时,需要 streaming/types.rs 作为中立归属,避免 state ↔ output 互相依赖。
是否需要取决于累积模型:
| Pair | 累积模型 | finalize 模型 | 是否需要 types.rs |
|---|---|---|---|
ant → responses | 每个 content_block 独立累积 | stop_block 返回 owned 数据,output 消耗它 | 否(StreamBlock 单向流过 output,move 语义) |
chat → responses | 整条流累积 text/tool items | finish_completed_stream 是 state 方法,多次读 state 字段 | 是(StreamTextItem/StreamToolItem 被 state 持有 + output 多次读) |
根本原因:ant 的累积单位是 content_block(明确的开/关生命周期,stop 后 move 走),chat 的累积单位是整条流(流结束才统一 finalize,state 需要一直持有数据供多次读取和 terminal snapshot)。后者的“state 持有 + output 反复读”模式,自然让共享类型需要中立归属。
Terminal state 类型选择
Section titled “Terminal state 类型选择”InboundStreamLifecycle<S, T> 有两个类型参数:S 是 streaming phase 的 state 类型,T 是 terminal phase 的 state 类型。它们可以相同也可以不同,取决于终止事件需要多少数据。
| Pair | S(streaming) | T(terminal) | 为什么 |
|---|---|---|---|
chat → ant | ChatStreamingState | ChatTerminalState(轻量快照) | 终止事件在 translator 当场构造,只需要 finish_reason + refusal。所有 blocks 在 streaming 阶段已 close。 |
chat → responses | StreamingState | StreamingState(同一类型) | 终止事件(*.done × N + response.completed)需要完整累积数据(text/tool items/usage),由 state 方法 finish_completed_stream 构造。 |
根因是终止事件的构造时机和数据需求:
- ant 目标有明确的 block 生命周期(
content_block_stop),每个 block 在 streaming 阶段就关闭并发出,到finish_reason时 streaming state 里只剩少量快照值,可以用轻量 terminal 类型。 - responses 目标没有 block close 的对等物,text delta 和 tool arguments 一直流到
finish_reason才统一发*.done+response.completed。数据必须留在 state 里等到最后,所以 terminal 必须是完整的 streaming state。
这块代码遵循以下原则:
-
stream carrier 和协议语义分离
upstream/streaming.rs只处理通用 bytes stream、timeout、metrics。- provider
streaming.rs处理协议级 SSE 观察。
-
scanner 只在 observer 层
- chunk 只扫描一次。
- state 只接收已经解码好的
SseEvent。
-
state 只做事件归纳
- 不持有 scanner。
- 不持有 HTTP response/body。
- 不负责输出 body 转换。
-
provider compatibility 留在 provider 层
- Anthropic normalization 不进入 translation。
- Responses diagnostics 不伪装成三协议通用事件。
-
诊断数据就近拥有
- OpenAI Responses recent tail 由 Responses observer 维护。
- snapshot 携带诊断所需 tail,diagnostics 只负责写报告。