跳转到内容

流式内部实现

本文说明 ProxAI 中三个 provider protocol 的 streaming response 处理流程:

OpenAI Responsessrc/provider/openai/responses:output item 生命周期 + sequence_number + tool argument stall 诊断

OpenAI Chat Completionssrc/provider/openai/chat_completions:chunk delta + finish_reason + usage

Anthropic Messagessrc/provider/anthropic_messages:content_block_start/delta/stop + message_delta 终态

重点:provider response 层如何观察上游 SSE 字节流、维护协议状态、判断流结束/异常,并生成日志与诊断信息。

  1. 1pipeline/upstream_response.rs判断成功响应是否为 SSE,并分发到 provider streaming 处理。
  2. 2provider/<protocol>/response/streaming.rs构造协议专属 BodyObserver,返回 outbound streaming body。
  3. 3upstream/streaming.rs包装 reqwest bytes_stream,记录指标,调用 observer hooks,并执行 idle timeout。
  1. 1
    response.created

    Responses stream 起点和初始 response metadata。

  2. 2
    response.output_item.added

    新增 message、reasoning、function_call 或 MCP output item。

  3. 3
    response.output_text.delta / response.function_call_arguments.delta

    文本或工具调用参数增量。

  4. 4
    response.output_text.done / response.function_call_arguments.done

    content part 或工具调用 arguments 语义完成。

  5. 5
    response.output_item.done

    output item 完成。

  6. 6
    response.completed

    完整流的 terminal event。

  1. 1
    chat.completion.chunk

    包含 role/content/tool_calls/finish_reason 的增量 chunk。

  2. 2
    [DONE]

    终止 sentinel;没有它的 EOF 会被视为 closed/incomplete。

  1. 1
    message_start

    Anthropic message stream 开始。

  2. 2
    content_block_start

    text、thinking、tool_use 等 content block 开始。

  3. 3
    content_block_delta

    当前 block 的增量内容。

  4. 4
    content_block_stop

    content block 完成。

  5. 5
    message_delta

    携带 stop_reason、stop_sequence、usage 等 message-level delta。

  6. 6
    message_stop

    完整 Anthropic stream 的 terminal event。

streaming response 的处理大致分为三层:

pipeline/upstream_response.rs
判断上游 2xx response 是否为 SSE
-> provider::handle_streaming_success_response(...)
provider/<protocol>/response/streaming.rs
构造协议专属 BodyObserver
调用 upstream::prepare_response_stream(...)
返回保留原始响应语义的 outbound streaming body
upstream/streaming.rs
包装 reqwest bytes_stream
记录通用 stream 指标
调用 BodyObserver 生命周期 hook

其中 translation/ 不参与 provider response 观察和 provider-local normalization。provider streaming 处理仍然位于 provider/ 边界内。

核心通用入口在:

src/upstream/streaming.rs

主要类型:

pub(crate) trait BodyObserver: Send + Unpin + 'static {
fn on_chunk(&mut self, _chunk: &[u8]) -> BodyAction;
fn on_stream_error(&mut self, error: &reqwest::Error);
fn poll_pending_action(&mut self, _cx: &mut Context<'_>) -> BodyAction;
fn on_stream_finished(&self, head: &UpstreamResponseHead, stats: UpstreamBodyStreamStats);
}

MonitoredUpstreamBodyStream 负责通用 stream carrier 行为:

  1. reqwest::Response::bytes_stream() 拉取 chunk。
  2. 记录通用 upstream stream 指标。
  3. 观察通用 upstream chunk 日志/捕获点。
  4. 调用协议专属 BodyObserver::on_chunk(...)
  5. 处理 read idle timeout。
  6. 在 EOF、错误、超时、注入错误 chunk 或 drop 时调用 on_stream_finished(...)

BodyAction 表达协议 observer 是否需要干预输出:

pub(crate) enum BodyAction {
Continue,
InjectAndClose(Bytes),
}

大多数协议返回 Continue,只有 OpenAI Responses 的工具参数超时/异常会注入一条 SSE error 并关闭流。

三个协议的 streaming observer 现在基本一致:

streaming.rs
- 持有协议 State
- 持有 SseEventScanner
- on_chunk:
1. scan chunk -> Vec<SseEvent>
2. state.observe_events(&events)
3. 做协议特有生命周期检查
- on_stream_error: 记录 UpstreamStreamError
- on_stream_finished: 生成 snapshot 并 emit outcome
state.rs
- 持有协议状态
- observe_events(&[SseEvent]) 将事件落入状态
- 生成 summary / error / terminal 状态

也就是说:

  • streaming.rs 负责 stream 生命周期和 carrier hook。
  • state.rs 负责协议事件到状态的归纳。
  • SseEventScanner 只在 observer 中持有,chunk 只扫描一次。
  • 不再有额外的 tracker.rs wrapper。

跨协议 streaming translation 在 translation/streaming.rs 中使用协议无关的四阶段 carrier:

阶段含义
`Waiting`还没有语义 source message/chunk 初始化 translated stream。
`Streaming(StreamingPhase<S>)`source delta 正在进行;pair-private state 和目标协议可表达 output 的 emitted tracker 都在 StreamingPhase<S> 中。
`Terminal(T)`source protocol 已经发出语义终止信号,但 carrier/source stop 还没有完全消费。
`Stopped`translator 已经发出最终的目标协议 stream 输出。

Terminal(T) 有意保持泛型,因为不同 source protocol 的终止方式不同:

Inbound sourceTerminal payload原因
Anthropic MessagesStreamingPhase<S>message_delta 携带 stop/usage 终态语义,但后面仍然必须出现 message_stop。translator 需要保留完整 phase,直到 message_stop 消费它。
OpenAI Chat Completionspair-specific Tfinish_reason 已经结束语义 delta。之后的 [DONE]、EOF 或 usage-only chunk 只需要目标协议专属的 terminal summary,例如输出 Anthropic 时的 ChatTerminalState,或输出 Responses 时的 ()

anthropic_messages::streamingopenai_chat_completions::streaming 这类协议 wrapper 负责保留 source-specific event ordering 和错误文案。pair translator 仍然负责目标协议是否可表达、output item/block emission,以及 terminal flush 行为。

Streaming identity(StreamIdentity)属于协议无关的 inbound lifecycle carrier;source-protocol wrapper 负责决定何时初始化或校验它。

Inbound sourceidentity 来源处理原因
Anthropic Messagesmessage_start.message.idmessage_start.message.modelAnthropic 在显式的 message_start 语义边界一次性给出 identity。后续 content_block_*message_deltamessage_stop 不再重复 id/model,所以 Anthropic wrapper 在 message_start 时初始化 lifecycle identity,并依赖 lifecycle ordering 防止重复 start。
OpenAI Chat Completions每个 chat.completion.chunk 都重复携带 idmodelChat 没有单独的 message_start 事件。第一条语义 chunk 初始化 lifecycle identity;后续 chunk 必须与该 identity 校验一致,避免把不同 upstream response 的片段合并到同一个 translated message/response。

InboundStreamLifecycle 把 identity 放在 phase enum 外层,因为 identity 是整条 stream 的 envelope metadata,会稳定横跨 StreamingTerminalStopped。pair-local stream state 应只保存目标转换状态或派生 id,不再保存完整 source identity 的副本。

主要文件:

src/provider/openai/responses/response/streaming.rs
src/provider/openai/responses/response/state.rs

observer 结构:

struct OpenaiResponsesUpstreamBodyObserver {
state: ResponsesUpstreamState,
recent_tail: Vec<u8>,
saw_terminal_event: bool,
stream_error: Option<UpstreamStreamError>,
tool_arguments: ToolArgumentStreamState,
timeout: Option<Duration>,
sse_scanner: SseEventScanner,
obs: ObserveContext,
}
chunk
-> 维护 recent_tail,最多保留 16 KiB
-> SseEventScanner::scan(chunk)
-> ResponsesUpstreamState::observe_events(&events)
-> 检查 terminal event
-> 检查 tool call arguments 是否异常/超时相关

ResponsesUpstreamState::observe_events(...) 位于 state_events.rs,负责:

  • 解析 OpenAI Responses stream event。
  • 记录最新 sequence_number
  • 记录 response.created / response.completed 等 snapshot。
  • 记录 output item / function call / MCP 等增量观察状态。
  • 识别某些 provider 返回的 nested generic error event。

OpenAI Responses 有额外语义:工具调用参数可能开始后长时间没有完成。ToolArgumentStreamState 负责追踪这类状态。

如果检测到异常或 timeout:

  1. observer 记录 UpstreamStreamError::Stream
  2. 构造一条 OpenAI Responses 风格 SSE error event。
  3. 返回:
BodyAction::InjectAndClose(error_sse_chunk(...))

上游 stream carrier 会把该 error chunk 发给客户端并关闭流。

OpenAI Responses observer 维护:

recent_tail: Vec<u8>

最多保留最近 16 KiB stream bytes。stream snapshot 中包含该 tail:

ResponsesUpstreamStreamSnapshot {
head,
metrics,
state,
recent_tail,
metadata,
}

当 outcome 是 UnfinishedTool 时,diagnostics 会用 snapshot.recent_tail 生成本地诊断 JSON,用于分析最后的 SSE 尾部是否缺失 terminal event 或 arguments done event。

主要文件:

src/provider/openai/chat_completions/response/streaming.rs
src/provider/openai/chat_completions/response/state.rs

observer 结构:

struct ChatUpstreamBodyObserver {
state: ChatUpstreamResponseState,
sse_scanner: SseEventScanner,
stream_error: Option<UpstreamStreamError>,
obs: ObserveContext,
}
chunk
-> SseEventScanner::scan(chunk)
-> ChatUpstreamResponseState::observe_events(&events)

ChatUpstreamResponseState::observe_events(...) 负责:

  • 识别 [DONE] sentinel。
  • 解析 CreateChatCompletionStreamResponse
  • 应用增量 observed updates。
  • 记录 partial / terminal stream chunk projection。

Chat Completions 的 EOF 完整性判断很简单:

state.stream_done

只有看到 [DONE] sentinel 才认为 stream 完整完成。否则 EOF 会被记录为 Closed

主要文件:

src/provider/anthropic_messages/response/streaming.rs
src/provider/anthropic_messages/response/state.rs

observer 结构:

struct AnthropicSseObserver {
state: AnthropicResponseState,
stream_error: Option<UpstreamStreamError>,
sse_scanner: SseEventScanner,
obs: ObserveContext,
}
chunk
-> SseEventScanner::scan(chunk)
-> AnthropicResponseState::observe_events(&events)

AnthropicResponseState::observe_events(...) 负责:

  • 解析 SSE data JSON。
  • 对 Anthropic-compatible provider 的 stream event payload 做 provider-local normalization。
  • 解析为 MessageStreamEvent
  • 记录 message id / model / token usage / stop reason / stream_done / summary。

Anthropic stream 完整性判断:

state.stream_done()

也就是是否看到 message_stop

Anthropic-compatible provider 可能返回非严格 Anthropic Messages 形状。相关 normalization 位于:

src/provider/anthropic_messages/response/normalize/

streaming response 中,如果 provider compatibility 是 AnthropicCompatible,outbound SSE body 会经过:

normalize::normalize_sse_stream(body_stream)

这属于 provider-local response normalization,不进入 translation/

provider stream outcome 统一通过:

ProviderStreamOutcomeObserved {
snapshot,
outcome,
}

常见 outcome:

ProviderStreamOutcome::Completed
ProviderStreamOutcome::Closed
ProviderStreamOutcome::Error(...)
ProviderStreamOutcome::UnfinishedTool(...)

三协议判断方式:

协议Completed 条件特殊错误
OpenAI Responses看到 Responses terminal eventtool arguments stall / unfinished tool
Chat Completions看到 [DONE]无协议特有注入
Anthropic Messages看到 message_stop无协议特有注入

每个翻译方向(例如 anthropic_messages -> openai_responses)的 streaming 实现统一拆为 streaming/ 目录下的三个子模块:

streaming/
├── mod.rs # translator:驱动 state + 调 output,唯一拥有 lifecycle 和时序知识
├── state.rs # 累积状态机:字段 + register/append/stop/snapshot 方法
└── output.rs # 纯构造器:接收数据返回事件,不持有 streaming 状态
  • state 只做累积和状态查询,不关心事件循环。
  • output 只做事件/Item 构造,不持有 streaming 状态(不接 &mut State)。
  • mod 驱动 state、调 output、管理 lifecycle。

判据:如果一个函数需要 &mut State(advance counter、push items、读累积字段),它是 state 行为,留在 state.rs;如果只接收数据返回事件,它是 output 构造,放 output.rs。

当 state 和 output 共享同一个内部数据结构时,需要 streaming/types.rs 作为中立归属,避免 state ↔ output 互相依赖。

是否需要取决于累积模型:

Pair累积模型finalize 模型是否需要 types.rs
ant → responses每个 content_block 独立累积stop_block 返回 owned 数据,output 消耗它否(StreamBlock 单向流过 output,move 语义)
chat → responses整条流累积 text/tool itemsfinish_completed_stream 是 state 方法,多次读 state 字段是(StreamTextItem/StreamToolItem 被 state 持有 + output 多次读)

根本原因:ant 的累积单位是 content_block(明确的开/关生命周期,stop 后 move 走),chat 的累积单位是整条流(流结束才统一 finalize,state 需要一直持有数据供多次读取和 terminal snapshot)。后者的“state 持有 + output 反复读”模式,自然让共享类型需要中立归属。

InboundStreamLifecycle<S, T> 有两个类型参数:S 是 streaming phase 的 state 类型,T 是 terminal phase 的 state 类型。它们可以相同也可以不同,取决于终止事件需要多少数据。

PairS(streaming)T(terminal)为什么
chat → antChatStreamingStateChatTerminalState(轻量快照)终止事件在 translator 当场构造,只需要 finish_reason + refusal。所有 blocks 在 streaming 阶段已 close。
chat → responsesStreamingStateStreamingState(同一类型)终止事件(*.done × N + response.completed)需要完整累积数据(text/tool items/usage),由 state 方法 finish_completed_stream 构造。

根因是终止事件的构造时机和数据需求:

  • ant 目标有明确的 block 生命周期(content_block_stop),每个 block 在 streaming 阶段就关闭并发出,到 finish_reason 时 streaming state 里只剩少量快照值,可以用轻量 terminal 类型。
  • responses 目标没有 block close 的对等物,text delta 和 tool arguments 一直流到 finish_reason 才统一发 *.done + response.completed。数据必须留在 state 里等到最后,所以 terminal 必须是完整的 streaming state。

这块代码遵循以下原则:

  1. stream carrier 和协议语义分离

    • upstream/streaming.rs 只处理通用 bytes stream、timeout、metrics。
    • provider streaming.rs 处理协议级 SSE 观察。
  2. scanner 只在 observer 层

    • chunk 只扫描一次。
    • state 只接收已经解码好的 SseEvent
  3. state 只做事件归纳

    • 不持有 scanner。
    • 不持有 HTTP response/body。
    • 不负责输出 body 转换。
  4. provider compatibility 留在 provider 层

    • Anthropic normalization 不进入 translation。
    • Responses diagnostics 不伪装成三协议通用事件。
  5. 诊断数据就近拥有

    • OpenAI Responses recent tail 由 Responses observer 维护。
    • snapshot 携带诊断所需 tail,diagnostics 只负责写报告。