Part 4 of 6
The Streaming Layer
How the UI Stays Live
The engine is an async generator that yields typed events as each phase completes. The API route wraps this in a ReadableStream and sends it as Server-Sent Events. The user is never staring at a blank screen — every tool call, thinking step, and cost update appears in real time.
The Async Generator Pattern
The engine function signature is async function* — an async generator. Instead of returning a single value, it yields events throughout execution. This is what makes real-time streaming possible without WebSockets or polling.
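Before looking at the engine itself, here is a minimal, self-contained sketch of the pattern (toy names, not the engine's actual code): the generator yields a value as each step completes, and the caller consumes them with `for await` as they arrive.

```typescript
// A toy async generator: yields one status event per step, as the step completes
async function* runSteps(steps: string[]): AsyncGenerator<{ type: string; step: string }> {
  for (const step of steps) {
    await new Promise(resolve => setTimeout(resolve, 10)); // simulate async work
    yield { type: 'status', step };                        // the caller sees this immediately
  }
}

// The consumer iterates with for-await — no polling, no callbacks
async function collect(): Promise<{ type: string; step: string }[]> {
  const events: { type: string; step: string }[] = [];
  for await (const event of runSteps(['load', 'analyze', 'respond'])) {
    events.push(event);
  }
  return events;
}
```

The same shape scales up: swap the toy steps for enrichment, AI decisions, and tool calls, and each `yield` becomes a UI update.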
Engine as an async generator
// The engine yields events throughout its execution
async function* streamWorker(
  worker: WorkerDefinition,
  goal: string,
  chatHistory: ChatMessage[],
  organizationId: string,
  userId: string
): AsyncGenerator<WorkerEvent> {
  // Phase 0: Initialize state
  const state = createWorkerState(worker, goal, chatHistory, organizationId, userId);
  const context = { organizationId, userId, state }; // execution context passed to tools

  // Phase 1: Pre-enrichment
  yield { type: 'pre_enrichment', data: { message: 'Gathering context...' } };
  const enrichment = await runWorkerPreEnrichment(worker, goal, organizationId, userId);
  if (enrichment.contextMarkdown) {
    state.preEnrichedContext = enrichment.contextMarkdown;
    yield {
      type: 'pre_enrichment',
      data: { toolsRun: enrichment.toolsRun, message: `Pre-loaded ${enrichment.toolsRun.length} data sources` }
    };
  }

  // Phase 2: Main loop
  while (true) {
    const exit = checkExitConditions(state);
    if (exit.shouldExit) {
      // Phase 4: Synthesis fallback — force a final answer from accumulated state
      const result = await synthesizeResponse(worker, state);
      yield { type: 'response', data: { message: result.message } };
      break;
    }

    yield { type: 'status', data: { message: `Pass ${state.passCount + 1}: analyzing...` } };

    // Get AI decision
    const decision = await getAIDecision(worker, state);
    yield { type: 'thinking', data: { thinking: decision.thinking } };

    // Update living document
    if (decision.document_updates) {
      updateDocument(state, decision.document_updates);
      yield { type: 'document_update', data: { sections: state.document.sections } };
    }

    // Ready to respond?
    if (decision.should_respond && decision.response) {
      yield { type: 'response', data: { message: decision.response } };
      break;
    }

    // Execute tools
    for (const call of decision.tool_calls ?? []) {
      yield { type: 'tool_start', data: { tool: call.tool, params: call.params } };
      const result = await executeToolByName(call.tool, call.params, context);
      yield { type: 'tool_result', data: { tool: call.tool, result } };
    }

    yield { type: 'cost_update', data: { totalCost: state.totalCost } };
    state.passCount++;
  }

  yield { type: 'done', data: { passes: state.passCount, totalCost: state.totalCost } };
}

Wiring to Server-Sent Events
The API route consumes the async generator and converts each yielded event into an SSE message. The browser receives a stream of data: {...}\n\n lines and processes them as they arrive.
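The wire format itself is trivial to produce and parse. A sketch of the encode/decode round trip (the helper names are illustrative, not from the codebase):

```typescript
// Encode an event object as a single SSE frame: "data: {...}\n\n"
function toSSEFrame(event: { type: string; data: unknown }): string {
  return `data: ${JSON.stringify(event)}\n\n`;
}

// Decode a complete frame back into an event object (null if no data line)
function fromSSEFrame(frame: string): { type: string; data: unknown } | null {
  const line = frame.split('\n').find(l => l.startsWith('data: '));
  return line ? JSON.parse(line.slice('data: '.length)) : null;
}
```

The blank line (`\n\n`) is what terminates a frame; everything in the route and browser code below is just producing and consuming this shape.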
API route: generator → SSE stream
// app/api/workers/[employeeId]/route.ts
// Route handler (Next.js App Router: export this as the named HTTP method)
async function handleWorkerRequest(request: Request, { params }: { params: { employeeId: string } }) {
  const { message, chatHistory, stream } = await request.json();
  const worker = getWorkerById(params.employeeId);
  // orgId/userId come from your auth/session layer — getAuthContext is a placeholder
  const { orgId, userId } = await getAuthContext(request);

  if (!stream) {
    // Non-streaming mode: run to completion and return JSON
    const result = await runWorker(worker, message, chatHistory, orgId, userId);
    return Response.json({ result });
  }

  // Streaming mode: wrap the generator in a ReadableStream
  const readable = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder();
      try {
        // Consume the async generator
        for await (const event of streamWorker(worker, message, chatHistory, orgId, userId)) {
          // Format as SSE: "data: {...}\n\n"
          const sseMessage = `data: ${JSON.stringify(event)}\n\n`;
          controller.enqueue(encoder.encode(sseMessage));
        }
      } catch (error) {
        const errorEvent = { type: 'error', data: { message: 'An error occurred' } };
        controller.enqueue(encoder.encode(`data: ${JSON.stringify(errorEvent)}\n\n`));
      } finally {
        controller.close();
      }
    }
  });

  return new Response(readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    }
  });
}

Browser: consuming the SSE stream
// Frontend: consuming the SSE stream
async function sendMessage(message: string) {
  const response = await fetch('/api/workers/record-analyst', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message, chatHistory, stream: true }),
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // A chunk can split an SSE frame mid-line, so buffer until a full
    // "\n\n"-terminated frame arrives before parsing
    buffer += decoder.decode(value, { stream: true });
    const frames = buffer.split('\n\n');
    buffer = frames.pop()!; // keep the trailing partial frame for the next chunk
    for (const frame of frames) {
      const line = frame.split('\n').find(l => l.startsWith('data: '));
      if (!line) continue;
      const event = JSON.parse(line.slice(6)); // Remove "data: " prefix
      handleEvent(event); // Dispatch to UI state
    }
  }
}

function handleEvent(event: WorkerEvent) {
  switch (event.type) {
    case 'thinking':
      setThinkingText(prev => prev + event.data.thinking);
      break;
    case 'tool_start':
      addToolActivity({ tool: event.data.tool, status: 'running' });
      break;
    case 'tool_result':
      updateToolActivity(event.data.tool, { status: 'done', summary: event.data.result.summary });
      break;
    case 'response':
      setFinalResponse(event.data.message);
      break;
    case 'approval':
      showApprovalCard(event.data.approvals);
      break;
    case 'done':
      setSessionComplete({ passes: event.data.passes, cost: event.data.totalCost });
      break;
  }
}

All Event Types
The engine emits 12 event types. Each has a type string and a data payload. Here is the complete reference:
| Event Type | Triggered When | UI Behavior |
|---|---|---|
| `pre_enrichment` | Before loop starts | Shows "Gathering context..." banner with tool count |
| `status` | Each pass start | Updates "Pass N: analyzing..." status indicator |
| `thinking` | After AI decision parsed | Accumulates into collapsible "View reasoning" panel |
| `tool_start` | Before each tool executes | Shows tool name as "running" in activity panel |
| `tool_result` | After each tool completes | Updates tool to "done" or "failed" with summary text |
| `document_update` | After living doc update | Updates collapsible loop state panel with new sections |
| `cost_update` | After tool execution batch | Updates cost display ($0.04 / $1.50) |
| `approval` | Write tool requested | Shows approval card with tool name, params, and Approve button |
| `response` | Final answer ready | Renders Markdown response in the chat window |
| `done` | Loop exits | Shows exit reason + final cost + pass count |
| `error` | Any unhandled error | Shows error message with retry option |
| `debug` | Throughout (dev only) | Shown in developer debug panel only, hidden in production |
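These shapes can be modeled as a TypeScript discriminated union, so that switching on `type` (as `handleEvent` does) narrows `data` to the matching payload. The field names below follow the examples in this article; the real type may carry more detail. A runtime guard for the `type` string is included for filtering untyped input:

```typescript
// Discriminated union over the engine's event types — switching on
// `type` narrows `data` to the matching payload shape
type WorkerEvent =
  | { type: 'pre_enrichment'; data: { message: string; toolsRun?: string[] } }
  | { type: 'status'; data: { message: string } }
  | { type: 'thinking'; data: { thinking: string } }
  | { type: 'tool_start'; data: { tool: string; params: Record<string, unknown> } }
  | { type: 'tool_result'; data: { tool: string; result: { summary?: string } } }
  | { type: 'document_update'; data: { sections: unknown[] } }
  | { type: 'cost_update'; data: { totalCost: number } }
  | { type: 'approval'; data: { approvals: unknown[] } }
  | { type: 'response'; data: { message: string } }
  | { type: 'done'; data: { passes?: number; totalCost?: number; reason?: string } }
  | { type: 'error'; data: { message: string } }
  | { type: 'debug'; data: { message: string } };

const WORKER_EVENT_TYPES = [
  'pre_enrichment', 'status', 'thinking', 'tool_start', 'tool_result',
  'document_update', 'cost_update', 'approval', 'response', 'done', 'error', 'debug',
] as const;

// Runtime check that a parsed `type` string is one the UI knows how to handle
function isWorkerEventType(t: string): t is WorkerEvent['type'] {
  return (WORKER_EVENT_TYPES as readonly string[]).includes(t);
}
```

The union also means the compiler flags any `case` in `handleEvent` that reads a field the corresponding payload does not have.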
The Approval Flow — Pausing and Resuming
When the AI requests a write tool, the loop pauses and emits an approval event. The UI shows an approval card. When the user clicks "Approve," the frontend sends the approval IDs back to the API — which executes the approved tools directly, bypassing the loop entirely.
Approval flow — pause and resume
// 1. Engine detects write tool → pauses loop
if (tool.permissionLevel === 'write' || tool.permissionLevel === 'destructive') {
  state.pendingApprovals.push({
    id: generateId(),
    tool: call.tool,
    params: call.params,
    description: `Update record: ${call.params.recordId}`,
  });
}

// 2. Exit condition fires: pendingApprovals.length > 0
yield {
  type: 'approval',
  data: { approvals: state.pendingApprovals }
};
yield { type: 'done', data: { reason: 'approval_needed' } };
// Stream closes. User sees approval card.

// 3. User clicks "Approve" → frontend sends:
//    POST /api/workers/record-analyst
//    Body: { approvals: ['approval-id-1', 'approval-id-2'] }

// 4. API route detects approvals in body → bypass loop
if (body.approvals?.length > 0) {
  // No loop. No AI decision. Just execute the pre-approved tools.
  const results = await executeApprovedWorkerTools(worker, body.approvals, orgId, userId);
  return Response.json({ results });
}
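On the client side, the approve handler reduces to a plain JSON POST. A sketch of building that request (the endpoint and `approvals` field follow the comments above; `buildApprovalRequest` is an illustrative helper, not from the codebase):

```typescript
// Build the approval POST request; sending it is then fetch(req.url, req.init)
function buildApprovalRequest(employeeId: string, approvalIds: string[]) {
  return {
    url: `/api/workers/${employeeId}`,
    init: {
      method: 'POST' as const,
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ approvals: approvalIds }),
    },
  };
}
```

Because the route executes pre-approved tools directly, this request returns plain JSON rather than an SSE stream — the UI just renders the results.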