Skip to content

Commit

Permalink
feat(lightPush): improve peer usage and improve readability (#2155)
Browse files Browse the repository at this point in the history
* fix comment of default number of peers

* export default number of peers from base protocol sdk

* rename to light_push, move class to separate file

* move waitForRemotePeer to sdk package

* add todo to move waitForGossipSubPeerInMesh into @waku/relay

* clean up waitForRemotePeer, split metadata await from event and optimise, decouple from protocol implementations

* simplify and rename ILightPush interface

* use only connected peers in light push based on connections instead of peer renewal mechanism

* improve readability of result processing in light push

* fix check & update tests

* address tests, add new test cases, fix racing condition in StreamManager

* use libp2p.getPeers
  • Loading branch information
weboko authored Oct 4, 2024
1 parent b93134a commit 1d68526
Show file tree
Hide file tree
Showing 36 changed files with 308 additions and 235 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
"it-all": "^3.0.4",
"it-length-prefixed": "^9.0.4",
"it-pipe": "^3.0.1",
"p-event": "^6.0.1",
"uint8arraylist": "^2.4.3",
"uuid": "^9.0.0"
},
Expand Down
4 changes: 1 addition & 3 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ export * as waku_light_push from "./lib/light_push/index.js";
export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";

export * as waku_store from "./lib/store/index.js";
export { StoreCore } from "./lib/store/index.js";

export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";
export { StoreCore, StoreCodec } from "./lib/store/index.js";

export { ConnectionManager } from "./lib/connection_manager.js";

Expand Down
29 changes: 28 additions & 1 deletion packages/core/src/lib/stream_manager/stream_manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,32 @@ describe("StreamManager", () => {
}
});

it("should return different streams if requested simultaniously", async () => {
const con1 = createMockConnection();
con1.streams = [createMockStream({ id: "1", protocol: MULTICODEC })];

const newStreamSpy = sinon.spy(async (_protocol, _options) =>
createMockStream({
id: "2",
protocol: MULTICODEC,
writeStatus: "writable"
})
);

con1.newStream = newStreamSpy;
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];

const [stream1, stream2] = await Promise.all([
streamManager.getStream(mockPeer),
streamManager.getStream(mockPeer)
]);

const expected = ["1", "2"].toString();
const actual = [stream1.id, stream2.id].sort().toString();

expect(actual).to.be.eq(expected);
});

it("peer:update - should do nothing if another protocol hit", async () => {
const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;
Expand Down Expand Up @@ -156,6 +182,7 @@ function createMockStream(options: MockStreamOptions): Stream {
return {
id: options.id,
protocol: options.protocol,
writeStatus: options.writeStatus || "ready"
writeStatus: options.writeStatus || "ready",
metadata: {}
} as Stream;
}
27 changes: 23 additions & 4 deletions packages/core/src/lib/stream_manager/stream_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { Logger } from "@waku/utils";

import { selectOpenConnection } from "./utils.js";

const STREAM_LOCK_KEY = "consumed";

export class StreamManager {
private readonly log: Logger;

Expand All @@ -29,16 +31,20 @@ export class StreamManager {
await scheduledStream;
}

const stream = this.getOpenStreamForCodec(peer.id);
let stream = this.getOpenStreamForCodec(peer.id);

if (stream) {
this.log.info(
`Found existing stream peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);
this.lockStream(peer.id.toString(), stream);
return stream;
}

return this.createStream(peer);
stream = await this.createStream(peer);
this.lockStream(peer.id.toString(), stream);

return stream;
}

private async createStream(peer: Peer, retries = 0): Promise<Stream> {
Expand Down Expand Up @@ -142,13 +148,26 @@ export class StreamManager {
(s) => s.protocol === this.multicodec
);

if (!stream) {
return;
}

const isStreamUnusable = ["done", "closed", "closing"].includes(
stream?.writeStatus || ""
stream.writeStatus || ""
);
if (isStreamUnusable) {
if (isStreamUnusable || this.isStreamLocked(stream)) {
return;
}

return stream;
}

private lockStream(peerId: string, stream: Stream): void {
this.log.info(`Locking stream for peerId:${peerId}\tstreamId:${stream.id}`);
stream.metadata[STREAM_LOCK_KEY] = true;
}

private isStreamLocked(stream: Stream): boolean {
return !!stream.metadata[STREAM_LOCK_KEY];
}
}
2 changes: 1 addition & 1 deletion packages/interfaces/src/light_push.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { IBaseProtocolCore, IBaseProtocolSDK } from "./protocols.js";
import type { ISender } from "./sender.js";

export type ILightPushSDK = ISender &
export type ILightPush = ISender &
IBaseProtocolSDK & { protocol: IBaseProtocolCore };
2 changes: 1 addition & 1 deletion packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ export type ProtocolCreateOptions = {
* This is used by:
* - Light Push to send messages,
* - Filter to retrieve messages.
* Defaults to 3.
* Defaults to 2.
*/
numPeersToUse?: number;
/**
Expand Down
6 changes: 3 additions & 3 deletions packages/interfaces/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { IConnectionManager } from "./connection_manager.js";
import type { IFilterSDK } from "./filter.js";
import { IHealthManager } from "./health_manager.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPushSDK } from "./light_push.js";
import type { ILightPush } from "./light_push.js";
import { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js";
import type { IStoreSDK } from "./store.js";
Expand All @@ -15,7 +15,7 @@ export interface Waku {
relay?: IRelay;
store?: IStoreSDK;
filter?: IFilterSDK;
lightPush?: ILightPushSDK;
lightPush?: ILightPush;

connectionManager: IConnectionManager;

Expand All @@ -36,7 +36,7 @@ export interface LightNode extends Waku {
relay: undefined;
store: IStoreSDK;
filter: IFilterSDK;
lightPush: ILightPushSDK;
lightPush: ILightPush;
}

export interface RelayNode extends Waku {
Expand Down
3 changes: 2 additions & 1 deletion packages/sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
"@waku/proto": "^0.0.8",
"@waku/utils": "0.0.20",
"@waku/message-hash": "0.1.16",
"libp2p": "^1.8.1"
"libp2p": "^1.8.1",
"p-event": "^6.0.1"
},
"devDependencies": {
"@rollup/plugin-commonjs": "^25.0.7",
Expand Down
6 changes: 4 additions & 2 deletions packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export { waitForRemotePeer, createEncoder, createDecoder } from "@waku/core";
export { createEncoder, createDecoder } from "@waku/core";
export {
DecodedMessage,
Decoder,
Expand All @@ -14,10 +14,12 @@ export {
defaultLibp2p,
createLibp2pAndUpdateOptions
} from "./create/index.js";
export { wakuLightPush } from "./protocols/lightpush/index.js";
export { wakuLightPush } from "./protocols/light_push/index.js";
export { wakuFilter } from "./protocols/filter/index.js";
export { wakuStore } from "./protocols/store/index.js";

export { waitForRemotePeer } from "./wait_for_remote_peer.js";

export * as waku from "@waku/core";
export * as utils from "@waku/utils";
export * from "@waku/interfaces";
4 changes: 2 additions & 2 deletions packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ interface Options {
}

const RENEW_TIME_LOCK_DURATION = 30 * 1000;
const DEFAULT_NUM_PEERS_TO_USE = 2;
export const DEFAULT_NUM_PEERS_TO_USE = 2;
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;

export class BaseProtocolSDK implements IBaseProtocolSDK {
private healthManager: IHealthManager;
protected healthManager: IHealthManager;
public readonly numPeersToUse: number;
private peers: Peer[] = [];
private maintainPeersIntervalId: ReturnType<
Expand Down
1 change: 1 addition & 0 deletions packages/sdk/src/protocols/light_push/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { wakuLightPush } from "./light_push.js";
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type { PeerId } from "@libp2p/interface";
import { ConnectionManager, LightPushCore } from "@waku/core";
import type { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager, LightPushCodec, LightPushCore } from "@waku/core";
import {
Failure,
type IEncoder,
ILightPushSDK,
ILightPush,
type IMessage,
type Libp2p,
type ProtocolCreateOptions,
Expand All @@ -19,14 +19,14 @@ import { BaseProtocolSDK } from "../base_protocol.js";

const log = new Logger("sdk:light-push");

class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
class LightPush extends BaseProtocolSDK implements ILightPush {
public readonly protocol: LightPushCore;

private readonly reliabilityMonitor: SenderReliabilityMonitor;

public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
private libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
super(
Expand All @@ -49,11 +49,6 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
message: IMessage,
_options?: ProtocolUseOptions
): Promise<SDKProtocolResult> {
const options = {
autoRetry: true,
..._options
} as ProtocolUseOptions;

const successes: PeerId[] = [];
const failures: Failure[] = [];

Expand All @@ -63,17 +58,17 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
} catch (error) {
log.error("Failed to send waku light push: pubsub topic not configured");
return {
successes,
failures: [
{
error: ProtocolError.TOPIC_NOT_CONFIGURED
}
],
successes: []
]
};
}

const hasPeers = await this.hasPeers(options);
if (!hasPeers) {
const peers = await this.getConnectedPeers();
if (peers.length === 0) {
return {
successes,
failures: [
Expand All @@ -84,53 +79,75 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
};
}

const sendPromises = this.connectedPeers.map((peer) =>
this.protocol.send(encoder, message, peer)
const results = await Promise.allSettled(
peers.map((peer) => this.protocol.send(encoder, message, peer))
);

const results = await Promise.allSettled(sendPromises);

for (const result of results) {
if (result.status === "fulfilled") {
const { failure, success } = result.value;
if (success) {
successes.push(success);
}
if (failure) {
failures.push(failure);
if (failure.peerId) {
const peer = this.connectedPeers.find((connectedPeer) =>
connectedPeer.id.equals(failure.peerId)
);
if (peer) {
log.info(`
Failed to send message to peer ${failure.peerId}.
Retrying the message with the same peer in the background.
If this fails, the peer will be renewed.
`);
void this.reliabilityMonitor.attemptRetriesOrRenew(
failure.peerId,
() => this.protocol.send(encoder, message, peer)
);
}
}
}
} else {
if (result.status !== "fulfilled") {
log.error("Failed unexpectedly while sending:", result.reason);
failures.push({ error: ProtocolError.GENERIC_FAIL });
continue;
}

const { failure, success } = result.value;

if (success) {
successes.push(success);
continue;
}

if (failure) {
failures.push(failure);

const connectedPeer = this.connectedPeers.find((connectedPeer) =>
connectedPeer.id.equals(failure.peerId)
);

if (connectedPeer) {
void this.reliabilityMonitor.attemptRetriesOrRenew(
connectedPeer.id,
() => this.protocol.send(encoder, message, connectedPeer)
);
}
}
}

this.healthManager.updateProtocolHealth(LightPushCodec, successes.length);

return {
successes,
failures
};
}

private async getConnectedPeers(): Promise<Peer[]> {
const peerIDs = this.libp2p.getPeers();

if (peerIDs.length === 0) {
return [];
}

const peers = await Promise.all(
peerIDs.map(async (id) => {
try {
return await this.libp2p.peerStore.get(id);
} catch (e) {
return null;
}
})
);

return peers
.filter((p) => !!p)
.filter((p) => (p as Peer).protocols.includes(LightPushCodec))
.slice(0, this.numPeersToUse) as Peer[];
}
}

export function wakuLightPush(
connectionManager: ConnectionManager,
init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => ILightPushSDK {
return (libp2p: Libp2p) => new LightPushSDK(connectionManager, libp2p, init);
): (libp2p: Libp2p) => ILightPush {
return (libp2p: Libp2p) => new LightPush(connectionManager, libp2p, init);
}
Loading

0 comments on commit 1d68526

Please sign in to comment.