Streaming Pipeline
The streaming pipeline is GodeX's most complex execution path. It connects to a provider's SSE stream, maps raw provider deltas into structured ResponseStreamEvent objects via the state machine, and then passes them through a composable chain of transform streams that handle error recovery, output contract validation, observability tracing, logging, session persistence, and compatibility diagnostics. Each transformer has a single responsibility, making the pipeline easy to extend and debug.
At a Glance
| Concern | Component | Key File |
|---|---|---|
| Pipeline orchestrator | StreamPipeline | stream-pipeline.ts:31 |
| Event bridge | ProviderStreamEventBridge | stream-pipeline.ts:88 |
| Error handler | wrapWithErrorHandler | stream-error-handler.ts:34 |
| Trace transformer | TraceTransformer | trace-transformer.ts:8 |
| Log transformer | ResponseLogTransformer | response-log-transformer.ts:13 |
| Contract validation | ResponseOutputContractValidationTransformer | response-output-contract-validation-transformer.ts:13 |
| Session persistence | ResponseSessionPersistenceTransformer | response-session-persistence-transformer.ts:19 |
| SSE encoder | ResponseSseEncoder | response-sse-encoder.ts:7 |
| Pipe utility | pipeTransform | stream-utils.ts:6 |
Transform Chain
StreamPipeline.stream (stream-pipeline.ts:37) constructs a linear chain of TransformStream stages connected via pipeTransform (stream-utils.ts:6):
| Stage | Class | Purpose |
|---|---|---|
| 1 | TraceTransformer("upstream.stream.event.raw") | Record raw provider SSE events for tracing |
| 2 | ProviderStreamEventBridge | Map provider deltas to ResponseStreamEvent via state machine |
| 3 | wrapWithErrorHandler | Convert upstream errors to response.failed events |
| 4 | ResponseOutputContractValidationTransformer | Validate JSON output contracts on terminal events |
| 5 | TraceTransformer("upstream.stream.event.transformed") | Record transformed events for tracing |
| 6 | ResponseLogTransformer | Log stream completion with usage metrics |
| 7 | ResponseSessionPersistenceTransformer | Persist response session (if store !== false) |
| 8 | CompatibilityLogTransformer | Log compatibility diagnostics at stream end |
Provider Stream Event Bridge
ProviderStreamEventBridge (stream-pipeline.ts:88) is the core transformer that converts raw provider SSE events into structured response events. It:
- Creates a
ResponseStreamStateMachinewith the request's tool identity map - On each SSE event, extracts deltas using
ctx.provider.spec.stream.deltas(event.data)and feeds them tomapProviderDeltasToEventswithdeferTerminal: true - On stream end (
flush), emits the deferred terminal event by callingmachine.finish(machine.deferredFinishReason)
The deferTerminal: true flag is critical: it prevents the state machine from transitioning to a terminal phase immediately, giving downstream transformers (especially the output contract validator) a chance to inspect and potentially rewrite the terminal event.
Error Handler
wrapWithErrorHandler (stream-error-handler.ts:34) wraps the event stream in a ReadableStream that catches read errors. When an error occurs:
- Records the error via
recordTraceError - If the state machine is still in
IDLEorIN_PROGRESS, emitsmachine.start()(if needed) followed bymachine.fail(error) - If the
fail()call itself throws a known stream lifecycle error (e.g., already terminal), logs at debug level - Unexpected failures during error handling are logged at warn level
- Closes the stream cleanly
Individual Transformers
TraceTransformer
TraceTransformer<T> (trace-transformer.ts:8) is a generic pass-through transformer that records each chunk as a trace event when tracing is enabled (ctx.app.traceEnabled). It tracks a sequence number for ordered trace playback.
ResponseLogTransformer
ResponseLogTransformer (response-log-transformer.ts:13) counts events and logs completion when it encounters a terminal event (response.completed, response.failed, response.incomplete). It records usage metrics and upstream latency.
ResponseOutputContractValidationTransformer
This transformer (response-output-contract-validation-transformer.ts:13) validates JSON output contracts on terminal events. If validation fails, it rewrites the event to response.failed and suppresses subsequent events. See Output Contracts for details.
ResponseSessionPersistenceTransformer
ResponseSessionPersistenceTransformer (response-session-persistence-transformer.ts:19) persists the response session when it encounters a terminal event. It uses a persistenceAttempted flag to ensure only one save attempt occurs. This stage is skipped entirely when ctx.request.store === false (stream-pipeline.ts:74).
CompatibilityLogTransformer
CompatibilityLogTransformer (compatibility-log-transformer.ts:7) is the final transformer. It logs all accumulated compatibility diagnostics when the terminal event arrives or on flush, ensuring diagnostics are always emitted even if the stream closes abnormally.
Upstream Latency Tracking
The pipeline records upstream latency (time to connect to the provider stream) in upstreamLatencyMillis via ctx.attributes.set(ATTR_UPSTREAM_LATENCY_MILLIS, ...) at stream-pipeline.ts:42. This value is later included in the ResponseLogTransformer completion log.
SSE Encoding
After the transform chain, ResponseSseEncoder (response-sse-encoder.ts:7) converts each ResponseStreamEvent into an SSE frame (event: type\ndata: JSON\n\n) with auto-incrementing sequence numbers.
Cross-References
- Stream Reconstruction -- the state machine and delta-to-event mapping used inside
ProviderStreamEventBridge - Sync Pipeline -- the simpler non-streaming counterpart
- Output Contracts -- validation logic used in the transform chain
- Tool Planning -- produces
ToolIdentityMapused by the event bridge
References
- stream-pipeline.ts:31 --
StreamPipelineclass - stream-pipeline.ts:88 --
ProviderStreamEventBridgeclass - stream-error-handler.ts:34 --
wrapWithErrorHandlerfunction - trace-transformer.ts:8 --
TraceTransformerclass - response-log-transformer.ts:13 --
ResponseLogTransformerclass - response-output-contract-validation-transformer.ts:13 -- Contract validation transformer
- response-session-persistence-transformer.ts:19 -- Session persistence transformer
- stream-utils.ts:6 --
pipeTransformutility - response-sse-encoder.ts:7 --
ResponseSseEncoderclass