diff --git a/package-lock.json b/package-lock.json index 30fa5c4078..8a01ff6a1c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,11 +8,11 @@ "workspaces": [ "packages/interfaces", "packages/utils", + "packages/message-hash", "packages/proto", "packages/enr", "packages/core", "packages/relay", - "packages/message-hash", "packages/peer-exchange", "packages/dns-discovery", "packages/message-encryption", @@ -27635,6 +27635,7 @@ "@noble/hashes": "^1.3.2", "@waku/enr": "^0.0.20", "@waku/interfaces": "0.0.21", + "@waku/message-hash": "^0.1.10", "@waku/proto": "0.0.6", "@waku/utils": "0.0.14", "debug": "^4.3.4", @@ -31818,6 +31819,7 @@ "@waku/build-utils": "*", "@waku/enr": "^0.0.20", "@waku/interfaces": "0.0.21", + "@waku/message-hash": "^0.1.10", "@waku/proto": "0.0.6", "@waku/utils": "0.0.14", "chai": "^4.3.10", diff --git a/package.json b/package.json index fb280910c7..b28428d408 100644 --- a/package.json +++ b/package.json @@ -5,11 +5,11 @@ "workspaces": [ "packages/interfaces", "packages/utils", + "packages/message-hash", "packages/proto", "packages/enr", "packages/core", "packages/relay", - "packages/message-hash", "packages/peer-exchange", "packages/dns-discovery", "packages/message-encryption", diff --git a/packages/core/package.json b/packages/core/package.json index 9c8c91e642..d449b87acf 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -76,6 +76,7 @@ "@noble/hashes": "^1.3.2", "@waku/enr": "^0.0.20", "@waku/interfaces": "0.0.21", + "@waku/message-hash": "^0.1.10", "@waku/proto": "0.0.6", "@waku/utils": "0.0.14", "debug": "^4.3.4", diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 6abe59b561..9eb3ab57d8 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -17,6 +17,8 @@ import { import { filterPeersByDiscovery } from "./filterPeers.js"; import { StreamManager } from "./stream_manager.js"; +const DEFAULT_NUM_PEERS_TO_USE = 3; + /** * A class with predefined helpers, to be used as a base to implement Waku * Protocols. @@ -24,6 +26,7 @@ import { StreamManager } from "./stream_manager.js"; export class BaseProtocol implements IBaseProtocol { public readonly addLibp2pEventListener: Libp2p["addEventListener"]; public readonly removeLibp2pEventListener: Libp2p["removeEventListener"]; + readonly numPeersToUse: number; protected streamManager: StreamManager; protected pubsubTopics: PubsubTopic[]; @@ -35,6 +38,8 @@ export class BaseProtocol implements IBaseProtocol { ) { this.pubsubTopics = this.initializePubsubTopic(options); + this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE; + this.addLibp2pEventListener = components.events.addEventListener.bind( components.events ); @@ -124,6 +129,12 @@ export class BaseProtocol implements IBaseProtocol { ); } + if (sortedFilteredPeers.length < numPeers) { + this.log.warn( + `Only ${sortedFilteredPeers.length} peers found. Requested ${numPeers}.` + ); + } + return sortedFilteredPeers; } diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index cf8fb126dd..7e6353964c 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -11,13 +11,13 @@ import type { IProtoMessage, IReceiver, Libp2p, - PeerIdStr, ProtocolCreateOptions, PubsubTopic, SingleShardInfo, Unsubscribe } from "@waku/interfaces"; import { DefaultPubsubTopic } from "@waku/interfaces"; +import { messageHashStr } from "@waku/message-hash"; import { WakuMessage } from "@waku/proto"; import { ensurePubsubTopicIsConfigured, @@ -50,10 +50,14 @@ export const FilterCodecs = { PUSH: "/vac/waku/filter-push/2.0.0-beta1" }; +/** + * A subscription object refers to a subscription to a given pubsub topic. + */ class Subscription { - private readonly peer: Peer; + readonly peers: Peer[]; private readonly pubsubTopic: PubsubTopic; private newStream: (peer: Peer) => Promise; + readonly receivedMessagesHashStr: string[] = []; private subscriptionCallbacks: Map< ContentTopic, @@ -62,10 +66,10 @@ class Subscription { constructor( pubsubTopic: PubsubTopic, - remotePeer: Peer, + remotePeers: Peer[], newStream: (peer: Peer) => Promise ) { - this.peer = remotePeer; + this.peers = remotePeers; this.pubsubTopic = pubsubTopic; this.newStream = newStream; this.subscriptionCallbacks = new Map(); @@ -89,53 +93,59 @@ class Subscription { const decodersGroupedByCT = groupByContentTopic(decodersArray); const contentTopics = Array.from(decodersGroupedByCT.keys()); - const stream = await this.newStream(this.peer); + const promises = this.peers.map(async (peer) => { + const stream = await this.newStream(peer); - const request = FilterSubscribeRpc.createSubscribeRequest( - this.pubsubTopic, - contentTopics - ); - - try { - const res = await pipe( - [request.encode()], - lp.encode, - stream, - lp.decode, - async (source) => await all(source) + const request = FilterSubscribeRpc.createSubscribeRequest( + this.pubsubTopic, + contentTopics ); - if (!res || !res.length) { - throw Error( - `No response received for request ${request.requestId}: ${res}` + try { + const res = await pipe( + [request.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) ); - } - const { statusCode, requestId, statusDesc } = - FilterSubscribeResponse.decode(res[0].slice()); + if (!res || !res.length) { + throw Error( + `No response received for request ${request.requestId}: ${res}` + ); + } + + const { statusCode, requestId, statusDesc } = + FilterSubscribeResponse.decode(res[0].slice()); + + if (statusCode < 200 || statusCode >= 300) { + throw new Error( + `Filter subscribe request ${requestId} failed with status code ${statusCode}: ${statusDesc}` + ); + } - if (statusCode < 200 || statusCode >= 300) { + log.info( + "Subscribed to peer ", + peer.id.toString(), + "for content topics", + contentTopics + ); + } catch (e) { throw new Error( - `Filter subscribe request ${requestId} failed with status code ${statusCode}: ${statusDesc}` + "Error subscribing to peer: " + + peer.id.toString() + + " for content topics: " + + contentTopics + + ": " + + e ); } + }); - log.info( - "Subscribed to peer ", - this.peer.id.toString(), - "for content topics", - contentTopics - ); - } catch (e) { - throw new Error( - "Error subscribing to peer: " + - this.peer.id.toString() + - " for content topics: " + - contentTopics + - ": " + - e - ); - } + const results = await Promise.allSettled(promises); + + this.handleErrors(results, "subscribe"); // Save the callback functions by content topics so they // can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`) @@ -155,125 +165,195 @@ class Subscription { } async unsubscribe(contentTopics: ContentTopic[]): Promise { - const stream = await this.newStream(this.peer); - const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest( - this.pubsubTopic, - contentTopics - ); + const promises = this.peers.map(async (peer) => { + const stream = await this.newStream(peer); + const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest( + this.pubsubTopic, + contentTopics + ); - try { - await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink); - } catch (error) { - throw new Error("Error subscribing: " + error); - } + try { + await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink); + } catch (error) { + throw new Error("Error unsubscribing: " + error); + } - contentTopics.forEach((contentTopic: string) => { - this.subscriptionCallbacks.delete(contentTopic); + contentTopics.forEach((contentTopic: string) => { + this.subscriptionCallbacks.delete(contentTopic); + }); }); + + const results = await Promise.allSettled(promises); + + this.handleErrors(results, "unsubscribe"); } async ping(): Promise { - const stream = await this.newStream(this.peer); + const promises = this.peers.map(async (peer) => { + const stream = await this.newStream(peer); + + const request = FilterSubscribeRpc.createSubscriberPingRequest(); + + try { + const res = await pipe( + [request.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) + ); - const request = FilterSubscribeRpc.createSubscriberPingRequest(); + if (!res || !res.length) { + throw Error( + `No response received for request ${request.requestId}: ${res}` + ); + } - try { - const res = await pipe( - [request.encode()], - lp.encode, - stream, - lp.decode, - async (source) => await all(source) - ); + const { statusCode, requestId, statusDesc } = + FilterSubscribeResponse.decode(res[0].slice()); - if (!res || !res.length) { - throw Error( - `No response received for request ${request.requestId}: ${res}` - ); + if (statusCode < 200 || statusCode >= 300) { + throw new Error( + `Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}` + ); + } + log.info(`Ping successful for peer ${peer.id.toString()}`); + } catch (error) { + log.error("Error pinging: ", error); + throw error; // Rethrow the actual error instead of wrapping it } + }); - const { statusCode, requestId, statusDesc } = - FilterSubscribeResponse.decode(res[0].slice()); + const results = await Promise.allSettled(promises); - if (statusCode < 200 || statusCode >= 300) { - throw new Error( - `Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}` - ); - } - - log.info("Ping successful"); - } catch (error) { - log.error("Error pinging: ", error); - throw new Error("Error pinging: " + error); - } + this.handleErrors(results, "ping"); } async unsubscribeAll(): Promise { - const stream = await this.newStream(this.peer); - - const request = FilterSubscribeRpc.createUnsubscribeAllRequest( - this.pubsubTopic - ); + const promises = this.peers.map(async (peer) => { + const stream = await this.newStream(peer); - try { - const res = await pipe( - [request.encode()], - lp.encode, - stream, - lp.decode, - async (source) => await all(source) + const request = FilterSubscribeRpc.createUnsubscribeAllRequest( + this.pubsubTopic ); - if (!res || !res.length) { - throw Error( - `No response received for request ${request.requestId}: ${res}` + try { + const res = await pipe( + [request.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) ); - } - const { statusCode, requestId, statusDesc } = - FilterSubscribeResponse.decode(res[0].slice()); + if (!res || !res.length) { + throw Error( + `No response received for request ${request.requestId}: ${res}` + ); + } + + const { statusCode, requestId, statusDesc } = + FilterSubscribeResponse.decode(res[0].slice()); + + if (statusCode < 200 || statusCode >= 300) { + throw new Error( + `Filter unsubscribe all request ${requestId} failed with status code ${statusCode}: ${statusDesc}` + ); + } - if (statusCode < 200 || statusCode >= 300) { + this.subscriptionCallbacks.clear(); + log.info( + `Unsubscribed from all content topics for pubsub topic ${this.pubsubTopic}` + ); + } catch (error) { throw new Error( - `Filter unsubscribe all request ${requestId} failed with status code ${statusCode}: ${statusDesc}` + "Error unsubscribing from all content topics: " + error ); } + }); - this.subscriptionCallbacks.clear(); - log.info("Unsubscribed from all content topics"); - } catch (error) { - throw new Error("Error unsubscribing from all content topics: " + error); - } + const results = await Promise.allSettled(promises); + + this.handleErrors(results, "unsubscribeAll"); } async processMessage(message: WakuMessage): Promise { - const contentTopic = message.contentTopic; + const hashedMessageStr = messageHashStr( + this.pubsubTopic, + message as IProtoMessage + ); + if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { + log.info("Message already received, skipping"); + return; + } + this.receivedMessagesHashStr.push(hashedMessageStr); + + const { contentTopic } = message; const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic); if (!subscriptionCallback) { log.error("No subscription callback available for ", contentTopic); return; } + log.info( + "Processing message with content topic ", + contentTopic, + " on pubsub topic ", + this.pubsubTopic + ); await pushMessage(subscriptionCallback, this.pubsubTopic, message); } + + // Filter out only the rejected promises and extract & handle their reasons + private handleErrors( + results: PromiseSettledResult[], + type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll" + ): void { + const errors = results + .filter( + (result): result is PromiseRejectedResult => + result.status === "rejected" + ) + .map((rejectedResult) => rejectedResult.reason); + + if (errors.length === this.peers.length) { + const errorCounts = new Map(); + // TODO: streamline error logging with https://github.com/orgs/waku-org/projects/2/views/1?pane=issue&itemId=42849952 + errors.forEach((error) => { + const message = error instanceof Error ? error.message : String(error); + errorCounts.set(message, (errorCounts.get(message) || 0) + 1); + }); + + const uniqueErrorMessages = Array.from( + errorCounts, + ([message, count]) => `${message} (occurred ${count} times)` + ).join(", "); + throw new Error(`Error ${type} all peers: ${uniqueErrorMessages}`); + } else if (errors.length > 0) { + // TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463) + log.warn( + `Some ${type} failed. These will be refreshed with new peers`, + errors + ); + } else { + log.info(`${type} successful for all peers`); + } + } } class Filter extends BaseProtocol implements IReceiver { private activeSubscriptions = new Map(); - private readonly NUM_PEERS_PROTOCOL = 1; private getActiveSubscription( - pubsubTopic: PubsubTopic, - peerIdStr: PeerIdStr + pubsubTopic: PubsubTopic ): Subscription | undefined { - return this.activeSubscriptions.get(`${pubsubTopic}_${peerIdStr}`); + return this.activeSubscriptions.get(pubsubTopic); } private setActiveSubscription( pubsubTopic: PubsubTopic, - peerIdStr: PeerIdStr, subscription: Subscription ): Subscription { - this.activeSubscriptions.set(`${pubsubTopic}_${peerIdStr}`, subscription); + this.activeSubscriptions.set(pubsubTopic, subscription); return subscription; } @@ -287,6 +367,12 @@ class Filter extends BaseProtocol implements IReceiver { this.activeSubscriptions = new Map(); } + /** + * Creates a new subscription to the given pubsub topic. + * The subscription is made to multiple peers for decentralization. + * @param pubsubTopicShardInfo The pubsub topic to subscribe to. + * @returns The subscription object. + */ async createSubscription( pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic ): Promise { @@ -297,23 +383,24 @@ class Filter extends BaseProtocol implements IReceiver { ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics); - const peer = ( - await this.getPeers({ - maxBootstrapPeers: 1, - numPeers: this.NUM_PEERS_PROTOCOL - }) - )[0]; - - if (!peer) { + const peers = await this.getPeers({ + maxBootstrapPeers: 1, + numPeers: this.numPeersToUse + }); + if (peers.length === 0) { throw new Error("No peer found to initiate subscription."); } + log.info( + `Creating filter subscription with ${peers.length} peers: `, + peers.map((peer) => peer.id.toString()) + ); + const subscription = - this.getActiveSubscription(pubsubTopic, peer.id.toString()) ?? + this.getActiveSubscription(pubsubTopic) ?? this.setActiveSubscription( pubsubTopic, - peer.id.toString(), - new Subscription(pubsubTopic, peer, this.getStream.bind(this, peer)) + new Subscription(pubsubTopic, peers, this.getStream.bind(this)) ); return subscription; @@ -360,8 +447,11 @@ class Filter extends BaseProtocol implements IReceiver { } private onRequest(streamData: IncomingStreamData): void { + const { connection, stream } = streamData; + const { remotePeer } = connection; + log.info(`Received message from ${remotePeer.toString()}`); try { - pipe(streamData.stream, lp.decode, async (source) => { + pipe(stream, lp.decode, async (source) => { for await (const bytes of source) { const response = FilterPushRpc.decode(bytes.slice()); @@ -377,11 +467,7 @@ class Filter extends BaseProtocol implements IReceiver { return; } - const peerIdStr = streamData.connection.remotePeer.toString(); - const subscription = this.getActiveSubscription( - pubsubTopic, - peerIdStr - ); + const subscription = this.getActiveSubscription(pubsubTopic); if (!subscription) { log.error( diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 1ba60fdafb..ffab97cb55 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -42,8 +42,6 @@ type PreparePushMessageResult = * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ class LightPush extends BaseProtocol implements ILightPush { - private readonly NUM_PEERS_PROTOCOL = 1; - constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(LightPushCodec, libp2p.components, log, options); } @@ -106,7 +104,7 @@ class LightPush extends BaseProtocol implements ILightPush { const peers = await this.getPeers({ maxBootstrapPeers: 1, - numPeers: this.NUM_PEERS_PROTOCOL + numPeers: this.numPeersToUse }); if (!peers.length) { @@ -172,6 +170,8 @@ class LightPush extends BaseProtocol implements ILightPush { }); const results = await Promise.allSettled(promises); + + // TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463) const errors = results .filter( ( diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 712ec9fe8e..0d30c230e9 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -69,6 +69,14 @@ export type ProtocolCreateOptions = { * Notes that some values are overridden by {@link @waku/core!WakuNode} to ensure it implements the Waku protocol. */ libp2p?: Partial; + /** + * Number of peers to connect to, for the usage of the protocol. + * This is used by: + * - Light Push to send messages, + * - Filter to retrieve messages. + * Defaults to 3. + */ + numPeersToUse?: number; /** * Byte array used as key for the noise protocol used for connection encryption * by [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) diff --git a/packages/message-hash/src/index.ts b/packages/message-hash/src/index.ts index ac7982e5b6..4bf9842e19 100644 --- a/packages/message-hash/src/index.ts +++ b/packages/message-hash/src/index.ts @@ -1,6 +1,6 @@ import { sha256 } from "@noble/hashes/sha256"; import type { IProtoMessage } from "@waku/interfaces"; -import { concat, utf8ToBytes } from "@waku/utils/bytes"; +import { bytesToUtf8, concat, utf8ToBytes } from "@waku/utils/bytes"; /** * Deterministic Message Hashing as defined in @@ -27,3 +27,12 @@ export function messageHash( } return sha256(bytes); } + +export function messageHashStr( + pubsubTopic: string, + message: IProtoMessage +): string { + const hash = messageHash(pubsubTopic, message); + const hashStr = bytesToUtf8(hash); + return hashStr; +} diff --git a/packages/tests/src/lib/index.ts b/packages/tests/src/lib/index.ts index 4e7ab34693..550b77f88b 100644 --- a/packages/tests/src/lib/index.ts +++ b/packages/tests/src/lib/index.ts @@ -1,2 +1,294 @@ -export * from "./message_collector.js"; -export * from "./service_node.js"; +import { DecodedMessage } from "@waku/core"; +import { + DefaultPubsubTopic, + PubsubTopic, + ShardingParams +} from "@waku/interfaces"; +import { Logger } from "@waku/utils"; +import { expect } from "chai"; + +import { Args, MessageRpcQuery, MessageRpcResponse } from "../types"; +import { delay, makeLogFileName } from "../utils/index.js"; + +import { MessageCollector } from "./message_collector.js"; +import { defaultArgs, ServiceNode } from "./service_node.js"; + +export { ServiceNode, MessageCollector, defaultArgs }; + +const log = new Logger("test:message-collector"); + +/** + * This class is a wrapper over the ServiceNode & MessageCollector class + * that allows for the creation & handling of multiple ServiceNodes + */ +export class ServiceNodesFleet { + static async createAndRun( + mochaContext: Mocha.Context, + pubsubTopics: PubsubTopic[], + nodesToCreate: number = 3, + strictChecking: boolean = false, + shardInfo?: ShardingParams, + _args?: Args, + withoutFilter = false + ): Promise { + const serviceNodePromises = Array.from( + { length: nodesToCreate }, + async () => { + const node = new ServiceNode( + makeLogFileName(mochaContext) + + Math.random().toString(36).substring(7) + ); + + const args = getArgs(pubsubTopics, shardInfo, _args); + await node.start(args, { + retries: 3 + }); + + return node; + } + ); + + const nodes = await Promise.all(serviceNodePromises); + return new ServiceNodesFleet(nodes, withoutFilter, strictChecking); + } + + /** + * Convert a [[WakuMessage]] to a [[WakuRelayMessage]]. The latter is used + * by the nwaku JSON-RPC API. + */ + static toMessageRpcQuery(message: { + payload: Uint8Array; + contentTopic: string; + timestamp?: Date; + }): MessageRpcQuery { + return ServiceNode.toMessageRpcQuery(message); + } + + public messageCollector: MultipleNodesMessageCollector; + private constructor( + public nodes: ServiceNode[], + relay: boolean, + private strictChecking: boolean + ) { + const _messageCollectors: MessageCollector[] = []; + this.nodes.forEach((node) => { + _messageCollectors.push(new MessageCollector(node)); + }); + this.messageCollector = new MultipleNodesMessageCollector( + _messageCollectors, + relay ? this.nodes : undefined, + strictChecking + ); + } + + get type(): "go-waku" | "nwaku" { + const nodeType = new Set( + this.nodes.map((node) => { + return node.type(); + }) + ); + if (nodeType.size > 1) { + throw new Error("Multiple node types"); + } + return nodeType.values().next().value; + } + + async start(): Promise { + const startPromises = this.nodes.map((node) => node.start()); + await Promise.all(startPromises); + } + + async sendRelayMessage( + message: MessageRpcQuery, + pubsubTopic?: string, + raw = false + ): Promise { + let relayMessagePromises: Promise[]; + if (raw) { + relayMessagePromises = this.nodes.map((node) => + node.rpcCall("post_waku_v2_relay_v1_message", [ + pubsubTopic && pubsubTopic, + message + ]) + ); + } else { + relayMessagePromises = this.nodes.map((node) => + node.sendMessage(message, pubsubTopic) + ); + } + const relayMessages = await Promise.all(relayMessagePromises); + return relayMessages.every((message) => message); + } + + async confirmMessageLength(numMessages: number): Promise { + if (this.strictChecking) { + await Promise.all( + this.nodes.map(async (node) => + expect(await node.messages()).to.have.length(numMessages) + ) + ); + } else { + // Wait for all promises to resolve and check if any meets the condition + const results = await Promise.all( + this.nodes.map(async (node) => { + const msgs = await node.messages(); + return msgs.length === numMessages; + }) + ); + + // Check if at least one result meets the condition + const conditionMet = results.some((result) => result); + expect(conditionMet).to.be.true; + } + } +} + +class MultipleNodesMessageCollector { + callback: (msg: DecodedMessage) => void = () => {}; + messageList: Array = []; + constructor( + private messageCollectors: MessageCollector[], + private relayNodes?: ServiceNode[], + private strictChecking: boolean = false + ) { + this.callback = (msg: DecodedMessage): void => { + log.info("Got a message"); + this.messageList.push(msg); + }; + } + + get count(): number { + return this.messageList.length; + } + + public hasMessage(topic: string, text: string): boolean { + if (this.strictChecking) { + return this.messageCollectors.every((collector) => + collector.hasMessage(topic, text) + ); + } else { + return this.messageCollectors.some((collector) => + collector.hasMessage(topic, text) + ); + } + } + + getMessage(index: number): MessageRpcResponse | DecodedMessage { + return this.messageList[index]; + } + + /** + * Verifies a received message against expected values on all nodes. + * Returns true if any node's collector verifies the message successfully. + */ + verifyReceivedMessage( + index: number, + options: { + expectedMessageText: string | Uint8Array | undefined; + expectedContentTopic?: string; + expectedPubsubTopic?: string; + expectedVersion?: number; + expectedMeta?: Uint8Array; + expectedEphemeral?: boolean; + expectedTimestamp?: bigint | number; + checkTimestamp?: boolean; + } + ): boolean { + if (this.strictChecking) { + return this.messageCollectors.every((collector) => { + try { + collector.verifyReceivedMessage(index, options); + return true; // Verification successful + } catch (error) { + return false; // Verification failed, continue with the next collector + } + }); + } else { + return this.messageCollectors.some((collector) => { + try { + collector.verifyReceivedMessage(index, options); + return true; // Verification successful + } catch (error) { + return false; // Verification failed, continue with the next collector + } + }); + } + } + + /** + * Waits for a total number of messages across all nodes. + */ + async waitForMessages( + numMessages: number, + options?: { + pubsubTopic?: string; + timeoutDuration?: number; + exact?: boolean; + } + ): Promise { + const startTime = Date.now(); + const pubsubTopic = options?.pubsubTopic || DefaultPubsubTopic; + const timeoutDuration = options?.timeoutDuration || 400; + const exact = options?.exact || false; + + while (this.messageList.length < numMessages) { + if (this.relayNodes) { + if (this.strictChecking) { + const results = await Promise.all( + this.relayNodes.map(async (node) => { + const msgs = await node.messages(pubsubTopic); + return msgs.length >= numMessages; + }) + ); + return results.every((result) => result); + } else { + const results = await Promise.all( + this.relayNodes.map(async (node) => { + const msgs = await node.messages(pubsubTopic); + return msgs.length >= numMessages; + }) + ); + return results.some((result) => result); + } + } + + if (Date.now() - startTime > timeoutDuration * numMessages) { + return false; + } + + await delay(10); + } + + if (exact) { + if (this.messageList.length == numMessages) { + return true; + } else { + log.warn( + `Was expecting exactly ${numMessages} messages. Received: ${this.messageList.length}` + ); + + return false; + } + } else { + return true; + } + } +} + +function getArgs( + pubsubTopics: PubsubTopic[], + shardInfo?: ShardingParams, + args?: Args +): Args { + const defaultArgs = { + lightpush: true, + filter: true, + discv5Discovery: true, + peerExchange: true, + relay: true, + pubsubTopic: pubsubTopics, + ...(shardInfo && { clusterId: shardInfo.clusterId }) + } as Args; + + return { ...defaultArgs, ...args }; +} diff --git a/packages/tests/src/lib/message_collector.ts b/packages/tests/src/lib/message_collector.ts index c0d714e660..b8d1b408c6 100644 --- a/packages/tests/src/lib/message_collector.ts +++ b/packages/tests/src/lib/message_collector.ts @@ -5,8 +5,11 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { AssertionError, expect } from "chai"; import { equals } from "uint8arrays/equals"; -import { base64ToUtf8, delay, ServiceNode } from "../index.js"; import { MessageRpcResponse } from "../types.js"; +import { base64ToUtf8 } from "../utils/base64_utf8.js"; +import { delay } from "../utils/delay.js"; + +import { ServiceNode } from "./service_node.js"; const log = new Logger("test:message-collector"); diff --git a/packages/tests/src/utils/index.ts b/packages/tests/src/utils/index.ts index 98f0c03a25..30dee17f95 100644 --- a/packages/tests/src/utils/index.ts +++ b/packages/tests/src/utils/index.ts @@ -4,3 +4,4 @@ export * from "./random_array.js"; export * from "./wait_for_remote_peer_with_codec.js"; export * from "./delay.js"; export * from "./base64_utf8.js"; +export * from "./waitForConnections.js"; diff --git a/packages/tests/src/utils/waitForConnections.ts b/packages/tests/src/utils/waitForConnections.ts new file mode 100644 index 0000000000..b0fc063214 --- /dev/null +++ b/packages/tests/src/utils/waitForConnections.ts @@ -0,0 +1,20 @@ +import type { LightNode } from "@waku/interfaces"; +export async function waitForConnections( + numPeers: number, + waku: LightNode +): Promise { + let connectionsLen = waku.libp2p.getConnections().length; + if (connectionsLen >= numPeers) { + return; + } + await new Promise((resolve) => { + const cb = (): void => { + connectionsLen++; + if (connectionsLen >= numPeers) { + waku.libp2p.removeEventListener("peer:identify", cb); + resolve(); + } + }; + waku.libp2p.addEventListener("peer:identify", cb); + }); +} diff --git a/packages/tests/src/waitForConnections.ts b/packages/tests/src/waitForConnections.ts new file mode 100644 index 0000000000..b0fc063214 --- /dev/null +++ b/packages/tests/src/waitForConnections.ts @@ -0,0 +1,20 @@ +import type { LightNode } from "@waku/interfaces"; +export async function waitForConnections( + numPeers: number, + waku: LightNode +): Promise { + let connectionsLen = waku.libp2p.getConnections().length; + if (connectionsLen >= numPeers) { + return; + } + await new Promise((resolve) => { + const cb = (): void => { + connectionsLen++; + if (connectionsLen >= numPeers) { + waku.libp2p.removeEventListener("peer:identify", cb); + resolve(); + } + }; + waku.libp2p.addEventListener("peer:identify", cb); + }); +} diff --git a/packages/tests/tests/filter/ping.node.spec.ts b/packages/tests/tests/filter/ping.node.spec.ts new file mode 100644 index 0000000000..5527d79cdb --- /dev/null +++ b/packages/tests/tests/filter/ping.node.spec.ts @@ -0,0 +1,117 @@ +import { + DefaultPubsubTopic, + IFilterSubscription, + LightNode +} from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/sdk"; +import { expect } from "chai"; + +import { ServiceNodesFleet } from "../../src/index.js"; + +import { + runMultipleNodes, + teardownNodesWithRedundancy, + TestContentTopic, + TestDecoder, + TestEncoder, + validatePingError +} from "./utils"; + +const runTests = (strictCheckNodes: boolean): void => { + describe(`Waku Filter V2: Ping: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(10000); + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + let subscription: IFilterSubscription; + + this.beforeEach(async function () { + this.timeout(15000); + [serviceNodes, waku] = await runMultipleNodes(this, [DefaultPubsubTopic]); + subscription = await waku.filter.createSubscription(); + }); + + this.afterEach(async function () { + this.timeout(15000); + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + it("Ping on subscribed peer", async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + + // If ping is successfull(node has active subscription) we receive a success status code. + await subscription.ping(); + + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + + // Confirm new messages are received after a ping. + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + }); + + it("Ping on peer without subscriptions", async function () { + await validatePingError(subscription); + }); + + it("Ping on unsubscribed peer", async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await subscription.ping(); + await subscription.unsubscribe([TestContentTopic]); + + // Ping imediately after unsubscribe + await validatePingError(subscription); + }); + + it("Reopen subscription with peer with lost subscription", async function () { + const openSubscription = async (): Promise => { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + }; + + const unsubscribe = async (): Promise => { + await subscription.unsubscribe([TestContentTopic]); + }; + + const pingAndReinitiateSubscription = async (): Promise => { + try { + await subscription.ping(); + } catch (error) { + if ( + error instanceof Error && + error.message.includes("peer has no subscriptions") + ) { + await openSubscription(); + } else { + throw error; + } + } + }; + + // open subscription & ping -> should pass + await openSubscription(); + await pingAndReinitiateSubscription(); + + // unsubscribe & ping -> should fail and reinitiate subscription + await unsubscribe(); + await pingAndReinitiateSubscription(); + + // ping -> should pass as subscription is reinitiated + await pingAndReinitiateSubscription(); + }); + }); +}; + +[true, false].map(runTests); diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts new file mode 100644 index 0000000000..3907de3cbf --- /dev/null +++ b/packages/tests/tests/filter/push.node.spec.ts @@ -0,0 +1,321 @@ +import { waitForRemotePeer } from "@waku/core"; +import { + DefaultPubsubTopic, + IFilterSubscription, + LightNode, + Protocols +} from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/sdk"; +import { expect } from "chai"; + +import { + delay, + ServiceNodesFleet, + TEST_STRING, + TEST_TIMESTAMPS +} from "../../src/index.js"; + +import { + messageText, + runMultipleNodes, + teardownNodesWithRedundancy, + TestContentTopic, + TestDecoder, + TestEncoder +} from "./utils.js"; + +const runTests = (strictCheckNodes: boolean): void => { + describe(`Waku Filter V2: FilterPush: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(10000); + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + let subscription: IFilterSubscription; + + this.beforeEach(async function () { + this.timeout(15000); + [serviceNodes, waku] = await runMultipleNodes(this, [DefaultPubsubTopic]); + subscription = await waku.filter.createSubscription(); + }); + + this.afterEach(async function () { + this.timeout(15000); + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + TEST_STRING.forEach((testItem) => { + it(`Check received message containing ${testItem.description}`, async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(testItem.value) + }); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: testItem.value, + expectedContentTopic: TestContentTopic + }); + }); + }); + + TEST_TIMESTAMPS.forEach((testItem) => { + it(`Check received message with timestamp: ${testItem} `, async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await delay(400); + + await serviceNodes.sendRelayMessage( + { + contentTopic: TestContentTopic, + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: testItem as any + }, + DefaultPubsubTopic, + true + ); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + checkTimestamp: false, + expectedContentTopic: TestContentTopic + }); + + // Check if the timestamp matches + const timestamp = serviceNodes.messageCollector.getMessage(0).timestamp; + if (testItem == undefined) { + expect(timestamp).to.eq(undefined); + } + if (timestamp !== undefined && timestamp instanceof Date) { + expect(testItem?.toString()).to.contain( + timestamp.getTime().toString() + ); + } + }); + }); + + it("Check message with invalid timestamp is not received", async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await delay(400); + + await serviceNodes.sendRelayMessage( + { + contentTopic: TestContentTopic, + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: "2023-09-06T12:05:38.609Z" as any + }, + DefaultPubsubTopic, + true + ); + + // Verify that no message was received + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + false + ); + }); + + it("Check message on other pubsub topic is not received", async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await delay(400); + + await serviceNodes.sendRelayMessage( + { + contentTopic: TestContentTopic, + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: BigInt(Date.now()) * BigInt(1000000) + }, + "DefaultPubsubTopic" + ); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + false + ); + }); + + it("Check message with no pubsub topic is not received", async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await delay(400); + + await serviceNodes.sendRelayMessage( + { + contentTopic: TestContentTopic, + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: BigInt(Date.now()) * BigInt(1000000) + }, + undefined, + true + ); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + false + ); + }); + + it("Check message with no content topic is not received", async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await delay(400); + + await serviceNodes.sendRelayMessage( + { + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: BigInt(Date.now()) * BigInt(1000000) + }, + DefaultPubsubTopic + ); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + false + ); + }); + + it("Check message with no payload is not received", async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await delay(400); + + await serviceNodes.sendRelayMessage( + { + contentTopic: TestContentTopic, + timestamp: BigInt(Date.now()) * BigInt(1000000), + payload: undefined as any + }, + DefaultPubsubTopic, + true + ); + + // For go-waku the message is received (it is possible to send a message with no payload) + if (serviceNodes.type == "go-waku") { + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + } else { + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + false + ); + } + }); + + it("Check message with non string payload is not received", async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await delay(400); + + await serviceNodes.sendRelayMessage( + { + contentTopic: TestContentTopic, + payload: 12345 as unknown as string, + timestamp: BigInt(Date.now()) * BigInt(1000000) + }, + DefaultPubsubTopic + ); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + false + ); + }); + + // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done + it.skip("Check message received after jswaku node is restarted", async function () { + // Subscribe and send message + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + + // Restart js-waku node + await waku.stop(); + expect(waku.isStarted()).to.eq(false); + await waku.start(); + expect(waku.isStarted()).to.eq(true); + + // Redo the connection and create a new subscription + for (const node of this.serviceNodes) { + await waku.dial(await node.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + } + subscription = await waku.filter.createSubscription(); + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + + // Confirm both messages were received. + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: TestContentTopic + }); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedMessageText: "M2", + expectedContentTopic: TestContentTopic + }); + }); + + // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done + it.skip("Check message received after nwaku node is restarted", async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + + // Restart nwaku node + await teardownNodesWithRedundancy(serviceNodes, []); + await serviceNodes.start(); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + + // Confirm both messages were received. + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: TestContentTopic + }); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedMessageText: "M2", + expectedContentTopic: TestContentTopic + }); + }); + }); +}; + +[true, false].map(runTests); diff --git a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts index f7f17bb0bc..dbaad69bd9 100644 --- a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts @@ -21,7 +21,8 @@ import { ServiceNode, tearDownNodes } from "../../../src/index.js"; -import { runNodes } from "../utils.js"; + +import { runNodes } from "./utils.js"; describe("Waku Filter V2: Multiple PubsubTopics", function () { // Set the timeout for all tests in this suite. Can be overwritten at test level diff --git a/packages/tests/tests/filter/single_node/ping.node.spec.ts b/packages/tests/tests/filter/single_node/ping.node.spec.ts index 5f73c8cbd5..8528c48dc2 100644 --- a/packages/tests/tests/filter/single_node/ping.node.spec.ts +++ b/packages/tests/tests/filter/single_node/ping.node.spec.ts @@ -1,6 +1,9 @@ -import { DefaultPubsubTopic } from "@waku/interfaces"; -import type { IFilterSubscription, LightNode } from "@waku/interfaces"; -import { utf8ToBytes } from "@waku/utils/bytes"; +import { + DefaultPubsubTopic, + IFilterSubscription, + LightNode +} from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; import { @@ -9,13 +12,14 @@ import { tearDownNodes } from "../../../src/index.js"; import { - runNodes, TestContentTopic, TestDecoder, TestEncoder, validatePingError } from "../utils.js"; +import { runNodes } from "./utils.js"; + describe("Waku Filter V2: Ping", function () { // Set the timeout for all tests in this suite. Can be overwritten at test level this.timeout(10000); diff --git a/packages/tests/tests/filter/single_node/push.node.spec.ts b/packages/tests/tests/filter/single_node/push.node.spec.ts index 969fa99b87..383f9e3955 100644 --- a/packages/tests/tests/filter/single_node/push.node.spec.ts +++ b/packages/tests/tests/filter/single_node/push.node.spec.ts @@ -1,8 +1,11 @@ import { waitForRemotePeer } from "@waku/core"; -import type { IFilterSubscription, LightNode } from "@waku/interfaces"; -import { DefaultPubsubTopic } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; -import { utf8ToBytes } from "@waku/utils/bytes"; +import { + DefaultPubsubTopic, + IFilterSubscription, + LightNode, + Protocols +} from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; import { @@ -13,9 +16,9 @@ import { TEST_STRING, TEST_TIMESTAMPS } from "../../../src/index.js"; +import { runNodes } from "../../light-push/utils"; import { messageText, - runNodes, TestContentTopic, TestDecoder, TestEncoder diff --git a/packages/tests/tests/filter/single_node/subscribe.node.spec.ts b/packages/tests/tests/filter/single_node/subscribe.node.spec.ts index e0ca48498d..5f64dee724 100644 --- a/packages/tests/tests/filter/single_node/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/single_node/subscribe.node.spec.ts @@ -1,7 +1,10 @@ import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; -import type { IFilterSubscription, LightNode } from "@waku/interfaces"; -import { DefaultPubsubTopic } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; +import { + DefaultPubsubTopic, + IFilterSubscription, + LightNode, + Protocols +} from "@waku/interfaces"; import { ecies, generatePrivateKey, @@ -9,7 +12,7 @@ import { getPublicKey, symmetric } from "@waku/message-encryption"; -import { utf8ToBytes } from "@waku/utils/bytes"; +import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; import { @@ -25,13 +28,14 @@ import { import { messagePayload, messageText, - runNodes, TestContentTopic, TestDecoder, TestEncoder } from "../utils.js"; -describe("Waku Filter V2: Subscribe", function () { +import { runNodes } from "./utils.js"; + +describe("Waku Filter V2: Subscribe: Single Service Node", function () { // Set the timeout for all tests in this suite. Can be overwritten at test level this.timeout(10000); let waku: LightNode; @@ -371,7 +375,7 @@ describe("Waku Filter V2: Subscribe", function () { // Send messages to the first set of topics. for (let i = 0; i < topicCount1; i++) { - const messageText = `Message for Topic ${i + 1}`; + const messageText = `Topic Set 1: Message Number: ${i + 1}`; await waku.lightPush.send(td1.encoders[i], { payload: utf8ToBytes(messageText) }); @@ -379,7 +383,8 @@ describe("Waku Filter V2: Subscribe", function () { // Send messages to the second set of topics. for (let i = 0; i < topicCount2; i++) { - const messageText = `Message for Topic ${i + 1}`; + const messageText = `Topic Set 2: Message Number: ${i + 1}`; + await waku.lightPush.send(td2.encoders[i], { payload: utf8ToBytes(messageText) }); diff --git a/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts b/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts index 4b6f545930..2496f2ecfe 100644 --- a/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts +++ b/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts @@ -1,7 +1,7 @@ import { createDecoder, createEncoder } from "@waku/core"; -import type { IFilterSubscription, LightNode } from "@waku/interfaces"; -import { DefaultPubsubTopic } from "@waku/interfaces"; -import { utf8ToBytes } from "@waku/utils/bytes"; +import { DefaultPubsubTopic, IFilterSubscription } from "@waku/interfaces"; +import { LightNode } from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; import { @@ -10,10 +10,10 @@ import { ServiceNode, tearDownNodes } from "../../../src/index.js"; +import { runNodes } from "../../light-push/utils"; import { messagePayload, messageText, - runNodes, TestContentTopic, TestDecoder, TestEncoder diff --git a/packages/tests/tests/filter/single_node/utils.ts b/packages/tests/tests/filter/single_node/utils.ts new file mode 100644 index 0000000000..f52959c3fc --- /dev/null +++ b/packages/tests/tests/filter/single_node/utils.ts @@ -0,0 +1,67 @@ +import { waitForRemotePeer } from "@waku/core"; +import { + DefaultPubsubTopic, + LightNode, + ProtocolCreateOptions, + Protocols, + ShardingParams +} from "@waku/interfaces"; +import { createLightNode } from "@waku/sdk"; +import { Logger } from "@waku/utils"; +import { Context } from "mocha"; + +import { + makeLogFileName, + NOISE_KEY_1, + ServiceNode +} from "../../../src/index.js"; + +export const log = new Logger("test:filter:single_node"); + +export async function runNodes( + context: Context, + //TODO: change this to use `ShardInfo` instead of `string[]` + pubsubTopics: string[], + shardInfo?: ShardingParams +): Promise<[ServiceNode, LightNode]> { + const nwaku = new ServiceNode(makeLogFileName(context)); + + await nwaku.start( + { + filter: true, + lightpush: true, + relay: true, + pubsubTopic: pubsubTopics, + ...(shardInfo && { clusterId: shardInfo.clusterId }) + }, + { retries: 3 } + ); + + const waku_options: ProtocolCreateOptions = { + staticNoiseKey: NOISE_KEY_1, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, + pubsubTopics: shardInfo ? undefined : pubsubTopics, + ...((pubsubTopics.length !== 1 || + pubsubTopics[0] !== DefaultPubsubTopic) && { + shardInfo: shardInfo + }) + }; + + log.info("Starting js waku node with :", JSON.stringify(waku_options)); + let waku: LightNode | undefined; + try { + waku = await createLightNode(waku_options); + await waku.start(); + } catch (error) { + log.error("jswaku node failed to start:", error); + } + + if (waku) { + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + await nwaku.ensureSubscriptions(pubsubTopics); + return [nwaku, waku]; + } else { + throw new Error("Failed to initialize waku"); + } +} diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts new file mode 100644 index 0000000000..60afc62567 --- /dev/null +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -0,0 +1,453 @@ +import { createDecoder, createEncoder } from "@waku/core"; +import { + DefaultPubsubTopic, + IFilterSubscription, + LightNode +} from "@waku/interfaces"; +import { + ecies, + generatePrivateKey, + generateSymmetricKey, + getPublicKey, + symmetric +} from "@waku/message-encryption"; +import { utf8ToBytes } from "@waku/sdk"; +import { expect } from "chai"; + +import { + delay, + generateTestData, + ServiceNodesFleet, + TEST_STRING +} from "../../src/index.js"; + +import { + messagePayload, + messageText, + runMultipleNodes, + teardownNodesWithRedundancy, + TestContentTopic, + TestDecoder, + TestEncoder +} from "./utils.js"; + +const runTests = (strictCheckNodes: boolean): void => { + describe(`Waku Filter V2: Subscribe: Multiple Service Nodes: Strict Check mode: ${strictCheckNodes}`, function () { + this.timeout(100000); + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + let subscription: IFilterSubscription; + + this.beforeEach(async function () { + this.timeout(15000); + [serviceNodes, waku] = await runMultipleNodes( + this, + [DefaultPubsubTopic], + strictCheckNodes + ); + subscription = await waku.filter.createSubscription(); + }); + + this.afterEach(async function () { + this.timeout(15000); + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + it("Subscribe and receive messages via lightPush", async function () { + expect(waku.libp2p.getConnections()).has.length(3); + + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(TestEncoder, messagePayload); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + + await serviceNodes.confirmMessageLength(1); + }); + + it("Subscribe and receive ecies encrypted messages via lightPush", async function () { + const privateKey = generatePrivateKey(); + const publicKey = getPublicKey(privateKey); + const encoder = ecies.createEncoder({ + contentTopic: TestContentTopic, + publicKey + }); + const decoder = ecies.createDecoder(TestContentTopic, privateKey); + + await subscription.subscribe( + [decoder], + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(encoder, messagePayload); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic, + expectedVersion: 1 + }); + + await serviceNodes.confirmMessageLength(1); + }); + + it("Subscribe and receive symmetrically encrypted messages via lightPush", async function () { + const symKey = generateSymmetricKey(); + const encoder = symmetric.createEncoder({ + contentTopic: TestContentTopic, + symKey + }); + const decoder = symmetric.createDecoder(TestContentTopic, symKey); + + await subscription.subscribe( + [decoder], + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(encoder, messagePayload); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic, + expectedVersion: 1 + }); + + await serviceNodes.confirmMessageLength(1); + }); + + it("Subscribe and receive messages via waku relay post", async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + + await delay(400); + + // Send a test message using the relay post method. + const relayMessage = ServiceNodesFleet.toMessageRpcQuery({ + contentTopic: TestContentTopic, + payload: utf8ToBytes(messageText) + }); + await serviceNodes.sendRelayMessage(relayMessage); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + + await serviceNodes.confirmMessageLength(1); + }); + + it("Subscribe and receive 2 messages on the same topic", async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(TestEncoder, messagePayload); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + + // Send another message on the same topic. + const newMessageText = "Filtering still works!"; + await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(newMessageText) + }); + + // Verify that the second message was successfully received. + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedMessageText: newMessageText, + expectedContentTopic: TestContentTopic + }); + + await serviceNodes.confirmMessageLength(2); + }); + + it("Subscribe and receive messages on 2 different content topics", async function () { + // Subscribe to the first content topic and send a message. + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, messagePayload); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + + // Modify subscription to include a new content topic and send a message. + const newMessageText = "Filtering still works!"; + const newMessagePayload = { payload: utf8ToBytes(newMessageText) }; + const newContentTopic = "/test/2/waku-filter"; + const newEncoder = createEncoder({ contentTopic: newContentTopic }); + const newDecoder = createDecoder(newContentTopic); + await subscription.subscribe( + [newDecoder], + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(newEncoder, { + payload: utf8ToBytes(newMessageText) + }); + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedContentTopic: newContentTopic, + expectedMessageText: newMessageText + }); + + // Send another message on the initial content topic to verify it still works. + await waku.lightPush.send(TestEncoder, newMessagePayload); + expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(2, { + expectedMessageText: newMessageText, + expectedContentTopic: TestContentTopic + }); + + await serviceNodes.confirmMessageLength(3); + }); + + it("Subscribe and receives messages on 20 topics", async function () { + const topicCount = 20; + const td = generateTestData(topicCount); + + // Subscribe to all 20 topics. + for (let i = 0; i < topicCount; i++) { + await subscription.subscribe( + [td.decoders[i]], + serviceNodes.messageCollector.callback + ); + } + + // Send a unique message on each topic. + for (let i = 0; i < topicCount; i++) { + await waku.lightPush.send(td.encoders[i], { + payload: utf8ToBytes(`Message for Topic ${i + 1}`) + }); + } + + // Verify that each message was received on the corresponding topic. + expect(await serviceNodes.messageCollector.waitForMessages(20)).to.eq( + true + ); + td.contentTopics.forEach((topic, index) => { + serviceNodes.messageCollector.verifyReceivedMessage(index, { + expectedContentTopic: topic, + expectedMessageText: `Message for Topic ${index + 1}` + }); + }); + }); + + it("Subscribe to 30 topics at once and receives messages", async function () { + const topicCount = 30; + const td = generateTestData(topicCount); + + // Subscribe to all 30 topics. + await subscription.subscribe( + td.decoders, + serviceNodes.messageCollector.callback + ); + + // Send a unique message on each topic. + for (let i = 0; i < topicCount; i++) { + await waku.lightPush.send(td.encoders[i], { + payload: utf8ToBytes(`Message for Topic ${i + 1}`) + }); + } + + // Verify that each message was received on the corresponding topic. + expect(await serviceNodes.messageCollector.waitForMessages(30)).to.eq( + true + ); + td.contentTopics.forEach((topic, index) => { + serviceNodes.messageCollector.verifyReceivedMessage(index, { + expectedContentTopic: topic, + expectedMessageText: `Message for Topic ${index + 1}` + }); + }); + }); + + it("Error when try to subscribe to more than 30 topics", async function () { + const topicCount = 31; + const td = generateTestData(topicCount); + + // Attempt to subscribe to 31 topics + try { + await subscription.subscribe( + td.decoders, + serviceNodes.messageCollector.callback + ); + throw new Error( + "Subscribe to 31 topics was successful but was expected to fail with a specific error." + ); + } catch (err) { + if ( + err instanceof Error && + err.message.includes("exceeds maximum content topics: 30") + ) { + return; + } else { + throw err; + } + } + }); + + it("Overlapping topic subscription", async function () { + // Define two sets of test data with overlapping topics. + const topicCount1 = 2; + const td1 = generateTestData(topicCount1); + const topicCount2 = 4; + const td2 = generateTestData(topicCount2); + + // Subscribe to the first set of topics. + await subscription.subscribe( + td1.decoders, + serviceNodes.messageCollector.callback + ); + + // Subscribe to the second set of topics which has overlapping topics with the first set. + await subscription.subscribe( + td2.decoders, + serviceNodes.messageCollector.callback + ); + + // Send messages to the first set of topics. + for (let i = 0; i < topicCount1; i++) { + const messageText = `Topic Set 1: Message Number: ${i + 1}`; + await waku.lightPush.send(td1.encoders[i], { + payload: utf8ToBytes(messageText) + }); + } + + // Send messages to the second set of topics. + for (let i = 0; i < topicCount2; i++) { + const messageText = `Topic Set 2: Message Number: ${i + 1}`; + await waku.lightPush.send(td2.encoders[i], { + payload: utf8ToBytes(messageText) + }); + } + + // Check if all messages were received. + // Since there are overlapping topics, there should be 6 messages in total (2 from the first set + 4 from the second set). + expect( + await serviceNodes.messageCollector.waitForMessages(6, { exact: true }) + ).to.eq(true); + }); + + it("Refresh subscription", async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + + // Resubscribe (refresh) to the same topic and send another message. + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + + // Confirm both messages were received. + expect( + await serviceNodes.messageCollector.waitForMessages(2, { exact: true }) + ).to.eq(true); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: TestContentTopic + }); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedMessageText: "M2", + expectedContentTopic: TestContentTopic + }); + }); + + TEST_STRING.forEach((testItem) => { + it(`Subscribe to topic containing ${testItem.description} and receive message`, async function () { + const newContentTopic = testItem.value; + const newEncoder = createEncoder({ contentTopic: newContentTopic }); + const newDecoder = createDecoder(newContentTopic); + + await subscription.subscribe( + [newDecoder], + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(newEncoder, messagePayload); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: newContentTopic + }); + }); + }); + + it("Add multiple subscription objects on single nwaku node", async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + + // Create a second subscription on a different topic + const subscription2 = await waku.filter.createSubscription(); + const newContentTopic = "/test/2/waku-filter"; + const newEncoder = createEncoder({ contentTopic: newContentTopic }); + const newDecoder = createDecoder(newContentTopic); + await subscription2.subscribe( + [newDecoder], + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); + + // Check if both messages were received + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: TestContentTopic + }); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedContentTopic: newContentTopic, + expectedMessageText: "M2" + }); + }); + }); +}; + +[true, false].map((strictCheckNodes) => runTests(strictCheckNodes)); diff --git a/packages/tests/tests/filter/unsubscribe.node.spec.ts b/packages/tests/tests/filter/unsubscribe.node.spec.ts new file mode 100644 index 0000000000..55b96e9777 --- /dev/null +++ b/packages/tests/tests/filter/unsubscribe.node.spec.ts @@ -0,0 +1,216 @@ +import { createDecoder, createEncoder } from "@waku/core"; +import { + DefaultPubsubTopic, + IFilterSubscription, + LightNode +} from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/sdk"; +import { expect } from "chai"; + +import { generateTestData, ServiceNodesFleet } from "../../src/index.js"; + +import { + messagePayload, + messageText, + runMultipleNodes, + teardownNodesWithRedundancy, + TestContentTopic, + TestDecoder, + TestEncoder +} from "./utils.js"; + +const runTests = (strictCheckNodes: boolean): void => { + describe(`Waku Filter V2: Unsubscribe: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(10000); + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + let subscription: IFilterSubscription; + + this.beforeEach(async function () { + this.timeout(15000); + [serviceNodes, waku] = await runMultipleNodes(this, [DefaultPubsubTopic]); + subscription = await waku.filter.createSubscription(); + }); + + this.afterEach(async function () { + this.timeout(15000); + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, messagePayload); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + + // Unsubscribe from the topic and send again + await subscription.unsubscribe([TestContentTopic]); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + false + ); + + // Check that from 2 messages send only the 1st was received + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + expect(serviceNodes.messageCollector.count).to.eq(1); + + await serviceNodes.confirmMessageLength(2); + }); + + it("Unsubscribe 1 topic - node subscribed to 2 topics", async function () { + // Subscribe to 2 topics and send messages + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + const newContentTopic = "/test/2/waku-filter"; + const newEncoder = createEncoder({ contentTopic: newContentTopic }); + const newDecoder = createDecoder(newContentTopic); + await subscription.subscribe( + [newDecoder], + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + + // Unsubscribe from the first topic and send again + await subscription.unsubscribe([TestContentTopic]); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") }); + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") }); + expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq( + true + ); + + // Check that from 4 messages send 3 were received + expect(serviceNodes.messageCollector.count).to.eq(3); + await serviceNodes.confirmMessageLength(4); + }); + + it("Unsubscribe 2 topics - node subscribed to 2 topics", async function () { + // Subscribe to 2 topics and send messages + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + const newContentTopic = "/test/2/waku-filter"; + const newEncoder = createEncoder({ contentTopic: newContentTopic }); + const newDecoder = createDecoder(newContentTopic); + await subscription.subscribe( + [newDecoder], + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + + // Unsubscribe from both and send again + await subscription.unsubscribe([TestContentTopic, newContentTopic]); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") }); + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") }); + expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq( + false + ); + + // Check that from 4 messages send 2 were received + expect(serviceNodes.messageCollector.count).to.eq(2); + await serviceNodes.confirmMessageLength(4); + }); + + it("Unsubscribe topics the node is not subscribed to", async function () { + // Subscribe to 1 topic and send message + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + + expect(serviceNodes.messageCollector.count).to.eq(1); + + // Unsubscribe from topics that the node is not not subscribed to and send again + await subscription.unsubscribe([]); + await subscription.unsubscribe(["/test/2/waku-filter"]); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + + // Check that both messages were received + expect(serviceNodes.messageCollector.count).to.eq(2); + await serviceNodes.confirmMessageLength(2); + }); + + it("Unsubscribes all - node subscribed to 1 topic", async function () { + await subscription.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + expect(serviceNodes.messageCollector.count).to.eq(1); + + // Unsubscribe from all topics and send again + await subscription.unsubscribeAll(); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + false + ); + + // Check that from 2 messages send only the 1st was received + expect(serviceNodes.messageCollector.count).to.eq(1); + await serviceNodes.confirmMessageLength(2); + }); + + it("Unsubscribes all - node subscribed to 10 topics", async function () { + // Subscribe to 10 topics and send message + const topicCount = 10; + const td = generateTestData(topicCount); + await subscription.subscribe( + td.decoders, + serviceNodes.messageCollector.callback + ); + for (let i = 0; i < topicCount; i++) { + await waku.lightPush.send(td.encoders[i], { + payload: utf8ToBytes(`M${i + 1}`) + }); + } + expect(await serviceNodes.messageCollector.waitForMessages(10)).to.eq( + true + ); + + // Unsubscribe from all topics and send again + await subscription.unsubscribeAll(); + for (let i = 0; i < topicCount; i++) { + await waku.lightPush.send(td.encoders[i], { + payload: utf8ToBytes(`M${topicCount + i + 1}`) + }); + } + expect(await serviceNodes.messageCollector.waitForMessages(11)).to.eq( + false + ); + + // Check that from 20 messages send only 10 were received + expect(serviceNodes.messageCollector.count).to.eq(10); + await serviceNodes.confirmMessageLength(20); + }); + }); +}; + +[true, false].map(runTests); diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index bcc3b26d6f..a49830656a 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -3,15 +3,22 @@ import { DefaultPubsubTopic, IFilterSubscription, LightNode, + ProtocolCreateOptions, Protocols, - ShardingParams + ShardingParams, + Waku } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { Logger } from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; import { Context } from "mocha"; +import pRetry from "p-retry"; -import { makeLogFileName, NOISE_KEY_1, ServiceNode } from "../../src/index.js"; +import { + NOISE_KEY_1, + ServiceNodesFleet, + waitForConnections +} from "../../src/index.js"; // Constants for test configuration. export const log = new Logger("test:filter"); @@ -42,28 +49,31 @@ export async function validatePingError( } } -export async function runNodes( +export async function runMultipleNodes( context: Context, //TODO: change this to use `ShardInfo` instead of `string[]` pubsubTopics: string[], - shardInfo?: ShardingParams -): Promise<[ServiceNode, LightNode]> { - const nwaku = new ServiceNode(makeLogFileName(context)); - - await nwaku.start( - { - filter: true, - lightpush: true, - relay: true, - pubsubTopic: pubsubTopics, - ...(shardInfo && { clusterId: shardInfo.clusterId }) - }, - { retries: 3 } + strictChecking: boolean = false, + shardInfo?: ShardingParams, + numServiceNodes = 3, + withoutFilter = false +): Promise<[ServiceNodesFleet, LightNode]> { + // create numServiceNodes nodes + const serviceNodes = await ServiceNodesFleet.createAndRun( + context, + pubsubTopics, + numServiceNodes, + strictChecking, + shardInfo, + undefined, + withoutFilter ); - const waku_options = { + const waku_options: ProtocolCreateOptions = { staticNoiseKey: NOISE_KEY_1, - libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, + libp2p: { + addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } + }, pubsubTopics: shardInfo ? undefined : pubsubTopics, ...((pubsubTopics.length !== 1 || pubsubTopics[0] !== DefaultPubsubTopic) && { @@ -84,18 +94,61 @@ export async function runNodes( throw new Error("Failed to initialize waku"); } - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); - await nwaku.ensureSubscriptions(pubsubTopics); + for (const node of serviceNodes.nodes) { + await waku.dial(await node.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + await node.ensureSubscriptions(pubsubTopics); - const wakuConnections = waku.libp2p.getConnections(); - const nwakuPeers = await nwaku.peers(); + const wakuConnections = waku.libp2p.getConnections(); + const nodePeers = await node.peers(); - if (wakuConnections.length < 1 || nwakuPeers.length < 1) { - throw new Error( - `Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and nwaku: ${nwakuPeers.length}` - ); + if (wakuConnections.length < 1 || nodePeers.length < 1) { + throw new Error( + `Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and service nodes: ${nodePeers.length}` + ); + } } - return [nwaku, waku]; + await waitForConnections(numServiceNodes, waku); + + return [serviceNodes, waku]; +} + +export async function teardownNodesWithRedundancy( + serviceNodes: ServiceNodesFleet, + wakuNodes: Waku | Waku[] +): Promise { + const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes]; + + const stopNwakuNodes = serviceNodes.nodes.map(async (node) => { + await pRetry( + async () => { + try { + await node.stop(); + } catch (error) { + log.error("Service Node failed to stop:", error); + throw error; + } + }, + { retries: 3 } + ); + }); + + const stopWakuNodes = wNodes.map(async (waku) => { + if (waku) { + await pRetry( + async () => { + try { + await waku.stop(); + } catch (error) { + log.error("Waku failed to stop:", error); + throw error; + } + }, + { retries: 3 } + ); + } + }); + + await Promise.all([...stopNwakuNodes, ...stopWakuNodes]); } diff --git a/packages/tests/tests/light-push/index.node.spec.ts b/packages/tests/tests/light-push/index.node.spec.ts new file mode 100644 index 0000000000..770a37636d --- /dev/null +++ b/packages/tests/tests/light-push/index.node.spec.ts @@ -0,0 +1,268 @@ +import { createEncoder } from "@waku/core"; +import { + DefaultPubsubTopic, + IRateLimitProof, + LightNode, + SendError +} from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/sdk"; +import { expect } from "chai"; + +import { + generateRandomUint8Array, + ServiceNodesFleet, + TEST_STRING +} from "../../src"; +import { + runMultipleNodes, + teardownNodesWithRedundancy +} from "../filter/utils.js"; + +import { + messagePayload, + messageText, + TestContentTopic, + TestEncoder +} from "./utils"; + +const runTests = (strictNodeCheck: boolean): void => { + const numServiceNodes = 3; + describe(`Waku Light Push: Multiple Nodes: Strict Check: ${strictNodeCheck}`, function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(15000); + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + + this.beforeEach(async function () { + this.timeout(15000); + [serviceNodes, waku] = await runMultipleNodes( + this, + [DefaultPubsubTopic], + strictNodeCheck, + undefined, + numServiceNodes, + true + ); + }); + + this.afterEach(async function () { + this.timeout(15000); + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + TEST_STRING.forEach((testItem) => { + it(`Push message with ${testItem.description} payload`, async function () { + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(testItem.value) + }); + expect(pushResponse.recipients.length).to.eq(numServiceNodes); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: testItem.value, + expectedContentTopic: TestContentTopic + }); + }); + }); + + it("Push 30 different messages", async function () { + const generateMessageText = (index: number): string => `M${index}`; + + for (let i = 0; i < 30; i++) { + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(generateMessageText(i)) + }); + expect(pushResponse.recipients.length).to.eq(numServiceNodes); + } + + expect(await serviceNodes.messageCollector.waitForMessages(30)).to.eq( + true + ); + + for (let i = 0; i < 30; i++) { + serviceNodes.messageCollector.verifyReceivedMessage(i, { + expectedMessageText: generateMessageText(i), + expectedContentTopic: TestContentTopic + }); + } + }); + + it("Throws when trying to push message with empty payload", async function () { + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: new Uint8Array() + }); + + expect(pushResponse.recipients.length).to.eq(0); + console.log("validated 1"); + expect(pushResponse.errors).to.include(SendError.EMPTY_PAYLOAD); + console.log("validated 2"); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + false + ); + console.log("validated 3"); + }); + + TEST_STRING.forEach((testItem) => { + it(`Push message with content topic containing ${testItem.description}`, async function () { + const customEncoder = createEncoder({ + contentTopic: testItem.value + }); + const pushResponse = await waku.lightPush.send( + customEncoder, + messagePayload + ); + expect(pushResponse.recipients.length).to.eq(numServiceNodes); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: testItem.value + }); + }); + }); + + it("Fails to push message with empty content topic", async function () { + try { + createEncoder({ contentTopic: "" }); + expect.fail("Expected an error but didn't get one"); + } catch (error) { + expect((error as Error).message).to.equal( + "Content topic must be specified" + ); + } + }); + + it("Push message with meta", async function () { + const customTestEncoder = createEncoder({ + contentTopic: TestContentTopic, + metaSetter: () => new Uint8Array(10) + }); + + const pushResponse = await waku.lightPush.send( + customTestEncoder, + messagePayload + ); + expect(pushResponse.recipients.length).to.eq(numServiceNodes); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + }); + + it("Fails to push message with large meta", async function () { + const customTestEncoder = createEncoder({ + contentTopic: TestContentTopic, + metaSetter: () => new Uint8Array(105024) // see the note below *** + }); + + // *** note: this test used 10 ** 6 when `nwaku` node had MaxWakuMessageSize == 1MiB ( 1*2^20 .) + // `nwaku` establishes the max lightpush msg size as `const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024` + // see: https://github.com/waku-org/nwaku/blob/07beea02095035f4f4c234ec2dec1f365e6955b8/waku/waku_lightpush/rpc_codec.nim#L15 + // In the PR https://github.com/waku-org/nwaku/pull/2298 we reduced the MaxWakuMessageSize + // from 1MiB to 150KiB. Therefore, the 105024 number comes from substracting ( 1*2^20 - 150*2^10 ) + // to the original 10^6 that this test had when MaxWakuMessageSize == 1*2^20 + + const pushResponse = await waku.lightPush.send( + customTestEncoder, + messagePayload + ); + + if (serviceNodes.type == "go-waku") { + expect(pushResponse.recipients.length).to.eq(numServiceNodes); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + } else { + expect(pushResponse.recipients.length).to.eq(0); + expect(pushResponse.errors).to.include(SendError.REMOTE_PEER_REJECTED); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + false + ); + } + }); + + it("Push message with rate limit", async function () { + const rateLimitProof: IRateLimitProof = { + proof: utf8ToBytes("proofData"), + merkleRoot: utf8ToBytes("merkleRootData"), + epoch: utf8ToBytes("epochData"), + shareX: utf8ToBytes("shareXData"), + shareY: utf8ToBytes("shareYData"), + nullifier: utf8ToBytes("nullifierData"), + rlnIdentifier: utf8ToBytes("rlnIdentifierData") + }; + + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(messageText), + rateLimitProof: rateLimitProof + }); + expect(pushResponse.recipients.length).to.eq(numServiceNodes); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + }); + + [ + Date.now() - 3600000 * 24 * 356, + Date.now() - 3600000, + Date.now() + 3600000 + ].forEach((testItem) => { + it(`Push message with custom timestamp: ${testItem}`, async function () { + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(messageText), + timestamp: new Date(testItem) + }); + expect(pushResponse.recipients.length).to.eq(numServiceNodes); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedTimestamp: testItem, + expectedContentTopic: TestContentTopic + }); + }); + }); + + it("Push message equal or less that 1MB", async function () { + const bigPayload = generateRandomUint8Array(65536); + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: bigPayload + }); + expect(pushResponse.recipients.length).to.greaterThan(0); + }); + + it("Fails to push message bigger that 1MB", async function () { + const MB = 1024 ** 2; + + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: generateRandomUint8Array(MB + 65536) + }); + expect(pushResponse.recipients.length).to.eq(0); + expect(pushResponse.errors).to.include(SendError.SIZE_TOO_BIG); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + false + ); + }); + }); +}; + +[true].map(runTests); diff --git a/packages/tests/tests/light-push/single_node/index.node.spec.ts b/packages/tests/tests/light-push/single_node/index.node.spec.ts index cc12f846bc..e36b98aed1 100644 --- a/packages/tests/tests/light-push/single_node/index.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/index.node.spec.ts @@ -5,7 +5,7 @@ import { LightNode, SendError } from "@waku/interfaces"; -import { utf8ToBytes } from "@waku/utils/bytes"; +import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; import { @@ -23,7 +23,7 @@ import { TestEncoder } from "../utils.js"; -describe("Waku Light Push", function () { +describe("Waku Light Push: Single Node", function () { // Set the timeout for all tests in this suite. Can be overwritten at test level this.timeout(15000); let waku: LightNode; diff --git a/packages/tests/tests/light-push/utils.ts b/packages/tests/tests/light-push/utils.ts index c279fa713a..3e131c470c 100644 --- a/packages/tests/tests/light-push/utils.ts +++ b/packages/tests/tests/light-push/utils.ts @@ -26,6 +26,7 @@ export async function runNodes( await nwaku.start( { lightpush: true, + filter: true, relay: true, pubsubTopic: pubsubTopics, ...(shardInfo && { clusterId: shardInfo.clusterId })