From fb19fed31d07eb2fd088a9d827465352e9bba1ef Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Mon, 7 Aug 2023 11:59:44 +0530 Subject: [PATCH 1/6] refactor store protocol for readability --- packages/core/src/lib/store/index.ts | 89 +++++++++++++++++----------- 1 file changed, 53 insertions(+), 36 deletions(-) diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index e3f0c07b03..c7f9ffae25 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -86,6 +86,49 @@ class Store extends BaseProtocol implements IStore { this.options = options ?? {}; } + /** + * Processes messages based on the provided callback and options. + * @private + */ + private async processMessages( + messages: Promise[], + callback: (message: T) => Promise | boolean | void, + options?: QueryOptions + ): Promise { + let abort = false; + const messagesOrUndef: Array = await Promise.all(messages); + let processedMessages: Array = messagesOrUndef.filter(isDefined); + + if (this.shouldReverseOrder(options)) { + processedMessages = processedMessages.reverse(); + } + + await Promise.all( + processedMessages.map(async (msg) => { + if (msg && !abort) { + abort = Boolean(await callback(msg)); + } + }) + ); + + return abort; + } + + /** + * Determines whether to reverse the order of messages based on the provided options. + * + * Messages in pages are ordered from oldest (first) to most recent (last). + * https://github.com/vacp2p/rfc/issues/533 + * + * @private + */ + private shouldReverseOrder(options?: QueryOptions): boolean { + return ( + typeof options?.pageDirection === "undefined" || + options?.pageDirection === PageDirection.BACKWARD + ); + } + /** * Do a query to a Waku Store to retrieve historical/missed messages. * @@ -103,42 +146,20 @@ class Store extends BaseProtocol implements IStore { * or if an error is encountered when processing the reply, * or if two decoders with the same content topic are passed. */ - async queryOrderedCallback( + async queryWithOrderedCallback( decoders: IDecoder[], callback: (message: T) => Promise | boolean | void, options?: QueryOptions ): Promise { - let abort = false; for await (const promises of this.queryGenerator(decoders, options)) { - if (abort) break; - const messagesOrUndef: Array = await Promise.all(promises); - - let messages: Array = messagesOrUndef.filter(isDefined); - - // Messages in pages are ordered from oldest (first) to most recent (last). - // https://github.com/vacp2p/rfc/issues/533 - if ( - typeof options?.pageDirection === "undefined" || - options?.pageDirection === PageDirection.BACKWARD - ) { - messages = messages.reverse(); - } - - await Promise.all( - messages.map(async (msg) => { - if (msg && !abort) { - abort = Boolean(await callback(msg)); - } - }) - ); + if (await this.processMessages(promises, callback, options)) break; } } /** * Do a query to a Waku Store to retrieve historical/missed messages. - * * The callback function takes a `Promise` in input, - * useful if messages needs to be decrypted and performance matters. + * useful if messages need to be decrypted and performance matters. * * The order of the messages passed to the callback is as follows: * - within a page, messages are expected to be ordered from oldest to most recent @@ -152,7 +173,7 @@ class Store extends BaseProtocol implements IStore { * or if an error is encountered when processing the reply, * or if two decoders with the same content topic are passed. */ - async queryCallbackOnPromise( + async queryWithPromiseCallback( decoders: IDecoder[], callback: ( message: Promise @@ -160,17 +181,15 @@ class Store extends BaseProtocol implements IStore { options?: QueryOptions ): Promise { let abort = false; - let promises: Promise[] = []; for await (const page of this.queryGenerator(decoders, options)) { - const _promises = page.map(async (msg) => { - if (!abort) { - abort = Boolean(await callback(msg)); - } + const _promises = page.map(async (msgPromise) => { + if (abort) return; + abort = Boolean(await callback(msgPromise)); }); - promises = promises.concat(_promises); + await Promise.all(_promises); + if (abort) break; } - await Promise.all(promises); } /** @@ -183,9 +202,6 @@ class Store extends BaseProtocol implements IStore { * as follows: * - within a page, messages SHOULD be ordered from oldest to most recent * - pages direction depends on { @link QueryOptions.pageDirection } - * - * However, there is no way to guarantee the behavior of the remote node. - * * @throws If not able to reach a Waku Store peer to query, * or if an error is encountered when processing the reply, * or if two decoders with the same content topic are passed. @@ -194,6 +210,7 @@ class Store extends BaseProtocol implements IStore { decoders: IDecoder[], options?: QueryOptions ): AsyncGenerator[]> { + // The existing code for the queryGenerator method remains the same const { pubSubTopic = DefaultPubSubTopic } = this.options; let startTime, endTime; From 13d6affa7c324bcc0e6aa9af91e7ef6f5dbc750d Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Mon, 7 Aug 2023 12:07:08 +0530 Subject: [PATCH 2/6] update interface --- packages/interfaces/src/store.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/interfaces/src/store.ts b/packages/interfaces/src/store.ts index 8063174490..b44f52a88a 100644 --- a/packages/interfaces/src/store.ts +++ b/packages/interfaces/src/store.ts @@ -46,12 +46,12 @@ export type StoreQueryOptions = { } & ProtocolOptions; export interface IStore extends IBaseProtocol { - queryOrderedCallback: ( + queryWithOrderedCallback: ( decoders: IDecoder[], callback: (message: T) => Promise | boolean | void, options?: StoreQueryOptions ) => Promise; - queryCallbackOnPromise: ( + queryWithPromiseCallback: ( decoders: IDecoder[], callback: ( message: Promise From 44b4fa8d97a3f192591915b9438278bc28aad813 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Mon, 7 Aug 2023 17:42:02 +0530 Subject: [PATCH 3/6] fix: test --- packages/tests/tests/store.node.spec.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index 59cb0c84da..f792b2fe78 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -204,7 +204,7 @@ describe("Waku Store", () => { await waitForRemotePeer(waku, [Protocols.Store]); const messages: IMessage[] = []; - await waku.store.queryCallbackOnPromise( + await waku.store.queryWithPromiseCallback( [TestDecoder], async (msgPromise) => { const msg = await msgPromise; @@ -246,7 +246,7 @@ describe("Waku Store", () => { const desiredMsgs = 14; const messages: IMessage[] = []; - await waku.store.queryCallbackOnPromise( + await waku.store.queryWithPromiseCallback( [TestDecoder], async (msgPromise) => { const msg = await msgPromise; @@ -285,7 +285,7 @@ describe("Waku Store", () => { await waitForRemotePeer(waku, [Protocols.Store]); const messages: IMessage[] = []; - await waku.store.queryOrderedCallback( + await waku.store.queryWithOrderedCallback( [TestDecoder], async (msg) => { messages.push(msg); @@ -324,7 +324,7 @@ describe("Waku Store", () => { await waitForRemotePeer(waku, [Protocols.Store]); let messages: IMessage[] = []; - await waku.store.queryOrderedCallback( + await waku.store.queryWithOrderedCallback( [TestDecoder], async (msg) => { messages.push(msg); @@ -491,7 +491,7 @@ describe("Waku Store", () => { const nwakuPeerId = await nwaku.getPeerId(); const firstMessages: IMessage[] = []; - await waku.store.queryOrderedCallback( + await waku.store.queryWithOrderedCallback( [TestDecoder], (msg) => { if (msg) { @@ -505,7 +505,7 @@ describe("Waku Store", () => { ); const bothMessages: IMessage[] = []; - await waku.store.queryOrderedCallback( + await waku.store.queryWithOrderedCallback( [TestDecoder], async (msg) => { bothMessages.push(msg); @@ -552,7 +552,7 @@ describe("Waku Store", () => { const desiredMsgs = 14; const messages: IMessage[] = []; - await waku.store.queryOrderedCallback( + await waku.store.queryWithOrderedCallback( [TestDecoder], async (msg) => { messages.push(msg); From ccf571534a289fee91f4a0335960ca3b99460641 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Mon, 7 Aug 2023 17:43:29 +0530 Subject: [PATCH 4/6] rm: comments --- packages/core/src/lib/store/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index c7f9ffae25..30fd9eb0d5 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -210,7 +210,6 @@ class Store extends BaseProtocol implements IStore { decoders: IDecoder[], options?: QueryOptions ): AsyncGenerator[]> { - // The existing code for the queryGenerator method remains the same const { pubSubTopic = DefaultPubSubTopic } = this.options; let startTime, endTime; From 677300b7c4a97d8f4f06e5e7bdf1f5d76190e4c9 Mon Sep 17 00:00:00 2001 From: Danish Arora <35004822+danisharora099@users.noreply.github.com> Date: Tue, 22 Aug 2023 15:12:06 +0530 Subject: [PATCH 5/6] Update packages/core/src/lib/store/index.ts Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com> --- packages/core/src/lib/store/index.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 0800723bcd..0944c0d70d 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -146,6 +146,10 @@ class Store extends BaseProtocol implements IStore { * or if an error is encountered when processing the reply, * or if two decoders with the same content topic are passed. */ +/** +* @deprecated Use `queryWithOrderedCallback` instead +**/ + async queryOrderedCallback = queryWithOrderedCallback async queryWithOrderedCallback( decoders: IDecoder[], callback: (message: T) => Promise | boolean | void, From e25bbae4c89a229171b63796e5661856a0c4508c Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 22 Aug 2023 15:55:10 +0530 Subject: [PATCH 6/6] fix commit --- packages/core/src/lib/store/index.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 0944c0d70d..05068c1358 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -129,6 +129,11 @@ class Store extends BaseProtocol implements IStore { ); } + /** + * @deprecated Use `queryWithOrderedCallback` instead + **/ + queryOrderedCallback = this.queryWithOrderedCallback; + /** * Do a query to a Waku Store to retrieve historical/missed messages. * @@ -146,10 +151,6 @@ class Store extends BaseProtocol implements IStore { * or if an error is encountered when processing the reply, * or if two decoders with the same content topic are passed. */ -/** -* @deprecated Use `queryWithOrderedCallback` instead -**/ - async queryOrderedCallback = queryWithOrderedCallback async queryWithOrderedCallback( decoders: IDecoder[], callback: (message: T) => Promise | boolean | void,