Skip to content

Commit

Permalink
Move P2P tracker client to separate file.
Browse files Browse the repository at this point in the history
  • Loading branch information
i-zolotarenko committed Nov 27, 2023
1 parent 2275bd0 commit fec7ce4
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 142 deletions.
143 changes: 1 addition & 142 deletions packages/p2p-media-loader-core/src/p2p/loader.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
import TrackerClient, {
PeerConnection,
TrackerClientEvents,
} from "bittorrent-tracker";
import { Peer } from "./peer";
import { Segment, Settings, StreamWithSegments } from "../types";
import { QueueItem } from "../internal-types";
Expand All @@ -13,6 +9,7 @@ import * as Utils from "../utils/utils";
import { PeerSegmentStatus } from "../enums";
import { RequestsContainer } from "../request-container";
import { Request } from "../request";
import { P2PTrackerClient } from "./tracker-client";
import debug from "debug";

export class P2PLoader {
Expand Down Expand Up @@ -165,141 +162,3 @@ export class P2PLoader {
this.trackerClient.destroy();
}
}

type PeerItem = {
peer?: Peer;
potentialConnections: Set<PeerConnection>;
};

type P2PTrackerClientEventHandlers = {
onPeerConnected: (peer: Peer) => void;
onSegmentRequested: (peer: Peer, segmentExternalId: string) => void;
};

class P2PTrackerClient {
private readonly client: TrackerClient;
private readonly _peers = new Map<string, PeerItem>();
private readonly streamHash: string;

constructor(
private readonly peerId: string,
private readonly streamExternalId: string,
private readonly stream: StreamWithSegments,
private readonly eventHandlers: P2PTrackerClientEventHandlers,
private readonly settings: Settings,
private readonly logger: debug.Debugger
) {
this.streamHash = PeerUtil.getStreamHash(streamExternalId);
this.client = new TrackerClient({
infoHash: utf8ToHex(this.streamHash),
peerId: utf8ToHex(this.peerId),
port: 6881,
announce: [
// "wss://tracker.novage.com.ua",
"wss://tracker.openwebtorrent.com",
],
rtcConfig: {
iceServers: [
{
urls: [
"stun:stun.l.google.com:19302",
"stun:global.stun.twilio.com:3478",
],
},
],
},
});
this.client.on("peer", this.onReceivePeerConnection);
this.client.on("warning", this.onTrackerClientWarning);
this.client.on("error", this.onTrackerClientError);
this.logger(
`create tracker client: ${LoggerUtils.getStreamString(stream)}; ${
this.peerId
}`
);
}

start() {
this.client.start();
}

destroy() {
this.client.destroy();
for (const { peer, potentialConnections } of this._peers.values()) {
peer?.destroy();
for (const connection of potentialConnections) {
connection.destroy();
}
}
}

private onReceivePeerConnection: TrackerClientEvents["peer"] = (
peerConnection
) => {
let peerItem = this._peers.get(peerConnection.id);

if (peerItem?.peer) {
peerConnection.destroy();
return;
} else if (!peerItem) {
peerItem = { potentialConnections: new Set() };
peerItem.potentialConnections.add(peerConnection);
const itemId = Peer.getPeerIdFromHexString(peerConnection.id);
this._peers.set(itemId, peerItem);
}

peerConnection.on("connect", () => {
if (!peerItem) return;

for (const connection of peerItem.potentialConnections) {
if (connection !== peerConnection) connection.destroy();
}
peerItem.potentialConnections.clear();
peerItem.peer = new Peer(
peerConnection,
{
onPeerClosed: this.onPeerClosed,
onSegmentRequested: this.eventHandlers.onSegmentRequested,
},
this.settings
);
this.eventHandlers.onPeerConnected(peerItem.peer);
});
};

private onTrackerClientWarning: TrackerClientEvents["warning"] = (
warning
) => {
this.logger(
`tracker warning (${LoggerUtils.getStreamString(
this.stream
)}: ${warning})`
);
};

private onTrackerClientError: TrackerClientEvents["error"] = (error) => {
this.logger(
`tracker error (${LoggerUtils.getStreamString(this.stream)}: ${error})`
);
};

*peers() {
for (const peerItem of this._peers.values()) {
if (peerItem?.peer) yield peerItem.peer;
}
}

private onPeerClosed = (peer: Peer) => {
this.logger(`peer closed: ${peer.id}`);
this._peers.delete(peer.id);
};
}

function utf8ToHex(utf8String: string) {
let result = "";
for (let i = 0; i < utf8String.length; i++) {
result += utf8String.charCodeAt(i).toString(16);
}

return result;
}
146 changes: 146 additions & 0 deletions packages/p2p-media-loader-core/src/p2p/tracker-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import TrackerClient, {
PeerConnection,
TrackerClientEvents,
} from "bittorrent-tracker";
import { Settings, StreamWithSegments } from "../types";
import debug from "debug";
import * as PeerUtil from "../utils/peer";
import * as LoggerUtils from "../utils/logger";
import { Peer } from "./peer";

type PeerItem = {
peer?: Peer;
potentialConnections: Set<PeerConnection>;
};
type P2PTrackerClientEventHandlers = {
onPeerConnected: (peer: Peer) => void;
onSegmentRequested: (peer: Peer, segmentExternalId: string) => void;
};

export class P2PTrackerClient {
private readonly client: TrackerClient;
private readonly _peers = new Map<string, PeerItem>();
private readonly streamHash: string;

constructor(
private readonly peerId: string,
private readonly streamExternalId: string,
private readonly stream: StreamWithSegments,
private readonly eventHandlers: P2PTrackerClientEventHandlers,
private readonly settings: Settings,
private readonly logger: debug.Debugger
) {
this.streamHash = PeerUtil.getStreamHash(streamExternalId);
this.client = new TrackerClient({
infoHash: utf8ToHex(this.streamHash),
peerId: utf8ToHex(this.peerId),
port: 6881,
announce: [
// "wss://tracker.novage.com.ua",
"wss://tracker.openwebtorrent.com",
],
rtcConfig: {
iceServers: [
{
urls: [
"stun:stun.l.google.com:19302",
"stun:global.stun.twilio.com:3478",
],
},
],
},
});
this.client.on("peer", this.onReceivePeerConnection);
this.client.on("warning", this.onTrackerClientWarning);
this.client.on("error", this.onTrackerClientError);
this.logger(
`create tracker client: ${LoggerUtils.getStreamString(stream)}; ${
this.peerId
}`
);
}

start() {
this.client.start();
}

destroy() {
this.client.destroy();
for (const { peer, potentialConnections } of this._peers.values()) {
peer?.destroy();
for (const connection of potentialConnections) {
connection.destroy();
}
}
}

private onReceivePeerConnection: TrackerClientEvents["peer"] = (
peerConnection
) => {
let peerItem = this._peers.get(peerConnection.id);

if (peerItem?.peer) {
peerConnection.destroy();
return;
} else if (!peerItem) {
peerItem = { potentialConnections: new Set() };
peerItem.potentialConnections.add(peerConnection);
const itemId = Peer.getPeerIdFromHexString(peerConnection.id);
this._peers.set(itemId, peerItem);
}

peerConnection.on("connect", () => {
if (!peerItem) return;

for (const connection of peerItem.potentialConnections) {
if (connection !== peerConnection) connection.destroy();
}
peerItem.potentialConnections.clear();
peerItem.peer = new Peer(
peerConnection,
{
onPeerClosed: this.onPeerClosed,
onSegmentRequested: this.eventHandlers.onSegmentRequested,
},
this.settings
);
this.eventHandlers.onPeerConnected(peerItem.peer);
});
};

private onTrackerClientWarning: TrackerClientEvents["warning"] = (
warning
) => {
this.logger(
`tracker warning (${LoggerUtils.getStreamString(
this.stream
)}: ${warning})`
);
};

private onTrackerClientError: TrackerClientEvents["error"] = (error) => {
this.logger(
`tracker error (${LoggerUtils.getStreamString(this.stream)}: ${error})`
);
};

*peers() {
for (const peerItem of this._peers.values()) {
if (peerItem?.peer) yield peerItem.peer;
}
}

private onPeerClosed = (peer: Peer) => {
this.logger(`peer closed: ${peer.id}`);
this._peers.delete(peer.id);
};
}

function utf8ToHex(utf8String: string) {
let result = "";
for (let i = 0; i < utf8String.length; i++) {
result += utf8String.charCodeAt(i).toString(16);
}

return result;
}

0 comments on commit fec7ce4

Please sign in to comment.