Skip to content

Commit

Permalink
chore!: refactor store protocol for readability (#1456)
Browse files Browse the repository at this point in the history
* refactor store protocol for readability

* update interface

* fix: test

* rm: comments

* Update packages/core/src/lib/store/index.ts

Co-authored-by: fryorcraken <[email protected]>

* fix commit

---------

Co-authored-by: fryorcraken <[email protected]>
  • Loading branch information
danisharora099 and fryorcraken authored Aug 28, 2023
1 parent 45baa76 commit 2389977
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 45 deletions.
93 changes: 57 additions & 36 deletions packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,54 @@ class Store extends BaseProtocol implements IStore {
this.options = options ?? {};
}

/**
* Processes messages based on the provided callback and options.
* @private
*/
private async processMessages<T extends IDecodedMessage>(
messages: Promise<T | undefined>[],
callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: QueryOptions
): Promise<boolean> {
let abort = false;
const messagesOrUndef: Array<T | undefined> = await Promise.all(messages);
let processedMessages: Array<T> = messagesOrUndef.filter(isDefined);

if (this.shouldReverseOrder(options)) {
processedMessages = processedMessages.reverse();
}

await Promise.all(
processedMessages.map(async (msg) => {
if (msg && !abort) {
abort = Boolean(await callback(msg));
}
})
);

return abort;
}

/**
* Determines whether to reverse the order of messages based on the provided options.
*
* Messages in pages are ordered from oldest (first) to most recent (last).
* https://github.com/vacp2p/rfc/issues/533
*
* @private
*/
private shouldReverseOrder(options?: QueryOptions): boolean {
return (
typeof options?.pageDirection === "undefined" ||
options?.pageDirection === PageDirection.BACKWARD
);
}

/**
* @deprecated Use `queryWithOrderedCallback` instead
**/
queryOrderedCallback = this.queryWithOrderedCallback;

/**
* Do a query to a Waku Store to retrieve historical/missed messages.
*
Expand All @@ -103,42 +151,20 @@ class Store extends BaseProtocol implements IStore {
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*/
async queryOrderedCallback<T extends IDecodedMessage>(
async queryWithOrderedCallback<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: QueryOptions
): Promise<void> {
let abort = false;
for await (const promises of this.queryGenerator(decoders, options)) {
if (abort) break;
const messagesOrUndef: Array<T | undefined> = await Promise.all(promises);

let messages: Array<T> = messagesOrUndef.filter(isDefined);

// Messages in pages are ordered from oldest (first) to most recent (last).
// https://github.com/vacp2p/rfc/issues/533
if (
typeof options?.pageDirection === "undefined" ||
options?.pageDirection === PageDirection.BACKWARD
) {
messages = messages.reverse();
}

await Promise.all(
messages.map(async (msg) => {
if (msg && !abort) {
abort = Boolean(await callback(msg));
}
})
);
if (await this.processMessages(promises, callback, options)) break;
}
}

/**
* Do a query to a Waku Store to retrieve historical/missed messages.
*
* The callback function takes a `Promise<WakuMessage>` in input,
* useful if messages needs to be decrypted and performance matters.
* useful if messages need to be decrypted and performance matters.
*
* The order of the messages passed to the callback is as follows:
* - within a page, messages are expected to be ordered from oldest to most recent
Expand All @@ -152,25 +178,23 @@ class Store extends BaseProtocol implements IStore {
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*/
async queryCallbackOnPromise<T extends IDecodedMessage>(
async queryWithPromiseCallback<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (
message: Promise<T | undefined>
) => Promise<void | boolean> | boolean | void,
options?: QueryOptions
): Promise<void> {
let abort = false;
let promises: Promise<void>[] = [];
for await (const page of this.queryGenerator(decoders, options)) {
const _promises = page.map(async (msg) => {
if (!abort) {
abort = Boolean(await callback(msg));
}
const _promises = page.map(async (msgPromise) => {
if (abort) return;
abort = Boolean(await callback(msgPromise));
});

promises = promises.concat(_promises);
await Promise.all(_promises);
if (abort) break;
}
await Promise.all(promises);
}

/**
Expand All @@ -183,9 +207,6 @@ class Store extends BaseProtocol implements IStore {
* as follows:
* - within a page, messages SHOULD be ordered from oldest to most recent
* - pages direction depends on { @link QueryOptions.pageDirection }
*
* However, there is no way to guarantee the behavior of the remote node.
*
* @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
Expand Down
4 changes: 2 additions & 2 deletions packages/interfaces/src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ export type StoreQueryOptions = {
} & ProtocolOptions;

export interface IStore extends IBaseProtocol {
queryOrderedCallback: <T extends IDecodedMessage>(
queryWithOrderedCallback: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: StoreQueryOptions
) => Promise<void>;
queryCallbackOnPromise: <T extends IDecodedMessage>(
queryWithPromiseCallback: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (
message: Promise<T | undefined>
Expand Down
14 changes: 7 additions & 7 deletions packages/tests/tests/store.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ describe("Waku Store", () => {
await waitForRemotePeer(waku, [Protocols.Store]);

const messages: IMessage[] = [];
await waku.store.queryCallbackOnPromise(
await waku.store.queryWithPromiseCallback(
[TestDecoder],
async (msgPromise) => {
const msg = await msgPromise;
Expand Down Expand Up @@ -246,7 +246,7 @@ describe("Waku Store", () => {

const desiredMsgs = 14;
const messages: IMessage[] = [];
await waku.store.queryCallbackOnPromise(
await waku.store.queryWithPromiseCallback(
[TestDecoder],
async (msgPromise) => {
const msg = await msgPromise;
Expand Down Expand Up @@ -285,7 +285,7 @@ describe("Waku Store", () => {
await waitForRemotePeer(waku, [Protocols.Store]);

const messages: IMessage[] = [];
await waku.store.queryOrderedCallback(
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
messages.push(msg);
Expand Down Expand Up @@ -324,7 +324,7 @@ describe("Waku Store", () => {
await waitForRemotePeer(waku, [Protocols.Store]);

let messages: IMessage[] = [];
await waku.store.queryOrderedCallback(
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
messages.push(msg);
Expand Down Expand Up @@ -491,7 +491,7 @@ describe("Waku Store", () => {
const nwakuPeerId = await nwaku.getPeerId();

const firstMessages: IMessage[] = [];
await waku.store.queryOrderedCallback(
await waku.store.queryWithOrderedCallback(
[TestDecoder],
(msg) => {
if (msg) {
Expand All @@ -505,7 +505,7 @@ describe("Waku Store", () => {
);

const bothMessages: IMessage[] = [];
await waku.store.queryOrderedCallback(
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
bothMessages.push(msg);
Expand Down Expand Up @@ -552,7 +552,7 @@ describe("Waku Store", () => {

const desiredMsgs = 14;
const messages: IMessage[] = [];
await waku.store.queryOrderedCallback(
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
messages.push(msg);
Expand Down

0 comments on commit 2389977

Please sign in to comment.