Skip to content

Commit

Permalink
feat: lighten retry logic for LightPush
Browse files Browse the repository at this point in the history
  • Loading branch information
weboko committed Oct 11, 2024
1 parent b2efce5 commit 771440c
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 101 deletions.
17 changes: 15 additions & 2 deletions packages/interfaces/src/sender.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
import type { IEncoder, IMessage } from "./message.js";
import { ProtocolUseOptions, SDKProtocolResult } from "./protocols.js";
import { SDKProtocolResult } from "./protocols.js";

export type ISenderOptions = {
/**
* Enables retry of a message that was failed to be sent.
* @default false
*/
autoRetry?: boolean;
/**
* Sets number of attempts if `autoRetry` is enabled.
* @default 3
*/
maxAttempts?: number;
};

export interface ISender {
send: (
encoder: IEncoder,
message: IMessage,
sendOptions?: ProtocolUseOptions
sendOptions?: ISenderOptions
) => Promise<SDKProtocolResult>;
}
10 changes: 10 additions & 0 deletions packages/sdk/src/protocols/light_push/light_push.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// TODO: add them after decoupling `BaseProtocolSDK` from LightPush
describe("LightPush SDK", () => {
it("should fail to send if pubsub topics are misconfigured");

it("should fail to send if no connected peers found");

it("should send to number of used peers");

it("should retry on failure if specified");
});
61 changes: 42 additions & 19 deletions packages/sdk/src/protocols/light_push/light_push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,33 @@ import {
LightPushCore
} from "@waku/core";
import {
type CoreProtocolResult,
Failure,
type IEncoder,
ILightPush,
type IMessage,
type ISenderOptions,
type Libp2p,
type ProtocolCreateOptions,
ProtocolError,
ProtocolUseOptions,
SDKProtocolResult
} from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";

import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js";
import { SenderReliabilityMonitor } from "../../reliability_monitor/sender.js";
import { BaseProtocolSDK } from "../base_protocol.js";

const log = new Logger("sdk:light-push");

class LightPush extends BaseProtocolSDK implements ILightPush {
public readonly protocol: LightPushCore;
const DEFAULT_MAX_ATTEMPTS = 3;
const DEFAULT_SEND_OPTIONS: ISenderOptions = {
autoRetry: false,
maxAttempts: DEFAULT_MAX_ATTEMPTS
};

type RetryCallback = (peer: Peer) => Promise<CoreProtocolResult>;

private readonly reliabilityMonitor: SenderReliabilityMonitor;
export class LightPush extends BaseProtocolSDK implements ILightPush {
public readonly protocol: LightPushCore;

public constructor(
connectionManager: ConnectionManager,
Expand All @@ -42,17 +47,13 @@ class LightPush extends BaseProtocolSDK implements ILightPush {
}
);

this.reliabilityMonitor = ReliabilityMonitorManager.createSenderMonitor(
this.renewPeer.bind(this)
);

this.protocol = this.core as LightPushCore;
}

public async send(
encoder: IEncoder,
message: IMessage,
_options?: ProtocolUseOptions
options: ISenderOptions = DEFAULT_SEND_OPTIONS
): Promise<SDKProtocolResult> {
const successes: PeerId[] = [];
const failures: Failure[] = [];
Expand Down Expand Up @@ -105,14 +106,10 @@ class LightPush extends BaseProtocolSDK implements ILightPush {
if (failure) {
failures.push(failure);

const connectedPeer = this.connectedPeers.find((connectedPeer) =>
connectedPeer.id.equals(failure.peerId)
);

if (connectedPeer) {
void this.reliabilityMonitor.attemptRetriesOrRenew(
connectedPeer.id,
() => this.protocol.send(encoder, message, connectedPeer)
if (options?.autoRetry) {
void this.attemptRetries(
(peer: Peer) => this.protocol.send(encoder, message, peer),
options.maxAttempts
);
}
}
Expand All @@ -129,6 +126,32 @@ class LightPush extends BaseProtocolSDK implements ILightPush {
};
}

private async attemptRetries(
fn: RetryCallback,
maxAttempts?: number
): Promise<void> {
maxAttempts = maxAttempts || DEFAULT_MAX_ATTEMPTS;
const connectedPeers = await this.getConnectedPeers();

if (connectedPeers.length === 0) {
log.warn("Cannot retry with no connected peers.");
return;
}

for (let i = 0; i < maxAttempts; i++) {
const peer = connectedPeers[i % connectedPeers.length]; // always present as we checked for the length already
const response = await fn(peer);

if (response.success) {
return;
}

log.info(
`Attempted retry for peer:${peer.id} failed with:${response?.failure?.error}`
);
}
}

private async getConnectedPeers(): Promise<Peer[]> {
const peerIDs = this.libp2p.getPeers();

Expand Down
15 changes: 0 additions & 15 deletions packages/sdk/src/reliability_monitor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ import {
} from "@waku/interfaces";

import { ReceiverReliabilityMonitor } from "./receiver.js";
import { SenderReliabilityMonitor } from "./sender.js";

export class ReliabilityMonitorManager {
private static receiverMonitors: Map<
PubsubTopic,
ReceiverReliabilityMonitor
> = new Map();
private static senderMonitor: SenderReliabilityMonitor | undefined;

public static createReceiverMonitor(
pubsubTopic: PubsubTopic,
Expand Down Expand Up @@ -44,30 +42,17 @@ export class ReliabilityMonitorManager {
return monitor;
}

public static createSenderMonitor(
renewPeer: (peerId: PeerId) => Promise<Peer | undefined>
): SenderReliabilityMonitor {
if (!ReliabilityMonitorManager.senderMonitor) {
ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor(
renewPeer
);
}
return ReliabilityMonitorManager.senderMonitor;
}

private constructor() {}

public static stop(pubsubTopic: PubsubTopic): void {
this.receiverMonitors.delete(pubsubTopic);
this.senderMonitor = undefined;
}

public static stopAll(): void {
for (const [pubsubTopic, monitor] of this.receiverMonitors) {
monitor.setMaxMissedMessagesThreshold(undefined);
monitor.setMaxPingFailures(undefined);
this.receiverMonitors.delete(pubsubTopic);
this.senderMonitor = undefined;
}
}
}
65 changes: 0 additions & 65 deletions packages/sdk/src/reliability_monitor/sender.ts

This file was deleted.

0 comments on commit 771440c

Please sign in to comment.