Skip to content

Commit

Permalink
fix(stream-text): ensure SSE handling conforms to the spec
Browse files Browse the repository at this point in the history
  • Loading branch information
iseki0 committed Jan 18, 2025
1 parent 5c73eb3 commit 3b43f4c
Showing 1 changed file with 38 additions and 18 deletions.
56 changes: 38 additions & 18 deletions packages/stream-text/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ export interface StreamTextResult {
usage?: Usage
}

const chunkHeaderPrefix = 'data: '
const chunkErrorPrefix = `{"error":`
const chunkHeaderPrefix = 'data:'

/**
* @experimental WIP, does not support function calling (tools).
Expand All @@ -64,30 +63,51 @@ export const streamText = async (options: StreamTextOptions): Promise<StreamText
const rawChunkStream = res.body!.pipeThrough(new TransformStream({
transform: async (chunk, controller) => {
buffer += decoder.decode(chunk, { stream: true })
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''

for (const line of lines) {
// Some cases:
// - Empty chunk
// - :ROUTER PROCESSING from OpenRouter
if (!line || !line.startsWith(chunkHeaderPrefix)) {
continue
const events = buffer.split('\n\n')
// Since the stream must ended with '\n\n', so it's safe to pop the last "event"(it should be empty)
buffer = events.pop() || ''

for (const event of events) {
// Handle multi-line data
const lines = event.split('\n')
const dataLines: string[] = []

for (const line of lines) {
// Skip empty lines
if (!line)
continue
// Check for comments
if (line.startsWith(':'))
continue
// Check if line starts with data: (allowing spaces)
if (line.startsWith(chunkHeaderPrefix)) {
// Extract content after data: (remove leading single space if present)
const content = line.slice(chunkHeaderPrefix.length)
dataLines.push(content.length > 0 && content[0] === ' ' ? content.slice(1) : content)
}
else {
break // event ended, dispatch
}
}

const lineWithoutPrefix = line.slice(chunkHeaderPrefix.length)
if (lineWithoutPrefix.startsWith(chunkErrorPrefix)) {
// About controller error: https://developer.mozilla.org/en-US/docs/Web/API/TransformStreamDefaultController/error
controller.error(new Error(`Error from server: ${lineWithoutPrefix}`))
// Skip this event if no data lines found
if (dataLines.length === 0)
continue

const data = dataLines.join('\n')

if (data === '[DONE]') {
controller.terminate()
break
}

if (lineWithoutPrefix === '[DONE]') {
controller.terminate()
// Maybe we should use `JSON.parse` to check if it's a valid JSON
if (data.startsWith('{') && data.includes('"error"')) {
controller.error(new Error(`Error from server: ${data}`))
break
}

const chunk: ChunkResult = JSON.parse(lineWithoutPrefix)
const chunk: ChunkResult = JSON.parse(data)
controller.enqueue(chunk)

if (options.onChunk)
Expand Down

0 comments on commit 3b43f4c

Please sign in to comment.