Skip to content

Streaming Pipeline

The streaming pipeline is GodeX's most complex execution path. It connects to a provider's SSE stream, maps raw provider deltas into structured ResponseStreamEvent objects via the state machine, and then passes them through a composable chain of transform streams that handle error recovery, output contract validation, observability tracing, logging, session persistence, and compatibility diagnostics. Each transformer has a single responsibility, making the pipeline easy to extend and debug.

At a Glance

ConcernComponentKey File
Pipeline orchestratorStreamPipelinestream-pipeline.ts:31
Event bridgeProviderStreamEventBridgestream-pipeline.ts:88
Error handlerwrapWithErrorHandlerstream-error-handler.ts:34
Trace transformerTraceTransformertrace-transformer.ts:8
Log transformerResponseLogTransformerresponse-log-transformer.ts:13
Contract validationResponseOutputContractValidationTransformerresponse-output-contract-validation-transformer.ts:13
Session persistenceResponseSessionPersistenceTransformerresponse-session-persistence-transformer.ts:19
SSE encoderResponseSseEncoderresponse-sse-encoder.ts:7
Pipe utilitypipeTransformstream-utils.ts:6

Transform Chain

StreamPipeline.stream (stream-pipeline.ts:37) constructs a linear chain of TransformStream stages connected via pipeTransform (stream-utils.ts:6):

StageClassPurpose
1TraceTransformer("upstream.stream.event.raw")Record raw provider SSE events for tracing
2ProviderStreamEventBridgeMap provider deltas to ResponseStreamEvent via state machine
3wrapWithErrorHandlerConvert upstream errors to response.failed events
4ResponseOutputContractValidationTransformerValidate JSON output contracts on terminal events
5TraceTransformer("upstream.stream.event.transformed")Record transformed events for tracing
6ResponseLogTransformerLog stream completion with usage metrics
7ResponseSessionPersistenceTransformerPersist response session (if store !== false)
8CompatibilityLogTransformerLog compatibility diagnostics at stream end

Provider Stream Event Bridge

ProviderStreamEventBridge (stream-pipeline.ts:88) is the core transformer that converts raw provider SSE events into structured response events. It:

  1. Creates a ResponseStreamStateMachine with the request's tool identity map
  2. On each SSE event, extracts deltas using ctx.provider.spec.stream.deltas(event.data) and feeds them to mapProviderDeltasToEvents with deferTerminal: true
  3. On stream end (flush), emits the deferred terminal event by calling machine.finish(machine.deferredFinishReason)

The deferTerminal: true flag is critical: it prevents the state machine from transitioning to a terminal phase immediately, giving downstream transformers (especially the output contract validator) a chance to inspect and potentially rewrite the terminal event.

Error Handler

wrapWithErrorHandler (stream-error-handler.ts:34) wraps the event stream in a ReadableStream that catches read errors. When an error occurs:

  1. Records the error via recordTraceError
  2. If the state machine is still in IDLE or IN_PROGRESS, emits machine.start() (if needed) followed by machine.fail(error)
  3. If the fail() call itself throws a known stream lifecycle error (e.g., already terminal), logs at debug level
  4. Unexpected failures during error handling are logged at warn level
  5. Closes the stream cleanly

Individual Transformers

TraceTransformer

TraceTransformer<T> (trace-transformer.ts:8) is a generic pass-through transformer that records each chunk as a trace event when tracing is enabled (ctx.app.traceEnabled). It tracks a sequence number for ordered trace playback.

ResponseLogTransformer

ResponseLogTransformer (response-log-transformer.ts:13) counts events and logs completion when it encounters a terminal event (response.completed, response.failed, response.incomplete). It records usage metrics and upstream latency.

ResponseOutputContractValidationTransformer

This transformer (response-output-contract-validation-transformer.ts:13) validates JSON output contracts on terminal events. If validation fails, it rewrites the event to response.failed and suppresses subsequent events. See Output Contracts for details.

ResponseSessionPersistenceTransformer

ResponseSessionPersistenceTransformer (response-session-persistence-transformer.ts:19) persists the response session when it encounters a terminal event. It uses a persistenceAttempted flag to ensure only one save attempt occurs. This stage is skipped entirely when ctx.request.store === false (stream-pipeline.ts:74).

CompatibilityLogTransformer

CompatibilityLogTransformer (compatibility-log-transformer.ts:7) is the final transformer. It logs all accumulated compatibility diagnostics when the terminal event arrives or on flush, ensuring diagnostics are always emitted even if the stream closes abnormally.

Upstream Latency Tracking

The pipeline records upstream latency (time to connect to the provider stream) in upstreamLatencyMillis via ctx.attributes.set(ATTR_UPSTREAM_LATENCY_MILLIS, ...) at stream-pipeline.ts:42. This value is later included in the ResponseLogTransformer completion log.

SSE Encoding

After the transform chain, ResponseSseEncoder (response-sse-encoder.ts:7) converts each ResponseStreamEvent into an SSE frame (event: type\ndata: JSON\n\n) with auto-incrementing sequence numbers.

Cross-References

References