流式管道
流式管道是 GodeX 最复杂的执行路径。它连接到提供者的 SSE 流,通过状态机将原始提供者增量映射为结构化的 ResponseStreamEvent 对象,然后通过可组合的转换流链传递,处理错误恢复、输出契约验证、可观测性追踪、日志记录、会话持久化和兼容性诊断。每个转换器只负责单一职责,使管道易于扩展和调试。
概览
| 关注点 | 组件 | 关键文件 |
|---|---|---|
| 管道编排器 | StreamPipeline | stream-pipeline.ts:31 |
| 事件桥接 | ProviderStreamEventBridge | stream-pipeline.ts:88 |
| 错误处理器 | wrapWithErrorHandler | stream-error-handler.ts:34 |
| 追踪转换器 | TraceTransformer | trace-transformer.ts:8 |
| 日志转换器 | ResponseLogTransformer | response-log-transformer.ts:13 |
| 契约验证 | ResponseOutputContractValidationTransformer | response-output-contract-validation-transformer.ts:13 |
| 会话持久化 | ResponseSessionPersistenceTransformer | response-session-persistence-transformer.ts:19 |
| SSE 编码器 | ResponseSseEncoder | response-sse-encoder.ts:7 |
| 管道工具函数 | pipeTransform | stream-utils.ts:6 |
转换链
StreamPipeline.stream (stream-pipeline.ts:37) 构建一个通过 pipeTransform (stream-utils.ts:6) 连接的 TransformStream 线性链:
| 阶段 | 类 | 用途 |
|---|---|---|
| 1 | TraceTransformer("upstream.stream.event.raw") | 记录原始提供者 SSE 事件用于追踪 |
| 2 | ProviderStreamEventBridge | 通过状态机将提供者增量映射为 ResponseStreamEvent |
| 3 | wrapWithErrorHandler | 将上游错误转换为 response.failed 事件 |
| 4 | ResponseOutputContractValidationTransformer | 在终止事件上验证 JSON 输出契约 |
| 5 | TraceTransformer("upstream.stream.event.transformed") | 记录转换后事件用于追踪 |
| 6 | ResponseLogTransformer | 记录包含使用量指标的流完成日志 |
| 7 | ResponseSessionPersistenceTransformer | 持久化响应会话(如果 store !== false) |
| 8 | CompatibilityLogTransformer | 在流结束时记录兼容性诊断 |
提供者流事件桥接
ProviderStreamEventBridge (stream-pipeline.ts:88) 是将原始提供者 SSE 事件转换为结构化响应事件的核心转换器。它:
- 创建一个带有请求工具身份映射的
ResponseStreamStateMachine - 对每个 SSE 事件,使用
ctx.provider.spec.stream.deltas(event.data)提取增量,并通过mapProviderDeltasToEvents以deferTerminal: true传入 - 在流结束时(
flush),通过调用machine.finish(machine.deferredFinishReason)发出延迟的终止事件
deferTerminal: true 标志至关重要:它阻止状态机立即转换到终止阶段,给下游转换器(尤其是输出契约验证器)一个检查并可能重写终止事件的机会。
错误处理器
wrapWithErrorHandler (stream-error-handler.ts:34) 将事件流包装在一个捕获读取错误的 ReadableStream 中。当错误发生时:
- 通过
recordTraceError记录错误 - 如果状态机仍处于
IDLE或IN_PROGRESS阶段,发出machine.start()(如果需要)然后发出machine.fail(error) - 如果
fail()调用本身抛出已知的流生命周期错误(例如已处于终止状态),以 debug 级别记录日志 - 错误处理期间的意外失败以 warn 级别记录
- 干净地关闭流
各转换器详解
TraceTransformer
TraceTransformer<T> (trace-transformer.ts:8) 是一个通用的直通转换器,在追踪启用时(ctx.app.traceEnabled)将每个数据块记录为追踪事件。它跟踪一个序列号用于有序的追踪回放。
ResponseLogTransformer
ResponseLogTransformer (response-log-transformer.ts:13) 统计事件数,在遇到终止事件(response.completed、response.failed、response.incomplete)时记录完成日志。它记录使用量指标和上游延迟。
ResponseOutputContractValidationTransformer
此转换器 (response-output-contract-validation-transformer.ts:13) 在终止事件上验证 JSON 输出契约。如果验证失败,它将事件重写为 response.failed 并抑制后续事件。详见 Output Contracts。
ResponseSessionPersistenceTransformer
ResponseSessionPersistenceTransformer (response-session-persistence-transformer.ts:19) 在遇到终止事件时持久化响应会话。它使用 persistenceAttempted 标志确保只执行一次保存尝试。当 ctx.request.store === false 时 (stream-pipeline.ts:74),此阶段完全跳过。
CompatibilityLogTransformer
CompatibilityLogTransformer (compatibility-log-transformer.ts:7) 是最后一个转换器。它在终止事件到达或 flush 时记录所有累积的兼容性诊断,确保即使流异常关闭也能发出诊断信息。
上游延迟追踪
管道通过 ctx.attributes.set(ATTR_UPSTREAM_LATENCY_MILLIS, ...) 在 stream-pipeline.ts:42 记录上游延迟(连接到提供者流的时间)到 upstreamLatencyMillis。该值稍后包含在 ResponseLogTransformer 的完成日志中。
SSE 编码
转换链之后,ResponseSseEncoder (response-sse-encoder.ts:7) 将每个 ResponseStreamEvent 转换为 SSE 帧(event: type\ndata: JSON\n\n),带有自增的序列号。
交叉引用
- Stream Reconstruction --
ProviderStreamEventBridge内部使用的状态机和增量到事件映射 - Sync Pipeline -- 更简单的非流式对应管道
- Output Contracts -- 转换链中使用的验证逻辑
- Tool Planning -- 生成事件桥接使用的
ToolIdentityMap
参考
- stream-pipeline.ts:31 --
StreamPipeline类 - stream-pipeline.ts:88 --
ProviderStreamEventBridge类 - stream-error-handler.ts:34 --
wrapWithErrorHandler函数 - trace-transformer.ts:8 --
TraceTransformer类 - response-log-transformer.ts:13 --
ResponseLogTransformer类 - response-output-contract-validation-transformer.ts:13 -- 契约验证转换器
- response-session-persistence-transformer.ts:19 -- 会话持久化转换器
- stream-utils.ts:6 --
pipeTransform工具函数 - response-sse-encoder.ts:7 --
ResponseSseEncoder类