diff --git a/packages/interfaces/src/sender.ts b/packages/interfaces/src/sender.ts index c195403a7e..19270d5ebd 100644 --- a/packages/interfaces/src/sender.ts +++ b/packages/interfaces/src/sender.ts @@ -1,10 +1,23 @@ import type { IEncoder, IMessage } from "./message.js"; -import { ProtocolUseOptions, SDKProtocolResult } from "./protocols.js"; +import { SDKProtocolResult } from "./protocols.js"; + +export type ISenderOptions = { + /** + * Enables retry of a message that was failed to be sent. + * @default false + */ + autoRetry?: boolean; + /** + * Sets number of attempts if `autoRetry` is enabled. + * @default 3 + */ + maxAttempts?: number; +}; export interface ISender { send: ( encoder: IEncoder, message: IMessage, - sendOptions?: ProtocolUseOptions + sendOptions?: ISenderOptions ) => Promise; } diff --git a/packages/sdk/src/protocols/light_push/light_push.spec.ts b/packages/sdk/src/protocols/light_push/light_push.spec.ts new file mode 100644 index 0000000000..9f529f181e --- /dev/null +++ b/packages/sdk/src/protocols/light_push/light_push.spec.ts @@ -0,0 +1,10 @@ +// TODO: add them after decoupling `BaseProtocolSDK` from LightPush +describe("LightPush SDK", () => { + it("should fail to send if pubsub topics are misconfigured"); + + it("should fail to send if no connected peers found"); + + it("should send to number of used peers"); + + it("should retry on failure if specified"); +}); diff --git a/packages/sdk/src/protocols/light_push/light_push.ts b/packages/sdk/src/protocols/light_push/light_push.ts index 61e72487f7..8da6ea31de 100644 --- a/packages/sdk/src/protocols/light_push/light_push.ts +++ b/packages/sdk/src/protocols/light_push/light_push.ts @@ -6,28 +6,33 @@ import { LightPushCore } from "@waku/core"; import { + type CoreProtocolResult, Failure, type IEncoder, ILightPush, type IMessage, + type ISenderOptions, type Libp2p, type ProtocolCreateOptions, ProtocolError, - ProtocolUseOptions, SDKProtocolResult } from "@waku/interfaces"; import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; -import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js"; -import { SenderReliabilityMonitor } from "../../reliability_monitor/sender.js"; import { BaseProtocolSDK } from "../base_protocol.js"; const log = new Logger("sdk:light-push"); -class LightPush extends BaseProtocolSDK implements ILightPush { - public readonly protocol: LightPushCore; +const DEFAULT_MAX_ATTEMPTS = 3; +const DEFAULT_SEND_OPTIONS: ISenderOptions = { + autoRetry: false, + maxAttempts: DEFAULT_MAX_ATTEMPTS +}; + +type RetryCallback = (peer: Peer) => Promise; - private readonly reliabilityMonitor: SenderReliabilityMonitor; +export class LightPush extends BaseProtocolSDK implements ILightPush { + public readonly protocol: LightPushCore; public constructor( connectionManager: ConnectionManager, @@ -42,17 +47,13 @@ class LightPush extends BaseProtocolSDK implements ILightPush { } ); - this.reliabilityMonitor = ReliabilityMonitorManager.createSenderMonitor( - this.renewPeer.bind(this) - ); - this.protocol = this.core as LightPushCore; } public async send( encoder: IEncoder, message: IMessage, - _options?: ProtocolUseOptions + options: ISenderOptions = DEFAULT_SEND_OPTIONS ): Promise { const successes: PeerId[] = []; const failures: Failure[] = []; @@ -105,14 +106,10 @@ class LightPush extends BaseProtocolSDK implements ILightPush { if (failure) { failures.push(failure); - const connectedPeer = this.connectedPeers.find((connectedPeer) => - connectedPeer.id.equals(failure.peerId) - ); - - if (connectedPeer) { - void this.reliabilityMonitor.attemptRetriesOrRenew( - connectedPeer.id, - () => this.protocol.send(encoder, message, connectedPeer) + if (options?.autoRetry) { + void this.attemptRetries( + (peer: Peer) => this.protocol.send(encoder, message, peer), + options.maxAttempts ); } } @@ -129,6 +126,32 @@ class LightPush extends BaseProtocolSDK implements ILightPush { }; } + private async attemptRetries( + fn: RetryCallback, + maxAttempts?: number + ): Promise { + maxAttempts = maxAttempts || DEFAULT_MAX_ATTEMPTS; + const connectedPeers = await this.getConnectedPeers(); + + if (connectedPeers.length === 0) { + log.warn("Cannot retry with no connected peers."); + return; + } + + for (let i = 0; i < maxAttempts; i++) { + const peer = connectedPeers[i % connectedPeers.length]; // always present as we checked for the length already + const response = await fn(peer); + + if (response.success) { + return; + } + + log.info( + `Attempted retry for peer:${peer.id} failed with:${response?.failure?.error}` + ); + } + } + private async getConnectedPeers(): Promise { const peerIDs = this.libp2p.getPeers(); diff --git a/packages/sdk/src/reliability_monitor/index.ts b/packages/sdk/src/reliability_monitor/index.ts index f5bc48879d..f75c5fdc92 100644 --- a/packages/sdk/src/reliability_monitor/index.ts +++ b/packages/sdk/src/reliability_monitor/index.ts @@ -7,14 +7,12 @@ import { } from "@waku/interfaces"; import { ReceiverReliabilityMonitor } from "./receiver.js"; -import { SenderReliabilityMonitor } from "./sender.js"; export class ReliabilityMonitorManager { private static receiverMonitors: Map< PubsubTopic, ReceiverReliabilityMonitor > = new Map(); - private static senderMonitor: SenderReliabilityMonitor | undefined; public static createReceiverMonitor( pubsubTopic: PubsubTopic, @@ -44,22 +42,10 @@ export class ReliabilityMonitorManager { return monitor; } - public static createSenderMonitor( - renewPeer: (peerId: PeerId) => Promise - ): SenderReliabilityMonitor { - if (!ReliabilityMonitorManager.senderMonitor) { - ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor( - renewPeer - ); - } - return ReliabilityMonitorManager.senderMonitor; - } - private constructor() {} public static stop(pubsubTopic: PubsubTopic): void { this.receiverMonitors.delete(pubsubTopic); - this.senderMonitor = undefined; } public static stopAll(): void { @@ -67,7 +53,6 @@ export class ReliabilityMonitorManager { monitor.setMaxMissedMessagesThreshold(undefined); monitor.setMaxPingFailures(undefined); this.receiverMonitors.delete(pubsubTopic); - this.senderMonitor = undefined; } } } diff --git a/packages/sdk/src/reliability_monitor/sender.ts b/packages/sdk/src/reliability_monitor/sender.ts deleted file mode 100644 index 914c321da8..0000000000 --- a/packages/sdk/src/reliability_monitor/sender.ts +++ /dev/null @@ -1,65 +0,0 @@ -import type { Peer, PeerId } from "@libp2p/interface"; -import { CoreProtocolResult, PeerIdStr } from "@waku/interfaces"; -import { Logger } from "@waku/utils"; - -const log = new Logger("sdk:sender:reliability_monitor"); - -const DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL = 3; - -export class SenderReliabilityMonitor { - private attempts: Map = new Map(); - private readonly maxAttemptsBeforeRenewal = - DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL; - - public constructor( - private renewPeer: (peerId: PeerId) => Promise - ) {} - - public async attemptRetriesOrRenew( - peerId: PeerId, - protocolSend: () => Promise - ): Promise { - const peerIdStr = peerId.toString(); - const currentAttempts = this.attempts.get(peerIdStr) || 0; - this.attempts.set(peerIdStr, currentAttempts + 1); - - if (currentAttempts + 1 < this.maxAttemptsBeforeRenewal) { - try { - const result = await protocolSend(); - if (result.success) { - log.info(`Successfully sent message after retry to ${peerIdStr}`); - this.attempts.delete(peerIdStr); - } else { - log.error( - `Failed to send message after retry to ${peerIdStr}: ${result.failure}` - ); - await this.attemptRetriesOrRenew(peerId, protocolSend); - } - } catch (error) { - log.error( - `Failed to send message after retry to ${peerIdStr}: ${error}` - ); - await this.attemptRetriesOrRenew(peerId, protocolSend); - } - } else { - try { - const newPeer = await this.renewPeer(peerId); - if (newPeer) { - log.info( - `Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}` - ); - - this.attempts.delete(peerIdStr); - this.attempts.set(newPeer.id.toString(), 0); - await protocolSend(); - } else { - log.error( - `Failed to renew peer ${peerId.toString()}: New peer is undefined` - ); - } - } catch (error) { - log.error(`Failed to renew peer ${peerId.toString()}: ${error}`); - } - } - } -}