Stream Transforms
When GodeX proxies a streaming response from an upstream LLM provider, the raw SSE byte stream must pass through several processing stages before it reaches the client. Each stage is implemented as a composable TransformStream transformer in src/responses/stream-transforms/. This design keeps every concern isolated and testable -- tracing records events without touching logging, contract validation can fail the response without affecting persistence, and SSE encoding happens at the edge without any upstream awareness. Understanding the transform chain is essential for debugging streaming behavior, adding new cross-cutting concerns, or extending the response pipeline.
At a Glance
| Transformer | Input Type | Output Type | Purpose |
|---|---|---|---|
TraceTransformer | T | T | Records raw/transformed events to the trace subsystem |
ProviderStreamEventBridge | JsonServerSentEvent | ResponseStreamEvent | Maps provider deltas to OpenAI-compatible events |
wrapWithErrorHandler | ResponseStreamEvent | ResponseStreamEvent | Converts stream errors into response.failed events |
ResponseOutputContractValidationTransformer | ResponseStreamEvent | ResponseStreamEvent | Validates output contract on terminal events |
ResponseLogTransformer | ResponseStreamEvent | ResponseStreamEvent | Logs stream completion with timing and usage |
ResponseSessionPersistenceTransformer | ResponseStreamEvent | ResponseStreamEvent | Persists session on stream completion |
CompatibilityLogTransformer | ResponseStreamEvent | ResponseStreamEvent | Logs compatibility diagnostics at stream end |
ResponseSseEncoder | ResponseStreamEvent | Uint8Array | Encodes events to SSE text lines |
Transform Chain Order
The StreamPipeline.stream() method in src/responses/stream-pipeline.ts assembles the full chain. Every stage is connected via the pipeTransform helper from src/responses/stream-transforms/stream-utils.ts:6, which wraps each transformer in a standard TransformStream.
The pipeline is assembled at src/responses/stream-pipeline.ts:37 and each pipeTransform call chains the previous ReadableStream into the next transformer.
The pipeTransform Helper
All chaining uses a single utility function:
export function pipeTransform<I, O>(
stream: ReadableStream<I>,
transformer: Transformer<I, O>,
): ReadableStream<O>Defined at src/responses/stream-transforms/stream-utils.ts:6, it creates a new TransformStream from any Transformer and pipes the input stream through it. This keeps the pipeline composable -- each stage is an independent transformer that can be tested in isolation.
The module also exports ATTR_UPSTREAM_LATENCY_MILLIS (stream-utils.ts:13) and responseFromTerminalEvent (stream-utils.ts:15), both of which are used by multiple downstream transformers.
TraceTransformer
The TraceTransformer is a passthrough transformer that records every stream event to the trace subsystem. It appears twice in the pipeline -- once before the event bridge (recording raw upstream events) and once after validation (recording transformed events).
Key implementation details:
- Extends
SafeTransformer<T, T>from@ahoo-wang/fetcher-eventstream, which provides error-safe passthrough semantics - Maintains an auto-incrementing
sequencecounter for ordered trace records - Short-circuits if
ctx.app.traceEnabledis false -- no allocation overhead when tracing is off - Source: src/responses/stream-transforms/trace-transformer.ts:8
ResponseLogTransformer
The ResponseLogTransformer watches for terminal events and logs a structured completion record with timing, usage, and latency data.
| Logged Field | Source |
|---|---|
status | response.status from terminal event |
model | response.model |
outputCount | response.output.length |
durationMillis | Date.now() - ctx.createdAt * 1000 |
usage | response.usage |
cacheHitRatio | Computed from usage via cacheHitRatioFromResponseUsage |
upstreamLatencyMillis | Read from ctx.attributes (set during provider exchange) |
streamEventCount | Events seen so far |
It also records usage to the trace subsystem via recordTraceUsage (src/responses/stream-transforms/response-log-transformer.ts:13). The transformer fires exactly once -- subsequent terminal events are ignored via the logged guard.
CompatibilityLogTransformer
The CompatibilityLogTransformer emits compatibility diagnostics at stream end. It checks for terminal events (response.completed, response.failed, response.incomplete, response.cancelled) and calls logDiagnostics with timing data.
The onFlush fallback at src/responses/stream-transforms/compatibility-log-transformer.ts:24 ensures diagnostics are logged even if the stream closes without a terminal event. The severity of the log entry matches the worst diagnostic -- errors at error level, warnings at warn level, and informational diagnostics at info level.
ResponseOutputContractValidationTransformer
This transformer validates the output contract when a terminal event arrives. If the response output does not satisfy the contract (for example, json_schema format was requested but the output is not valid JSON), the transformer rewrites the terminal event into a response.failed event.
Once a terminal event is rewritten, all subsequent events are silently dropped (if (this.rewrittenTerminal) return at src/responses/stream-transforms/response-output-contract-validation-transformer.ts:22). This prevents partial output from leaking to the client after a contract failure.
ResponseSessionPersistenceTransformer
The ResponseSessionPersistenceTransformer saves the completed response to the session store when the stream finishes. It delegates to a pluggable saveSession callback (defaulting to saveResponseSession from the persistence module).
Key behaviors:
- Passes every event through immediately via
enqueuebefore attempting persistence - Guards against duplicate persistence with the
persistenceAttemptedflag - Skipped entirely when
ctx.request.store === false(theStreamPipelinechecks this before adding the transformer at src/responses/stream-pipeline.ts:74) - Catches and logs session save errors at
warnlevel without failing the stream
Source: src/responses/stream-transforms/response-session-persistence-transformer.ts:19
ResponseSseEncoder
The ResponseSseEncoder is the final stage that converts ResponseStreamEvent objects into SSE wire format. Unlike the other transformers, it changes the output type from ResponseStreamEvent to Uint8Array.
Each event is encoded as:
event: <event.type>
data: <JSON payload with sequence_number>The encoder tracks sequence numbers, using the event's own sequence_number if present, or auto-incrementing otherwise (src/responses/stream-transforms/response-sse-encoder.ts:4).
This transformer is applied at the HTTP dispatch layer, not inside StreamPipeline. The response dispatcher adds it as the final step:
const sseBody = pipeTransform(eventStream, new ResponseSseEncoder());Shared Patterns
All transformers follow a consistent design pattern:
| Pattern | Implementation |
|---|---|
| Base class | SafeTransformer<I, O> from @ahoo-wang/fetcher-eventstream |
| Passthrough | Call this.enqueue(controller, chunk) before doing work |
| Once-only actions | Boolean guards (logged, logged, persistenceAttempted, rewrittenTerminal) |
| Context access | All transformers receive ResponsesContext via constructor |
| Error safety | SafeTransformer base class ensures errors in onTransform do not crash the stream |
Adding a New Transform Stage
To add a new concern to the pipeline:
- Create a new file in
src/responses/stream-transforms/extendingSafeTransformer - Implement
onTransform(chunk, controller)-- always callthis.enqueue(controller, chunk)to pass data through - Add an
export *to src/responses/stream-transforms/index.ts - Insert a
pipeTransformcall at the correct position inStreamPipeline.stream()at src/responses/stream-pipeline.ts - Add a co-located
.test.tsfile following the existing test patterns
Cross-References
- Stream Pipeline -- how transforms assemble into the full streaming pipeline
- Architecture Overview -- system-level context for where stream transforms fit
- Error Handling -- the
wrapWithErrorHandlerstage and error propagation - Session Management -- how the persistence transformer saves sessions
- Testing -- co-located unit tests for each transformer
- Trace System -- how
TraceTransformerfeeds the trace subsystem
References
- src/responses/stream-transforms/trace-transformer.ts -- Raw and transformed event tracing
- src/responses/stream-transforms/response-log-transformer.ts -- Completion logging with usage
- src/responses/stream-transforms/compatibility-log-transformer.ts -- Compatibility diagnostics logging
- src/responses/stream-transforms/response-output-contract-validation-transformer.ts -- Output contract validation and rewriting
- src/responses/stream-transforms/response-session-persistence-transformer.ts -- Session persistence on completion
- src/responses/stream-transforms/response-sse-encoder.ts -- SSE wire format encoding
- src/responses/stream-transforms/stream-utils.ts --
pipeTransformhelper and shared utilities - src/responses/stream-transforms/index.ts -- Barrel re-exports
- src/responses/stream-pipeline.ts:37-85 -- Pipeline assembly
- src/server/routes/responses/response-dispatcher.ts -- Where SSE encoding is applied