Skip to content

流式管道

流式管道是 GodeX 最复杂的执行路径。它连接到提供者的 SSE 流,通过状态机将原始提供者增量映射为结构化的 ResponseStreamEvent 对象,然后通过可组合的转换流链传递,处理错误恢复、输出契约验证、可观测性追踪、日志记录、会话持久化和兼容性诊断。每个转换器只负责单一职责,使管道易于扩展和调试。

概览

关注点组件关键文件
管道编排器StreamPipelinestream-pipeline.ts:31
事件桥接ProviderStreamEventBridgestream-pipeline.ts:88
错误处理器wrapWithErrorHandlerstream-error-handler.ts:34
追踪转换器TraceTransformertrace-transformer.ts:8
日志转换器ResponseLogTransformerresponse-log-transformer.ts:13
契约验证ResponseOutputContractValidationTransformerresponse-output-contract-validation-transformer.ts:13
会话持久化ResponseSessionPersistenceTransformerresponse-session-persistence-transformer.ts:19
SSE 编码器ResponseSseEncoderresponse-sse-encoder.ts:7
管道工具函数pipeTransformstream-utils.ts:6

转换链

StreamPipeline.stream (stream-pipeline.ts:37) 构建一个通过 pipeTransform (stream-utils.ts:6) 连接的 TransformStream 线性链:

阶段用途
1TraceTransformer("upstream.stream.event.raw")记录原始提供者 SSE 事件用于追踪
2ProviderStreamEventBridge通过状态机将提供者增量映射为 ResponseStreamEvent
3wrapWithErrorHandler将上游错误转换为 response.failed 事件
4ResponseOutputContractValidationTransformer在终止事件上验证 JSON 输出契约
5TraceTransformer("upstream.stream.event.transformed")记录转换后事件用于追踪
6ResponseLogTransformer记录包含使用量指标的流完成日志
7ResponseSessionPersistenceTransformer持久化响应会话(如果 store !== false
8CompatibilityLogTransformer在流结束时记录兼容性诊断

提供者流事件桥接

ProviderStreamEventBridge (stream-pipeline.ts:88) 是将原始提供者 SSE 事件转换为结构化响应事件的核心转换器。它:

  1. 创建一个带有请求工具身份映射的 ResponseStreamStateMachine
  2. 对每个 SSE 事件,使用 ctx.provider.spec.stream.deltas(event.data) 提取增量,并通过 mapProviderDeltasToEventsdeferTerminal: true 传入
  3. 在流结束时(flush),通过调用 machine.finish(machine.deferredFinishReason) 发出延迟的终止事件

deferTerminal: true 标志至关重要:它阻止状态机立即转换到终止阶段,给下游转换器(尤其是输出契约验证器)一个检查并可能重写终止事件的机会。

错误处理器

wrapWithErrorHandler (stream-error-handler.ts:34) 将事件流包装在一个捕获读取错误的 ReadableStream 中。当错误发生时:

  1. 通过 recordTraceError 记录错误
  2. 如果状态机仍处于 IDLEIN_PROGRESS 阶段,发出 machine.start()(如果需要)然后发出 machine.fail(error)
  3. 如果 fail() 调用本身抛出已知的流生命周期错误(例如已处于终止状态),以 debug 级别记录日志
  4. 错误处理期间的意外失败以 warn 级别记录
  5. 干净地关闭流

各转换器详解

TraceTransformer

TraceTransformer<T> (trace-transformer.ts:8) 是一个通用的直通转换器,在追踪启用时(ctx.app.traceEnabled)将每个数据块记录为追踪事件。它跟踪一个序列号用于有序的追踪回放。

ResponseLogTransformer

ResponseLogTransformer (response-log-transformer.ts:13) 统计事件数,在遇到终止事件(response.completedresponse.failedresponse.incomplete)时记录完成日志。它记录使用量指标和上游延迟。

ResponseOutputContractValidationTransformer

此转换器 (response-output-contract-validation-transformer.ts:13) 在终止事件上验证 JSON 输出契约。如果验证失败,它将事件重写为 response.failed 并抑制后续事件。详见 Output Contracts

ResponseSessionPersistenceTransformer

ResponseSessionPersistenceTransformer (response-session-persistence-transformer.ts:19) 在遇到终止事件时持久化响应会话。它使用 persistenceAttempted 标志确保只执行一次保存尝试。当 ctx.request.store === false 时 (stream-pipeline.ts:74),此阶段完全跳过。

CompatibilityLogTransformer

CompatibilityLogTransformer (compatibility-log-transformer.ts:7) 是最后一个转换器。它在终止事件到达或 flush 时记录所有累积的兼容性诊断,确保即使流异常关闭也能发出诊断信息。

上游延迟追踪

管道通过 ctx.attributes.set(ATTR_UPSTREAM_LATENCY_MILLIS, ...)stream-pipeline.ts:42 记录上游延迟(连接到提供者流的时间)到 upstreamLatencyMillis。该值稍后包含在 ResponseLogTransformer 的完成日志中。

SSE 编码

转换链之后,ResponseSseEncoder (response-sse-encoder.ts:7) 将每个 ResponseStreamEvent 转换为 SSE 帧(event: type\ndata: JSON\n\n),带有自增的序列号。

交叉引用

参考