Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add data stream support #361

Merged
merged 38 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
7a52b8b
Add data stream receiving
lukasIO Dec 17, 2024
e50cfa1
updat lock file
lukasIO Dec 17, 2024
ee17e31
pass participant identity
lukasIO Dec 20, 2024
c97a793
streams in both directions
lukasIO Dec 20, 2024
84c0c21
add optional override chunk id
lukasIO Dec 20, 2024
2f6a75a
remove test fallback
lukasIO Dec 20, 2024
71c48d3
Add file send API
lukasIO Dec 22, 2024
ddcec0e
add docs
lukasIO Dec 22, 2024
a1d607a
update README
lukasIO Dec 22, 2024
6a35c9f
remove todos
lukasIO Dec 22, 2024
4eee6a0
remove totalChunks
lukasIO Dec 22, 2024
957b941
update ffi protocol
lukasIO Dec 22, 2024
885eb2c
update rust
lukasIO Jan 8, 2025
0fd1316
update proto
lukasIO Jan 8, 2025
83acae2
Merge branch 'main' into lukas/data-stream
lukasIO Jan 8, 2025
a42c9e3
reuse
lukasIO Jan 8, 2025
02e5839
nicer stream API
lukasIO Jan 10, 2025
cd2a07f
include info in streamwriter
lukasIO Jan 10, 2025
3760994
allow to pass messageId
lukasIO Jan 15, 2025
b0b8063
fix remaining bugs
lukasIO Jan 17, 2025
ecc07ba
update rust sdk
lukasIO Jan 17, 2025
b516f34
Merge branch 'main' into lukas/data-stream
lukasIO Jan 17, 2025
c5faaf6
Create many-suns-smoke.md
lukasIO Jan 17, 2025
643673c
add sendText method
lukasIO Jan 17, 2025
6391feb
Merge branch 'lukas/data-stream' of github.com:livekit/node-sdks into…
lukasIO Jan 17, 2025
fd81d7b
update naming
lukasIO Jan 22, 2025
074a3aa
add streamBytes method
lukasIO Jan 22, 2025
eab49e4
stream callbacks
lukasIO Jan 22, 2025
edbb680
better example
lukasIO Jan 22, 2025
7bd1f90
update example for interop
lukasIO Jan 22, 2025
8d1afe5
add lfs for assets
lukasIO Jan 22, 2025
1bbcfc8
update example asset
lukasIO Jan 22, 2025
0fdc32b
make CI happy
lukasIO Jan 22, 2025
fee3777
point rust ffi to 0.12.8
lukasIO Jan 23, 2025
2918189
explicit remove handlers
lukasIO Jan 23, 2025
899d92e
make example write to file
lukasIO Jan 27, 2025
fc26f36
Merge branch 'main' into lukas/data-stream
lukasIO Jan 27, 2025
1776863
add extension in file name
lukasIO Jan 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions examples/data-streams/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# 1. Copy this file and rename it to .env
# 2. Update the enviroment variables below.

LIVEKIT_API_KEY=mykey
LIVEKIT_API_SECRET=mysecret
LIVEKIT_URL=wss://myproject.livekit.cloud
36 changes: 36 additions & 0 deletions examples/data-streams/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Data Streams Example

This example demonstrates how to use DataStreams to stream and receive both text and files from other LiveKit participants.

## Prerequisites

Before running this example, make sure you have:

1. Node.js installed on your machine.
2. A LiveKit server running (either locally or remotely).
3. LiveKit API key and secret.

## Setup

1. Install dependencies:

```
pnpm install
```

2. Create a `.env.local` file in the example directory with your LiveKit credentials:
```
LIVEKIT_API_KEY=your_api_key
LIVEKIT_API_SECRET=your_api_secret
LIVEKIT_URL=your_livekit_url
```

## Running the Example

To run the example, use the following command:

```
pnpm run start
```

The example will log to your terminal.
77 changes: 77 additions & 0 deletions examples/data-streams/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { RemoteParticipant, Room, RoomEvent, type TextStreamReader } from '@livekit/rtc-node';
import { config } from 'dotenv';
import { AccessToken } from 'livekit-server-sdk';

config({ path: '.env.local', override: false });
const LIVEKIT_API_KEY = process.env.LIVEKIT_API_KEY;
const LIVEKIT_API_SECRET = process.env.LIVEKIT_API_SECRET;
const LIVEKIT_URL = process.env.LIVEKIT_URL;
if (!LIVEKIT_API_KEY || !LIVEKIT_API_SECRET || !LIVEKIT_URL) {
throw new Error('Missing required environment variables. Please check your .env.local file.');
}

const greetParticipant = async (room: Room, recipient: RemoteParticipant) => {
const greeting = 'Hi this is just a text sample';
const streamWriter = await room.localParticipant?.streamText({
destinationIdentities: [recipient.identity],
});

for (const c of greeting) {
await streamWriter?.write([c]);
}

streamWriter?.close();
};

const sendFile = async (room: Room, recipient: RemoteParticipant) => {
console.log('sending file');
await room.localParticipant?.sendFile('./assets/maybemexico.png', {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this file doesn't exist, but when i added one and ran the example i got a rust panic

sending file
thread '<unnamed>' panicked at src/nodejs.rs:55:13:
failed to handle request: invalid request: message is empty
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
fatal runtime error: failed to initiate panic, error 5

destinationIdentities: [recipient.identity],
fileName: 'mex',
mimeType: 'image/png',
});
lukasIO marked this conversation as resolved.
Show resolved Hide resolved
console.log('done sending file');
};

const main = async () => {
const roomName = `dev`;
const identity = 'tester';
const token = await createToken(identity, roomName);

const room = new Room();

const finishedPromise = new Promise((resolve) => {
room.on(RoomEvent.ParticipantDisconnected, resolve);
});

room.on(RoomEvent.TextStreamReceived, async (reader: TextStreamReader) => {
lukasIO marked this conversation as resolved.
Show resolved Hide resolved
console.log(await reader.readAll());
// for await (const { collected } of reader) {
// console.log(collected);
// }
});
lukasIO marked this conversation as resolved.
Show resolved Hide resolved

room.on(RoomEvent.ParticipantConnected, async (participant) => {
await sendFile(room, participant);
await greetParticipant(room, participant);
});

await room.connect(LIVEKIT_URL, token);

await finishedPromise;
};

const createToken = async (identity: string, roomName: string) => {
const token = new AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET, {
identity,
});
token.addGrant({
room: roomName,
roomJoin: true,
canPublish: true,
canSubscribe: true,
});
return await token.toJwt();
};

main();
23 changes: 23 additions & 0 deletions examples/data-streams/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"name": "example-data-streams",
"author": "LiveKit",
"private": "true",
"description": "Example of using data streams in LiveKit",
"type": "module",
"main": "index.ts",
"scripts": {
"lint": "eslint -f unix \"**/*.ts\"",
"start": "tsx index.ts"
},
"keywords": [],
"license": "Apache-2.0",
"dependencies": {
"@livekit/rtc-node": "workspace:*",
"dotenv": "^16.4.5",
"livekit-server-sdk": "workspace:*"
},
"devDependencies": {
"@types/node": "^20.10.4",
"tsx": "^4.7.1"
}
}
3 changes: 2 additions & 1 deletion packages/livekit-rtc/generate_proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ PATH=$PATH:$(pwd)/node_modules/.bin \
$FFI_PROTOCOL/video_frame.proto \
$FFI_PROTOCOL/e2ee.proto \
$FFI_PROTOCOL/stats.proto \
$FFI_PROTOCOL/rpc.proto
$FFI_PROTOCOL/rpc.proto \
$FFI_PROTOCOL/track_publication.proto
5 changes: 4 additions & 1 deletion packages/livekit-rtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@
"dependencies": {
"@bufbuild/protobuf": "^1.10.0",
"@livekit/mutex": "^1.0.0",
"@livekit/typed-emitter": "^3.0.0"
"@livekit/protocol": "^1.29.4",
"@livekit/typed-emitter": "^3.0.0",
"pino": "^8.19.0",
"pino-pretty": "^11.0.0"
},
"devDependencies": {
"@napi-rs/cli": "^2.18.0",
Expand Down
7 changes: 7 additions & 0 deletions packages/livekit-rtc/src/data_streams/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0

export * from './stream_reader.js';
export * from './stream_writer.js';
export type * from './types.js';
166 changes: 166 additions & 0 deletions packages/livekit-rtc/src/data_streams/stream_reader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { DataStream_Chunk } from '@livekit/protocol';
import type { ReadableStream } from 'node:stream/web';
import { log } from '../log.js';
import { bigIntToNumber } from '../utils.js';
import type { BaseStreamInfo, FileStreamInfo, TextStreamChunk, TextStreamInfo } from './types.js';

abstract class BaseStreamReader<T extends BaseStreamInfo> {
protected reader: ReadableStream<DataStream_Chunk>;

protected totalByteSize?: number;

protected _info: T;

protected bytesReceived: number;

get info() {
return this._info;
}

constructor(info: T, stream: ReadableStream<DataStream_Chunk>, totalByteSize?: number) {
this.reader = stream;
this.totalByteSize = totalByteSize;
this._info = info;
this.bytesReceived = 0;
}

protected abstract handleChunkReceived(chunk: DataStream_Chunk): void;

onProgress?: (progress: number | undefined) => void;

abstract readAll(): Promise<string | Array<Uint8Array>>;
}

/**
* A class to read chunks from a ReadableStream and provide them in a structured format.
*/
export class BinaryStreamReader extends BaseStreamReader<FileStreamInfo> {
protected handleChunkReceived(chunk: DataStream_Chunk) {
this.bytesReceived += chunk.content.byteLength;
const currentProgress = this.totalByteSize
? this.bytesReceived / this.totalByteSize
: undefined;
this.onProgress?.(currentProgress);
}

[Symbol.asyncIterator]() {
const reader = this.reader.getReader();

return {
next: async (): Promise<IteratorResult<Uint8Array>> => {
try {
const { done, value } = await reader.read();
if (done) {
return { done: true, value: undefined as any };
} else {
this.handleChunkReceived(value);
return { done: false, value: value.content };
}
} catch (error) {
log.error('error processing stream update', error);
return { done: true, value: undefined };
}
},

return(): IteratorResult<Uint8Array> {
reader.releaseLock();
return { done: true, value: undefined };
},
};
}

async readAll(): Promise<Array<Uint8Array>> {
const chunks: Set<Uint8Array> = new Set();
for await (const chunk of this) {
chunks.add(chunk);
}
return Array.from(chunks);
}
}

/**
* A class to read chunks from a ReadableStream and provide them in a structured format.
*/
export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
private receivedChunks: Map<number, DataStream_Chunk>;

/**
* A TextStreamReader instance can be used as an AsyncIterator that returns the entire string
* that has been received up to the current point in time.
*/
constructor(
info: TextStreamInfo,
stream: ReadableStream<DataStream_Chunk>,
totalChunkCount?: number,
) {
super(info, stream, totalChunkCount);
this.receivedChunks = new Map();
}

protected handleChunkReceived(chunk: DataStream_Chunk) {
const index = bigIntToNumber(chunk.chunkIndex);
const previousChunkAtIndex = this.receivedChunks.get(index);
if (previousChunkAtIndex && previousChunkAtIndex.version > chunk.version) {
// we have a newer version already, dropping the old one
return;
}
this.receivedChunks.set(index, chunk);
const currentProgress = this.totalByteSize
? this.receivedChunks.size / this.totalByteSize
: undefined;
this.onProgress?.(currentProgress);
}

/**
* Async iterator implementation to allow usage of `for await...of` syntax.
* Yields structured chunks from the stream.
*
*/
[Symbol.asyncIterator]() {
const reader = this.reader.getReader();
const decoder = new TextDecoder();

return {
next: async (): Promise<IteratorResult<TextStreamChunk>> => {
try {
const { done, value } = await reader.read();
if (done) {
return { done: true, value: undefined };
} else {
this.handleChunkReceived(value);
return {
done: false,
value: {
index: bigIntToNumber(value.chunkIndex),
current: decoder.decode(value.content),
collected: Array.from(this.receivedChunks.values())
.sort((a, b) => bigIntToNumber(a.chunkIndex) - bigIntToNumber(b.chunkIndex))
.map((chunk) => decoder.decode(chunk.content))
.join(''),
},
};
}
} catch (error) {
log.error('error processing stream update', error);
return { done: true, value: undefined };
}
},

return(): IteratorResult<TextStreamChunk> {
reader.releaseLock();
return { done: true, value: undefined };
},
};
}

async readAll(): Promise<string> {
let latestString: string = '';
for await (const { collected } of this) {
latestString = collected;
}
return latestString;
}
}
32 changes: 32 additions & 0 deletions packages/livekit-rtc/src/data_streams/stream_writer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { WritableStream } from 'node:stream/web';

class BaseStreamWriter<T> {
protected writableStream: WritableStream<T>;

protected defaultWriter: WritableStreamDefaultWriter<T>;

protected onClose?: () => void;

constructor(writableStream: WritableStream<T>, onClose?: () => void) {
this.writableStream = writableStream;
this.defaultWriter = writableStream.getWriter();
this.onClose = onClose;
}

write(chunk: T): Promise<void> {
return this.defaultWriter.write(chunk);
}
lukasIO marked this conversation as resolved.
Show resolved Hide resolved

async close() {
await this.defaultWriter.close();
this.defaultWriter.releaseLock();
this.onClose?.();
}
}

export class TextStreamWriter extends BaseStreamWriter<[string, number?]> {}

export class BinaryStreamWriter extends BaseStreamWriter<Uint8Array> {}
Loading
Loading