Skip to content

Commit

Permalink
Fix subscription timing
Browse files Browse the repository at this point in the history
Agent fails to subscribe to user track if it was connect to room before agent.
  • Loading branch information
davidzhao committed Oct 14, 2024
1 parent 6d46ee5 commit fb1aa9f
Showing 1 changed file with 63 additions and 42 deletions.
105 changes: 63 additions & 42 deletions agents/src/multimodal/multimodal_agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import type {
LocalTrackPublication,
RemoteAudioTrack,
RemoteParticipant,
RemoteTrack,
RemoteTrackPublication,
Room,
} from '@livekit/rtc-node';
import {
Expand Down Expand Up @@ -135,12 +137,17 @@ export class MultimodalAgent extends EventEmitter {
this.#updateState();

room.on(RoomEvent.ParticipantConnected, (participant: RemoteParticipant) => {
if (!this.linkedParticipant) {
// automatically link to the first participant that connects, if not already linked
if (this.linkedParticipant) {
return;
}

this.#linkParticipant(participant.identity);
});
room.on(RoomEvent.TrackPublished, () => {
// in case we are connected before the participant has published, we'd need to re-subscribe
this.#subscribeToMicrophone();
});
room.on(RoomEvent.TrackSubscribed, this.#handleTrackSubscription.bind(this));

this.room = room;
this.#participant = participant;
Expand Down Expand Up @@ -297,14 +304,50 @@ export class MultimodalAgent extends EventEmitter {

if (this.linkedParticipant.trackPublications.size > 0) {
this.#subscribeToMicrophone();
} else {
this.room.on(RoomEvent.TrackPublished, () => {
this.#subscribeToMicrophone();
});
}

// also check if already subscribed
for (const publication of this.linkedParticipant.trackPublications.values()) {
if (publication.source === TrackSource.SOURCE_MICROPHONE && publication.track) {
this.#handleTrackSubscription(publication.track, publication, this.linkedParticipant);
break;
}
}
}

#subscribeToMicrophone(): void {
if (!this.linkedParticipant) {
this.#logger.error('Participant is not set');
return;
}

let microphonePublication: RemoteTrackPublication | undefined = undefined;
for (const publication of this.linkedParticipant.trackPublications.values()) {
if (publication.source === TrackSource.SOURCE_MICROPHONE) {
microphonePublication = publication;
break;
}
}
if (!microphonePublication) {
return;
}

if (!microphonePublication.subscribed) {
microphonePublication.setSubscribed(true);
}
}

#handleTrackSubscription(
track: RemoteTrack,
publication: RemoteTrackPublication,
participant: RemoteParticipant,
) {
if (
publication.source !== TrackSource.SOURCE_MICROPHONE ||
participant.identity !== this.linkedParticipant?.identity
) {
return;
}
const readAudioStreamTask = async (audioStream: AudioStream) => {
const bstream = new AudioByteStream(
this.model.sampleRate,
Expand All @@ -319,46 +362,24 @@ export class MultimodalAgent extends EventEmitter {
}
}
};
this.subscribedTrack = track;

if (!this.linkedParticipant) {
this.#logger.error('Participant is not set');
return;
if (this.readMicroTask) {
this.readMicroTask.cancel();
}

for (const publication of this.linkedParticipant.trackPublications.values()) {
if (publication.source !== TrackSource.SOURCE_MICROPHONE) {
continue;
}

if (!publication.subscribed) {
publication.setSubscribed(true);
}

const track = publication.track;

if (track && track !== this.subscribedTrack) {
this.subscribedTrack = track;

if (this.readMicroTask) {
this.readMicroTask.cancel();
}

let cancel: () => void;
this.readMicroTask = {
promise: new Promise<void>((resolve, reject) => {
cancel = () => {
reject(new Error('Task cancelled'));
};
readAudioStreamTask(
new AudioStream(track, this.model.sampleRate, this.model.numChannels),
)
.then(resolve)
.catch(reject);
}),
cancel: () => cancel(),
let cancel: () => void;
this.readMicroTask = {
promise: new Promise<void>((resolve, reject) => {
cancel = () => {
reject(new Error('Task cancelled'));
};
}
}
readAudioStreamTask(new AudioStream(track, this.model.sampleRate, this.model.numChannels))
.then(resolve)
.catch(reject);
}),
cancel: () => cancel(),
};
}

#getLocalTrackSid(): string | null {
Expand Down

0 comments on commit fb1aa9f

Please sign in to comment.