Skip to content

Commit

Permalink
feat(filter): enhancing protocol peer management with mutex locks (#2137
Browse files Browse the repository at this point in the history
)

* chore: improvements

* chore: add logs for subscription maintenance

* chore: update logging

* chore: trimming down BaseProtocolCore

* chore: track peers in a hashmap instead of array

* chore: peer mgmt responds to conenction/disconnection and improve logging

* feat: add mutex locks to tackle race conditions over shared state

* fix: build

* chore: some mutex lock-release improvements

* feat: peer manager

* chore: rm tests for remove internal util

* chore: update HealthManager updates

* chore: update tests

* rm: only

* fix: hasPeers management

* chore: add modularity to getting connected peers

* chore: improve logs & add debug

* chore: renewal doesnt disconnect, only removes

* chore: await for sequential operations

* chore: add TODO

* chore: minor improvements

* chore: fix rebase

* chore: update playright

* chore: remove additional arg

* chore: update interafce

* feat(peer-manager): unit tests

* chore: improve hasPeers()

* chore: update lockfile

* feat: Filter reacts to peer:disconnect event, add tests

* chore: fix lock

* chore: update playright

* chore: update protocol health for lightpush

* chore: remove .only

* chore: address comments and improvements

* fix: tsconfig
  • Loading branch information
danisharora099 authored Oct 10, 2024
1 parent 75fcca4 commit b2efce5
Show file tree
Hide file tree
Showing 23 changed files with 4,924 additions and 3,882 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/playwright.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
timeout-minutes: 60
runs-on: ubuntu-latest
container:
image: mcr.microsoft.com/playwright:v1.46.0-jammy
image: mcr.microsoft.com/playwright:v1.48.0-jammy
steps:
- uses: actions/checkout@v3
- uses: actions/setup-node@v3
Expand Down
7,556 changes: 4,405 additions & 3,151 deletions package-lock.json

Large diffs are not rendered by default.

42 changes: 14 additions & 28 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import type { Libp2p } from "@libp2p/interface";
import type { Peer, PeerStore, Stream } from "@libp2p/interface";
import type { Peer, Stream } from "@libp2p/interface";
import type {
IBaseProtocolCore,
Libp2pComponents,
PubsubTopic
} from "@waku/interfaces";
import { Logger, pubsubTopicsToShardInfo } from "@waku/utils";
import {
getConnectedPeersForProtocolAndShard,
getPeersForProtocol,
sortPeersByLatency
} from "@waku/utils/libp2p";
import { Logger } from "@waku/utils";
import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p";

import { filterPeersByDiscovery } from "./filterPeers.js";
import { StreamManager } from "./stream_manager/index.js";
Expand All @@ -26,7 +22,7 @@ export class BaseProtocol implements IBaseProtocolCore {

protected constructor(
public multicodec: string,
private components: Libp2pComponents,
protected components: Libp2pComponents,
private log: Logger,
public readonly pubsubTopics: PubsubTopic[]
) {
Expand All @@ -50,25 +46,22 @@ export class BaseProtocol implements IBaseProtocolCore {
return this.streamManager.getStream(peer);
}

public get peerStore(): PeerStore {
return this.components.peerStore;
}

/**
* Returns known peers from the address book (`libp2p.peerStore`) that support
* the class protocol. Waku may or may not be currently connected to these
* peers.
*/
public async allPeers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
return getPeersForProtocol(this.components.peerStore, [this.multicodec]);
}

public async connectedPeers(): Promise<Peer[]> {
const peers = await this.allPeers();
return peers.filter((peer) => {
return (
this.components.connectionManager.getConnections(peer.id).length > 0
const connections = this.components.connectionManager.getConnections(
peer.id
);
return connections.length > 0;
});
}

Expand All @@ -77,9 +70,8 @@ export class BaseProtocol implements IBaseProtocolCore {
*
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A list of peers that support the protocol sorted by latency.
*/
* @returns A list of peers that support the protocol sorted by latency. By default, returns all peers available, including bootstrap.
*/
public async getPeers(
{
numPeers,
Expand All @@ -88,29 +80,23 @@ export class BaseProtocol implements IBaseProtocolCore {
numPeers: number;
maxBootstrapPeers: number;
} = {
maxBootstrapPeers: 1,
maxBootstrapPeers: 0,
numPeers: 0
}
): Promise<Peer[]> {
// Retrieve all connected peers that support the protocol & shard (if configured)
const connectedPeersForProtocolAndShard =
await getConnectedPeersForProtocolAndShard(
this.components.connectionManager.getConnections(),
this.peerStore,
[this.multicodec],
pubsubTopicsToShardInfo(this.pubsubTopics)
);
const allAvailableConnectedPeers = await this.connectedPeers();

// Filter the peers based on discovery & number of peers requested
const filteredPeers = filterPeersByDiscovery(
connectedPeersForProtocolAndShard,
allAvailableConnectedPeers,
numPeers,
maxBootstrapPeers
);

// Sort the peers by latency
const sortedFilteredPeers = await sortPeersByLatency(
this.peerStore,
this.components.peerStore,
filteredPeers
);

Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,11 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
() => {
log.info("Receiving pipe closed.");
},
(e) => {
log.error("Error with receiving pipe", e);
async (e) => {
log.error(
`Error with receiving pipe on peer:${connection.remotePeer.toString()} -- stream:${stream.id} -- protocol:${stream.protocol}: `,
e
);
}
);
} catch (e) {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class Metadata extends BaseProtocol implements IMetadata {
pubsubTopicsToShardInfo(this.pubsubTopics)
);

const peer = await this.peerStore.get(peerId);
const peer = await this.libp2pComponents.peerStore.get(peerId);
if (!peer) {
return {
shardInfo: null,
Expand Down
2 changes: 1 addition & 1 deletion packages/discovery/src/peer-exchange/waku_peer_exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
numPeers: BigInt(numPeers)
});

const peer = await this.peerStore.get(peerId);
const peer = await this.components.peerStore.get(peerId);
if (!peer) {
return {
peerInfos: null,
Expand Down
17 changes: 2 additions & 15 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Libp2p } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import type { Peer, PeerStore } from "@libp2p/interface";
import type { Peer } from "@libp2p/interface";

import type { CreateLibp2pOptions } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js";
Expand All @@ -16,7 +16,6 @@ export enum Protocols {

export type IBaseProtocolCore = {
multicodec: string;
peerStore: PeerStore;
allPeers: () => Promise<Peer[]>;
connectedPeers: () => Promise<Peer[]>;
addLibp2pEventListener: Libp2p["addEventListener"];
Expand All @@ -25,7 +24,7 @@ export type IBaseProtocolCore = {

export type IBaseProtocolSDK = {
readonly connectedPeers: Peer[];
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>;
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer | undefined>;
readonly numPeersToUse: number;
};

Expand All @@ -36,10 +35,6 @@ export type NetworkConfig = StaticSharding | AutoSharding;
* Options for using LightPush and Filter
*/
export type ProtocolUseOptions = {
/**
* Optional flag to enable auto-retry with exponential backoff
*/
autoRetry?: boolean;
/**
* Optional flag to force using all available peers
*/
Expand All @@ -48,14 +43,6 @@ export type ProtocolUseOptions = {
* Optional maximum number of attempts for exponential backoff
*/
maxAttempts?: number;
/**
* Optional initial delay in milliseconds for exponential backoff
*/
initialDelay?: number;
/**
* Optional maximum delay in milliseconds for exponential backoff
*/
maxDelay?: number;
};

export type ProtocolCreateOptions = {
Expand Down
15 changes: 8 additions & 7 deletions packages/sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
},
"dependencies": {
"@chainsafe/libp2p-noise": "^15.1.0",
"@libp2p/bootstrap": "^10.1.2",
"@libp2p/bootstrap": "^10",
"@libp2p/identify": "^2.1.2",
"@libp2p/mplex": "^10.1.2",
"@libp2p/ping": "^1.1.2",
Expand All @@ -73,23 +73,24 @@
"@waku/proto": "^0.0.8",
"@waku/utils": "0.0.20",
"@waku/message-hash": "0.1.16",
"async-mutex": "^0.5.0",
"libp2p": "^1.8.1"
},
"devDependencies": {
"@types/mocha": "^10.0.6",
"@types/chai": "^4.3.11",
"@rollup/plugin-commonjs": "^25.0.7",
"@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.2.3",
"@rollup/plugin-replace": "^5.0.5",
"@types/mocha": "^10.0.9",
"@waku/build-utils": "*",
"mocha": "^10.3.0",
"sinon": "^18.0.0",
"chai": "^4.3.10",
"chai": "^5.1.1",
"cspell": "^8.6.1",
"interface-datastore": "^8.2.10",
"mocha": "^10.7.3",
"npm-run-all": "^4.1.5",
"rollup": "^4.12.0"
"rollup": "^4.12.0",
"sinon": "^19.0.2"
},
"peerDependencies": {
"@libp2p/bootstrap": "^10"
Expand All @@ -109,4 +110,4 @@
"LICENSE",
"README.md"
]
}
}
Loading

0 comments on commit b2efce5

Please sign in to comment.