Skip to content

Commit

Permalink
fix: tidy up and revise to pass tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Nfrederiksen committed Oct 2, 2024
1 parent d7763f7 commit 297e204
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 37 deletions.
62 changes: 38 additions & 24 deletions engine/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class Session {
//this.currentVod;
this.currentMetadata = {};
this._events = [];
this.averageSegmentDuration = config.averageSegmentDuration;
this.averageSegmentDuration = null;
this.use_demuxed_audio = false;
this.use_vtt_subtitles = false;
this.dummySubtitleEndpoint = "";
Expand Down Expand Up @@ -616,6 +616,7 @@ class Session {
}
const currentVod = await this._sessionState.getCurrentVod();
if (currentVod) {
let countsData;
try {
let mediaSequenceValue = 0;
if (currentVod.sequenceAlwaysContainNewSegments) {
Expand All @@ -625,6 +626,14 @@ class Session {
mediaSequenceValue = playheadState.vodMediaSeqVideo;
}
const discSeqCount = discSeqOffset + currentVod.discontinuities[playheadState.vodMediaSeqVideo];
debug(`[${this._sessionId}]: MediaSeq: (${playheadState.mediaSeq}+${mediaSequenceValue}=${(playheadState.mediaSeq + mediaSequenceValue)}) and DiscSeq: (${discSeqCount}) requested `);

countsData = {
'mediaSeq': (playheadState.mediaSeq + mediaSequenceValue),
'discSeq': discSeqCount,
'vodMediaSeqVideo': playheadState.vodMediaSeqVideo,
};

let discSeqCountAudio;
let audioSequenceValue;
if (this.use_demuxed_audio) {
Expand All @@ -635,17 +644,12 @@ class Session {
} else {
audioSequenceValue = playheadState.vodMediaSeqAudio;
}
debug(`[${this._sessionId}]: AudioSeq: (${playheadState.mediaSeqAudio}+${audioSequenceValue}=${(playheadState.mediaSeqAudio + audioSequenceValue)}) and DiscSeq: (${discSeqCountAudio}) requested `);
countsData['audioSeq'] = (playheadState.mediaSeqAudio + audioSequenceValue);
countsData['discSeqAudio'] = discSeqCountAudio;
countsData['vodMediaSeqAudio'] = playheadState.vodMediaSeqAudio;
}
debug(`[${this._sessionId}]: MediaSeq: (${playheadState.mediaSeq}+${mediaSequenceValue}=${(playheadState.mediaSeq + mediaSequenceValue)}) and DiscSeq: (${discSeqCount}) requested `);
debug(`[${this._sessionId}]: AudioSeq: (${playheadState.mediaSeqAudio}+${audioSequenceValue}=${(playheadState.mediaSeqAudio + audioSequenceValue)}) and DiscSeq: (${discSeqCountAudio}) requested `);
return {
'mediaSeq': (playheadState.mediaSeq + mediaSequenceValue),
'discSeq': discSeqCount,
'vodMediaSeqVideo': playheadState.vodMediaSeqVideo,
'vodMediaSeqAudio': playheadState.vodMediaSeqAudio,
'audioSeq': playheadState.mediaSeqAudio + audioSequenceValue,
'discSeqAudio': discSeqCountAudio,
};
return countsData;
} catch (err) {
logerror(this._sessionId, err);
await this._sessionState.clearCurrentVodCache(); // force reading up from shared store
Expand Down Expand Up @@ -1638,12 +1642,15 @@ class Session {
await this._sessionState.set("mediaSeq", mseqV);
await this._playheadState.set("mediaSeq", mseqV, isLeader);
await this._sessionState.set("discSeq", discSeqV);
await this._sessionState.set("mediaSeqAudio", mseqA);
await this._playheadState.set("mediaSeqAudio", mseqA, isLeader);
await this._sessionState.set("discSeqAudio", discSeqA);
// TODO: support demux^
debug(`[${this._sessionId}]: Setting current media and discontinuity count -> [${mseqV}]:[${discSeqV}]`);
debug(`[${this._sessionId}]: Setting current audio media and discontinuity count -> [${mseqA}]:[${discSeqA}]`);

if (this.use_demuxed_audio) {
await this._sessionState.set("mediaSeqAudio", mseqA);
await this._playheadState.set("mediaSeqAudio", mseqA, isLeader);
await this._sessionState.set("discSeqAudio", discSeqA);
debug(`[${this._sessionId}]: Setting current audio media and discontinuity count -> [${mseqA}]:[${discSeqA}]`);
}
// 3) Set new media segments/currentVod, to carry on the continuity from session-live
debug(`[${this._sessionId}]: LEADER: making changes to current VOD. I will also update currentVod in store.`);
const playheadState = await this._playheadState.getValues(["vodMediaSeqVideo"]);
Expand All @@ -1652,10 +1659,12 @@ class Session {
nextMseq = currentVod.getLiveMediaSequencesCount() - 1;
}

const playheadStateAudio = await this._playheadState.getValues(["vodMediaSeqAudio"]);
let nextAudioMseq = playheadStateAudio.vodMediaSeqAudio + 1;
if (nextAudioMseq > currentVod.getLiveMediaSequencesCount("audio") - 1) {
nextAudioMseq = currentVod.getLiveMediaSequencesCount("audio") - 1;
if (this.use_demuxed_audio) {
const playheadStateAudio = await this._playheadState.getValues(["vodMediaSeqAudio"]);
let nextAudioMseq = playheadStateAudio.vodMediaSeqAudio + 1;
if (nextAudioMseq > currentVod.getLiveMediaSequencesCount("audio") - 1) {
nextAudioMseq = currentVod.getLiveMediaSequencesCount("audio") - 1;
}
}

// ---------------------------------------------------.
Expand All @@ -1666,18 +1675,23 @@ class Session {
await this._sessionState.setCurrentVod(currentVod, { ttl: currentVod.getDuration() * 1000 });
await this._sessionState.set("vodReloaded", 1);
await this._sessionState.set("vodMediaSeqVideo", 0);
await this._sessionState.set("vodMediaSeqAudio", 0);
await this._sessionState.set("vodMediaSeqSubtitle", 0);
await this._playheadState.set("vodMediaSeqVideo", 0, isLeader);
await this._playheadState.set("vodMediaSeqAudio", 0, isLeader);
await this._playheadState.set("vodMediaSeqSubtitle", 0, isLeader);
if (this.use_demuxed_audio) {
await this._sessionState.set("vodMediaSeqAudio", 0);
await this._playheadState.set("vodMediaSeqAudio", 0, isLeader);
await this._sessionState.set("vodMediaSeqSubtitle", 0);
await this._playheadState.set("vodMediaSeqSubtitle", 0, isLeader);
}
this.currentPlayheadRef = await this._playheadState.set("playheadRef", Date.now(), isLeader);
// 4) Log to debug and cloudwatch
debug(`[${this._sessionId}]: LEADER: Set new Reloaded VOD and vodMediaSeq counts in store.`);
debug(`[${this._sessionId}]: next VOD Reloaded (${currentVod.getDeltaTimes()})`);
debug(`[${this._sessionId}]: ${currentVod.getPlayheadPositions()}`);
debug(`[${this._sessionId}]: msequences=${currentVod.getLiveMediaSequencesCount()}`);
debug(`[${this._sessionId}]: audio msequences=${currentVod.getLiveMediaSequencesCount("audio")}`);
if (this.use_demuxed_audio) {
debug(`[${this._sessionId}]: audio msequences=${currentVod.getLiveMediaSequencesCount("audio")}`);
debug(`[${this._sessionId}]: subtitle msequences=${currentVod.getLiveMediaSequencesCount("subtitle")}`);
}
cloudWatchLog(!this.cloudWatchLogging, "engine-session", { event: "switchback", channel: this._sessionId, reqTimeMs: Date.now() - startTS });
return;
} else {
Expand Down
18 changes: 10 additions & 8 deletions engine/session_live.js
Original file line number Diff line number Diff line change
Expand Up @@ -596,12 +596,15 @@ class SessionLive {
}

async getCurrentMediaAndDiscSequenceCount() {
return {
const counts = {
mediaSeq: this.mediaSeqCount,
discSeq: this.discSeqCount,
audioSeq: this.mediaSeqCount,
audioDiscSeq: this.discSeqCount,
};
if (this.useDemuxedAudio) {
counts.audioSeq = this.audioSeqCount;
counts.audioDiscSeq = this.audioDiscSeqCount;
}
return counts;
}

getStatus() {
Expand Down Expand Up @@ -1761,7 +1764,7 @@ class SessionLive {
}
}

_pushAmountBasedOnPreviousLastSegmentURI(m3u, isFirstTrack) {
_pushAmountBasedOnPreviousLastSegmentURI(m3u, isFirstTrack) {
if (!isFirstTrack) {
return null;
}
Expand All @@ -1773,7 +1776,7 @@ class SessionLive {
}
}
return null;
};
}

_parseMediaManifest(m3u, mediaManifestUri, liveTargetBandwidth, isFirstBW, isLeader) {
return new Promise(async (resolve, reject) => {
Expand Down Expand Up @@ -1919,7 +1922,7 @@ class SessionLive {
segmentUri = urlResolve(baseUrl, playlistItem.get("uri"));
}
}
if (playlistItem.get("discontinuity") && (playlistItemPrev && !playlistItemPrev.get("discontinuity"))) {
if (playlistItem.get("discontinuity") && playlistItemPrev && !playlistItemPrev.get("discontinuity")) {
if (plType === PlaylistTypes.VIDEO) {
this.liveSegQueue[liveTargetVariant].push({ discontinuity: true });
this.liveSegsForFollowers[liveTargetVariant].push({ discontinuity: true });
Expand Down Expand Up @@ -2033,8 +2036,7 @@ class SessionLive {
} else {
this.liveSegQueue[liveTargetBandwidth].push(seg);
this.liveSegsForFollowers[liveTargetBandwidth].push(seg);
debug(
`[${this.sessionId}]: ${logName}: Pushed Video segment (${seg.uri ? seg.uri : "Disc-tag"}) to 'liveSegQueue' (${liveTargetBandwidth})`);
debug(`[${this.sessionId}]: ${logName}: Pushed Video segment (${seg.uri ? seg.uri : "Disc-tag"}) to 'liveSegQueue' (${liveTargetBandwidth})`);
}
}

Expand Down
2 changes: 1 addition & 1 deletion spec/engine/playhead_live_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ describe("SessionLive-Playhead consumer", () => {
let remain = increments;
let lastMseqNo = 0;
while (remain > 0) {
await sessionLive._loadAllMediaManifests();
await sessionLive._loadAllPlaylistManifests();
let manifest = await sessionLive.getCurrentMediaManifestAsync(180000);

const m = manifest.match(/#EXT-X-MEDIA-SEQUENCE:(\d+)/);
Expand Down
8 changes: 4 additions & 4 deletions spec/engine/stream_switcher_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ describe("The Stream Switcher", () => {
const assetMgr = new TestAssetManager();
const session = new Session(assetMgr, { sessionId: "1" }, sessionStore);
const sessionLive = new SessionLive({ sessionId: "1" }, sessionLiveStore);
spyOn(sessionLive, "_loadAllMediaManifests").and.returnValue(mockLiveSegments);
spyOn(sessionLive, "_loadAllPlaylistManifests").and.returnValue(mockLiveSegments);

await session.initAsync();
await session.incrementAsync();
Expand All @@ -406,7 +406,7 @@ describe("The Stream Switcher", () => {
const assetMgr = new TestAssetManager();
const session = new Session(assetMgr, { sessionId: "1" }, sessionStore);
const sessionLive = new SessionLive({ sessionId: "1" }, sessionLiveStore);
spyOn(sessionLive, "_loadAllMediaManifests").and.returnValue(mockLiveSegments);
spyOn(sessionLive, "_loadAllPlaylistManifests").and.returnValue(mockLiveSegments);

await session.initAsync();
await session.incrementAsync();
Expand Down Expand Up @@ -464,7 +464,7 @@ describe("The Stream Switcher", () => {
const assetMgr = new TestAssetManager();
const session = new Session(assetMgr, { sessionId: "1" }, sessionStore);
const sessionLive = new SessionLive({ sessionId: "1" }, sessionLiveStore);
spyOn(sessionLive, "_loadAllMediaManifests").and.returnValue(mockLiveSegments);
spyOn(sessionLive, "_loadAllPlaylistManifests").and.returnValue(mockLiveSegments);
spyOn(session, "setCurrentMediaSequenceSegments").and.returnValue(true);
spyOn(session, "setCurrentMediaAndDiscSequenceCount").and.returnValue(true);

Expand All @@ -484,7 +484,7 @@ describe("The Stream Switcher", () => {
expect(testStreamSwitcher.getEventId()).toBe(null);
});

fit("should merge audio segments correctly", async () => {
it("should merge audio segments correctly", async () => {
const switchMgr = new TestSwitchManager(5);
const sessionLive = new StreamSwitcher({ streamSwitchManager: switchMgr });
const fromSegments = {
Expand Down

0 comments on commit 297e204

Please sign in to comment.