-
Notifications
You must be signed in to change notification settings - Fork 71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add data stream support #361
Merged
+2,154
−108
Merged
Changes from 10 commits
Commits
Show all changes
38 commits
Select commit
Hold shift + click to select a range
7a52b8b
Add data stream receiving
lukasIO e50cfa1
updat lock file
lukasIO ee17e31
pass participant identity
lukasIO c97a793
streams in both directions
lukasIO 84c0c21
add optional override chunk id
lukasIO 2f6a75a
remove test fallback
lukasIO 71c48d3
Add file send API
lukasIO ddcec0e
add docs
lukasIO a1d607a
update README
lukasIO 6a35c9f
remove todos
lukasIO 4eee6a0
remove totalChunks
lukasIO 957b941
update ffi protocol
lukasIO 885eb2c
update rust
lukasIO 0fd1316
update proto
lukasIO 83acae2
Merge branch 'main' into lukas/data-stream
lukasIO a42c9e3
reuse
lukasIO 02e5839
nicer stream API
lukasIO cd2a07f
include info in streamwriter
lukasIO 3760994
allow to pass messageId
lukasIO b0b8063
fix remaining bugs
lukasIO ecc07ba
update rust sdk
lukasIO b516f34
Merge branch 'main' into lukas/data-stream
lukasIO c5faaf6
Create many-suns-smoke.md
lukasIO 643673c
add sendText method
lukasIO 6391feb
Merge branch 'lukas/data-stream' of github.com:livekit/node-sdks into…
lukasIO fd81d7b
update naming
lukasIO 074a3aa
add streamBytes method
lukasIO eab49e4
stream callbacks
lukasIO edbb680
better example
lukasIO 7bd1f90
update example for interop
lukasIO 8d1afe5
add lfs for assets
lukasIO 1bbcfc8
update example asset
lukasIO 0fdc32b
make CI happy
lukasIO fee3777
point rust ffi to 0.12.8
lukasIO 2918189
explicit remove handlers
lukasIO 899d92e
make example write to file
lukasIO fc26f36
Merge branch 'main' into lukas/data-stream
lukasIO 1776863
add extension in file name
lukasIO File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
# 1. Copy this file and rename it to .env | ||
# 2. Update the enviroment variables below. | ||
|
||
LIVEKIT_API_KEY=mykey | ||
LIVEKIT_API_SECRET=mysecret | ||
LIVEKIT_URL=wss://myproject.livekit.cloud |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
# Data Streams Example | ||
|
||
This example demonstrates how to use DataStreams to stream and receive both text and files from other LiveKit participants. | ||
|
||
## Prerequisites | ||
|
||
Before running this example, make sure you have: | ||
|
||
1. Node.js installed on your machine. | ||
2. A LiveKit server running (either locally or remotely). | ||
3. LiveKit API key and secret. | ||
|
||
## Setup | ||
|
||
1. Install dependencies: | ||
|
||
``` | ||
pnpm install | ||
``` | ||
|
||
2. Create a `.env.local` file in the example directory with your LiveKit credentials: | ||
``` | ||
LIVEKIT_API_KEY=your_api_key | ||
LIVEKIT_API_SECRET=your_api_secret | ||
LIVEKIT_URL=your_livekit_url | ||
``` | ||
|
||
## Running the Example | ||
|
||
To run the example, use the following command: | ||
|
||
``` | ||
pnpm run start | ||
``` | ||
|
||
The example will log to your terminal. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
import { RemoteParticipant, Room, RoomEvent, type TextStreamReader } from '@livekit/rtc-node'; | ||
import { config } from 'dotenv'; | ||
import { AccessToken } from 'livekit-server-sdk'; | ||
|
||
config({ path: '.env.local', override: false }); | ||
const LIVEKIT_API_KEY = process.env.LIVEKIT_API_KEY; | ||
const LIVEKIT_API_SECRET = process.env.LIVEKIT_API_SECRET; | ||
const LIVEKIT_URL = process.env.LIVEKIT_URL; | ||
if (!LIVEKIT_API_KEY || !LIVEKIT_API_SECRET || !LIVEKIT_URL) { | ||
throw new Error('Missing required environment variables. Please check your .env.local file.'); | ||
} | ||
|
||
const greetParticipant = async (room: Room, recipient: RemoteParticipant) => { | ||
const greeting = 'Hi this is just a text sample'; | ||
const streamWriter = await room.localParticipant?.streamText({ | ||
destinationIdentities: [recipient.identity], | ||
}); | ||
|
||
for (const c of greeting) { | ||
await streamWriter?.write([c]); | ||
} | ||
|
||
streamWriter?.close(); | ||
}; | ||
|
||
const sendFile = async (room: Room, recipient: RemoteParticipant) => { | ||
console.log('sending file'); | ||
await room.localParticipant?.sendFile('./assets/maybemexico.png', { | ||
destinationIdentities: [recipient.identity], | ||
fileName: 'mex', | ||
mimeType: 'image/png', | ||
}); | ||
lukasIO marked this conversation as resolved.
Show resolved
Hide resolved
|
||
console.log('done sending file'); | ||
}; | ||
|
||
const main = async () => { | ||
const roomName = `dev`; | ||
const identity = 'tester'; | ||
const token = await createToken(identity, roomName); | ||
|
||
const room = new Room(); | ||
|
||
const finishedPromise = new Promise((resolve) => { | ||
room.on(RoomEvent.ParticipantDisconnected, resolve); | ||
}); | ||
|
||
room.on(RoomEvent.TextStreamReceived, async (reader: TextStreamReader) => { | ||
lukasIO marked this conversation as resolved.
Show resolved
Hide resolved
|
||
console.log(await reader.readAll()); | ||
// for await (const { collected } of reader) { | ||
// console.log(collected); | ||
// } | ||
}); | ||
lukasIO marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
room.on(RoomEvent.ParticipantConnected, async (participant) => { | ||
await sendFile(room, participant); | ||
await greetParticipant(room, participant); | ||
}); | ||
|
||
await room.connect(LIVEKIT_URL, token); | ||
|
||
await finishedPromise; | ||
}; | ||
|
||
const createToken = async (identity: string, roomName: string) => { | ||
const token = new AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET, { | ||
identity, | ||
}); | ||
token.addGrant({ | ||
room: roomName, | ||
roomJoin: true, | ||
canPublish: true, | ||
canSubscribe: true, | ||
}); | ||
return await token.toJwt(); | ||
}; | ||
|
||
main(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
{ | ||
"name": "example-data-streams", | ||
"author": "LiveKit", | ||
"private": "true", | ||
"description": "Example of using data streams in LiveKit", | ||
"type": "module", | ||
"main": "index.ts", | ||
"scripts": { | ||
"lint": "eslint -f unix \"**/*.ts\"", | ||
"start": "tsx index.ts" | ||
}, | ||
"keywords": [], | ||
"license": "Apache-2.0", | ||
"dependencies": { | ||
"@livekit/rtc-node": "workspace:*", | ||
"dotenv": "^16.4.5", | ||
"livekit-server-sdk": "workspace:*" | ||
}, | ||
"devDependencies": { | ||
"@types/node": "^20.10.4", | ||
"tsx": "^4.7.1" | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
export * from './stream_reader.js'; | ||
export * from './stream_writer.js'; | ||
export type * from './types.js'; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
import type { DataStream_Chunk } from '@livekit/protocol'; | ||
import type { ReadableStream } from 'node:stream/web'; | ||
import { log } from '../log.js'; | ||
import { bigIntToNumber } from '../utils.js'; | ||
import type { BaseStreamInfo, FileStreamInfo, TextStreamChunk, TextStreamInfo } from './types.js'; | ||
|
||
abstract class BaseStreamReader<T extends BaseStreamInfo> { | ||
protected reader: ReadableStream<DataStream_Chunk>; | ||
|
||
protected totalChunkCount?: number; | ||
|
||
protected _info: T; | ||
|
||
get info() { | ||
return this._info; | ||
} | ||
|
||
constructor(info: T, stream: ReadableStream<DataStream_Chunk>, totalChunkCount?: number) { | ||
this.reader = stream; | ||
this.totalChunkCount = totalChunkCount; | ||
this._info = info; | ||
} | ||
|
||
protected abstract handleChunkReceived(chunk: DataStream_Chunk): void; | ||
|
||
onProgress?: (progress: number | undefined) => void; | ||
|
||
abstract readAll(): Promise<string | Array<Uint8Array>>; | ||
} | ||
|
||
/** | ||
* A class to read chunks from a ReadableStream and provide them in a structured format. | ||
*/ | ||
export class BinaryStreamReader extends BaseStreamReader<FileStreamInfo> { | ||
private chunksReceived: Set<number>; | ||
|
||
constructor( | ||
info: FileStreamInfo, | ||
stream: ReadableStream<DataStream_Chunk>, | ||
totalChunkCount?: number, | ||
) { | ||
super(info, stream, totalChunkCount); | ||
this.chunksReceived = new Set(); | ||
} | ||
|
||
protected handleChunkReceived(chunk: DataStream_Chunk) { | ||
this.chunksReceived.add(bigIntToNumber(chunk.chunkIndex)); | ||
const currentProgress = this.totalChunkCount | ||
? this.chunksReceived.size / this.totalChunkCount | ||
: undefined; | ||
this.onProgress?.(currentProgress); | ||
} | ||
|
||
[Symbol.asyncIterator]() { | ||
const reader = this.reader.getReader(); | ||
|
||
return { | ||
next: async (): Promise<IteratorResult<Uint8Array>> => { | ||
try { | ||
const { done, value } = await reader.read(); | ||
if (done) { | ||
return { done: true, value: undefined as any }; | ||
} else { | ||
this.handleChunkReceived(value); | ||
return { done: false, value: value.content }; | ||
} | ||
} catch (error) { | ||
log.error('error processing stream update', error); | ||
return { done: true, value: undefined }; | ||
} | ||
}, | ||
|
||
return(): IteratorResult<Uint8Array> { | ||
reader.releaseLock(); | ||
return { done: true, value: undefined }; | ||
}, | ||
}; | ||
} | ||
|
||
async readAll(): Promise<Array<Uint8Array>> { | ||
const chunks: Set<Uint8Array> = new Set(); | ||
for await (const chunk of this) { | ||
chunks.add(chunk); | ||
} | ||
return Array.from(chunks); | ||
} | ||
} | ||
|
||
/** | ||
* A class to read chunks from a ReadableStream and provide them in a structured format. | ||
*/ | ||
export class TextStreamReader extends BaseStreamReader<TextStreamInfo> { | ||
private receivedChunks: Map<number, DataStream_Chunk>; | ||
|
||
/** | ||
* A TextStreamReader instance can be used as an AsyncIterator that returns the entire string | ||
* that has been received up to the current point in time. | ||
*/ | ||
constructor( | ||
info: TextStreamInfo, | ||
stream: ReadableStream<DataStream_Chunk>, | ||
totalChunkCount?: number, | ||
) { | ||
super(info, stream, totalChunkCount); | ||
this.receivedChunks = new Map(); | ||
} | ||
|
||
protected handleChunkReceived(chunk: DataStream_Chunk) { | ||
const index = bigIntToNumber(chunk.chunkIndex); | ||
const previousChunkAtIndex = this.receivedChunks.get(index); | ||
if (previousChunkAtIndex && previousChunkAtIndex.version > chunk.version) { | ||
// we have a newer version already, dropping the old one | ||
return; | ||
} | ||
this.receivedChunks.set(index, chunk); | ||
const currentProgress = this.totalChunkCount | ||
? this.receivedChunks.size / this.totalChunkCount | ||
: undefined; | ||
this.onProgress?.(currentProgress); | ||
} | ||
|
||
/** | ||
* Async iterator implementation to allow usage of `for await...of` syntax. | ||
* Yields structured chunks from the stream. | ||
* | ||
*/ | ||
[Symbol.asyncIterator]() { | ||
const reader = this.reader.getReader(); | ||
const decoder = new TextDecoder(); | ||
|
||
return { | ||
next: async (): Promise<IteratorResult<TextStreamChunk>> => { | ||
try { | ||
const { done, value } = await reader.read(); | ||
if (done) { | ||
return { done: true, value: undefined }; | ||
} else { | ||
this.handleChunkReceived(value); | ||
return { | ||
done: false, | ||
value: { | ||
index: bigIntToNumber(value.chunkIndex), | ||
current: decoder.decode(value.content), | ||
collected: Array.from(this.receivedChunks.values()) | ||
.sort((a, b) => bigIntToNumber(a.chunkIndex) - bigIntToNumber(b.chunkIndex)) | ||
.map((chunk) => decoder.decode(chunk.content)) | ||
.join(''), | ||
}, | ||
}; | ||
} | ||
} catch (error) { | ||
log.error('error processing stream update', error); | ||
return { done: true, value: undefined }; | ||
} | ||
}, | ||
|
||
return(): IteratorResult<TextStreamChunk> { | ||
reader.releaseLock(); | ||
return { done: true, value: undefined }; | ||
}, | ||
}; | ||
} | ||
|
||
async readAll(): Promise<string> { | ||
let latestString: string = ''; | ||
for await (const { collected } of this) { | ||
latestString = collected; | ||
} | ||
return latestString; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import type { WritableStream } from 'node:stream/web'; | ||
|
||
class BaseStreamWriter<T> { | ||
protected writableStream: WritableStream<T>; | ||
|
||
protected defaultWriter: WritableStreamDefaultWriter<T>; | ||
|
||
protected onClose?: () => void; | ||
|
||
constructor(writableStream: WritableStream<T>, onClose?: () => void) { | ||
this.writableStream = writableStream; | ||
this.defaultWriter = writableStream.getWriter(); | ||
this.onClose = onClose; | ||
} | ||
|
||
write(chunk: T): Promise<void> { | ||
return this.defaultWriter.write(chunk); | ||
} | ||
lukasIO marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
async close() { | ||
await this.defaultWriter.close(); | ||
this.defaultWriter.releaseLock(); | ||
this.onClose?.(); | ||
} | ||
} | ||
|
||
export class TextStreamWriter extends BaseStreamWriter<[string, number?]> {} | ||
|
||
export class BinaryStreamWriter extends BaseStreamWriter<Uint8Array> {} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this file doesn't exist, but when i added one and ran the example i got a rust panic