Skip to content

Commit

Permalink
fix: remove window reference and improve waitForRemotePeer
Browse files Browse the repository at this point in the history
  • Loading branch information
weboko committed Nov 5, 2024
1 parent 7c0ce7b commit 26c78b1
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 43 deletions.
4 changes: 2 additions & 2 deletions packages/sdk/src/reliability_monitor/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ export class ReceiverReliabilityMonitor {
return;
}

const timeout = window.setTimeout(
const timeout = setTimeout(
(async () => {
const receivedAnyMessage = this.verifiedPeers.has(peerIdStr);
const receivedTestMessage = this.receivedMessagesFormPeer.has(
Expand All @@ -136,7 +136,7 @@ export class ReceiverReliabilityMonitor {
await this.renewAndSubscribePeer(peerId);
}) as () => void,
MESSAGE_VERIFICATION_DELAY
);
) as unknown as number;

this.scheduledVerification.set(peerIdStr, timeout);
}
Expand Down
8 changes: 4 additions & 4 deletions packages/sdk/src/waku/wait_for_remote_peer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import sinon from "sinon";
import { waitForRemotePeer } from "./wait_for_remote_peer.js";
import { WakuNode } from "./waku.js";

describe("waitForRemotePeer", () => {
describe.only("waitForRemotePeer", () => {
let eventTarget = new EventTarget();

beforeEach(() => {
Expand Down Expand Up @@ -121,8 +121,8 @@ describe("waitForRemotePeer", () => {
});

it("should check connected peers if present and suitable", async () => {
const addEventListenerSpy = sinon.spy(eventTarget.addEventListener);
eventTarget.addEventListener = addEventListenerSpy;
const removeEventListenerSpy = sinon.spy(eventTarget.removeEventListener);
eventTarget.removeEventListener = removeEventListenerSpy;

const wakuNode = mockWakuNode({
isStarted: true,
Expand All @@ -144,7 +144,7 @@ describe("waitForRemotePeer", () => {
}

expect(err).to.be.undefined;
expect(addEventListenerSpy.notCalled).to.be.true;
expect(removeEventListenerSpy.notCalled).to.be.true;
});

it("should wait for LightPush peer to be connected", async () => {
Expand Down
91 changes: 54 additions & 37 deletions packages/sdk/src/waku/wait_for_remote_peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,57 +40,76 @@ export async function waitForRemotePeer(
throw Error("Waku node is not started");
}

for (const protocol of protocols) {
switch (protocol) {
case Protocols.Relay:
if (!waku.relay)
throw Error("Cannot wait for Relay peer: protocol not mounted");
break;
case Protocols.LightPush:
if (!waku.lightPush)
throw Error("Cannot wait for LightPush peer: protocol not mounted");
break;
case Protocols.Store:
if (!waku.store)
throw Error("Cannot wait for Store peer: protocol not mounted");
break;
case Protocols.Filter:
if (!waku.filter)
throw Error("Cannot wait for Filter peer: protocol not mounted");
break;
}
}

const promises = [waitForProtocols(waku, protocols)];

if (connections.length > 0 && !protocols.includes(Protocols.Relay)) {
const success = await waitForMetadata(waku, protocols);
promises.push(
waitForMetadata(waku, protocols) as unknown as Promise<any[]>

Check warning on line 68 in packages/sdk/src/waku/wait_for_remote_peer.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 68 in packages/sdk/src/waku/wait_for_remote_peer.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
);
}

if (success) {
return;
}
if (timeoutMs) {
await rejectOnTimeout(
Promise.any(promises),
timeoutMs,
"Timed out waiting for a remote peer."
);
} else {
await Promise.any(promises);
}
}

type EventListener = (_: CustomEvent<IdentifyResult>) => void;

/**
* Waits for required peers to be connected.
*/
async function waitForProtocols(
waku: IWaku,
protocols: Protocols[]
): Promise<any[]> {

Check warning on line 91 in packages/sdk/src/waku/wait_for_remote_peer.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 91 in packages/sdk/src/waku/wait_for_remote_peer.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
const promises = [];

if (protocols.includes(Protocols.Relay)) {
if (!waku.relay) {
throw Error("Cannot wait for Relay peer: protocol not mounted");
}
if (waku.relay && protocols.includes(Protocols.Relay)) {
promises.push(waku.relay.waitForPeers());
}

if (protocols.includes(Protocols.Store)) {
if (!waku.store) {
throw Error("Cannot wait for Store peer: protocol not mounted");
}
if (waku.store && protocols.includes(Protocols.Store)) {
promises.push(waitForConnectedPeer(StoreCodec, waku.libp2p));
}

if (protocols.includes(Protocols.LightPush)) {
if (!waku.lightPush) {
throw Error("Cannot wait for LightPush peer: protocol not mounted");
}
if (waku.lightPush && protocols.includes(Protocols.LightPush)) {
promises.push(waitForConnectedPeer(LightPushCodec, waku.libp2p));
}

if (protocols.includes(Protocols.Filter)) {
if (!waku.filter) {
throw new Error("Cannot wait for Filter peer: protocol not mounted");
}
if (waku.filter && protocols.includes(Protocols.Filter)) {
promises.push(waitForConnectedPeer(FilterCodecs.SUBSCRIBE, waku.libp2p));
}

if (timeoutMs) {
await rejectOnTimeout(
Promise.all(promises),
timeoutMs,
"Timed out waiting for a remote peer."
);
} else {
await Promise.all(promises);
}
return Promise.all(promises);
}

type EventListener = (_: CustomEvent<IdentifyResult>) => void;

/**
* Wait for a peer with the given protocol to be connected.
* If sharding is enabled on the node, it will also wait for the peer to be confirmed by the metadata service.
Expand Down Expand Up @@ -135,12 +154,12 @@ async function waitForConnectedPeer(
}

/**
* Waits for the metadata from the remote peer.
* Checks existing connections for needed metadata.
*/
async function waitForMetadata(
waku: IWaku,
protocols: Protocols[]
): Promise<boolean> {
): Promise<void> {
const connectedPeers = waku.libp2p.getPeers();
const metadataService = waku.libp2p.services.metadata;
const enabledCodes = mapProtocolsToCodecs(protocols);
Expand All @@ -149,7 +168,7 @@ async function waitForMetadata(
log.info(
`Skipping waitForMetadata due to missing connections:${connectedPeers.length} or metadataService:${!!metadataService}`
);
return false;
return;
}

for (const peerId of connectedPeers) {
Expand All @@ -173,7 +192,7 @@ async function waitForMetadata(
);

if (confirmedAllCodecs) {
return true;
return;
}
}
}
Expand All @@ -187,8 +206,6 @@ async function waitForMetadata(
continue;
}
}

return false;
}

const awaitTimeout = (ms: number, rejectReason: string): Promise<void> =>
Expand Down

0 comments on commit 26c78b1

Please sign in to comment.