Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore!: refactor store protocol for readability #1456

Merged
merged 9 commits into from
Aug 28, 2023
88 changes: 52 additions & 36 deletions packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,49 @@ class Store extends BaseProtocol implements IStore {
this.options = options ?? {};
}

/**
* Processes messages based on the provided callback and options.
* @private
*/
private async processMessages<T extends IDecodedMessage>(
messages: Promise<T | undefined>[],
callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: QueryOptions
): Promise<boolean> {
let abort = false;
const messagesOrUndef: Array<T | undefined> = await Promise.all(messages);
let processedMessages: Array<T> = messagesOrUndef.filter(isDefined);

if (this.shouldReverseOrder(options)) {
processedMessages = processedMessages.reverse();
}

await Promise.all(
processedMessages.map(async (msg) => {
if (msg && !abort) {
abort = Boolean(await callback(msg));
}
})
);

return abort;
}

/**
* Determines whether to reverse the order of messages based on the provided options.
*
* Messages in pages are ordered from oldest (first) to most recent (last).
* https://github.com/vacp2p/rfc/issues/533
*
* @private
*/
private shouldReverseOrder(options?: QueryOptions): boolean {
return (
typeof options?.pageDirection === "undefined" ||
options?.pageDirection === PageDirection.BACKWARD
);
}

/**
* Do a query to a Waku Store to retrieve historical/missed messages.
*
Expand All @@ -103,42 +146,20 @@ class Store extends BaseProtocol implements IStore {
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*/
async queryOrderedCallback<T extends IDecodedMessage>(
async queryWithOrderedCallback<T extends IDecodedMessage>(
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
decoders: IDecoder<T>[],
callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: QueryOptions
): Promise<void> {
let abort = false;
for await (const promises of this.queryGenerator(decoders, options)) {
if (abort) break;
const messagesOrUndef: Array<T | undefined> = await Promise.all(promises);

let messages: Array<T> = messagesOrUndef.filter(isDefined);

// Messages in pages are ordered from oldest (first) to most recent (last).
// https://github.com/vacp2p/rfc/issues/533
if (
typeof options?.pageDirection === "undefined" ||
options?.pageDirection === PageDirection.BACKWARD
) {
messages = messages.reverse();
}

await Promise.all(
messages.map(async (msg) => {
if (msg && !abort) {
abort = Boolean(await callback(msg));
}
})
);
if (await this.processMessages(promises, callback, options)) break;
}
}

/**
* Do a query to a Waku Store to retrieve historical/missed messages.
*
* The callback function takes a `Promise<WakuMessage>` in input,
* useful if messages needs to be decrypted and performance matters.
* useful if messages need to be decrypted and performance matters.
*
* The order of the messages passed to the callback is as follows:
* - within a page, messages are expected to be ordered from oldest to most recent
Expand All @@ -152,25 +173,23 @@ class Store extends BaseProtocol implements IStore {
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*/
async queryCallbackOnPromise<T extends IDecodedMessage>(
async queryWithPromiseCallback<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (
message: Promise<T | undefined>
) => Promise<void | boolean> | boolean | void,
options?: QueryOptions
): Promise<void> {
let abort = false;
let promises: Promise<void>[] = [];
for await (const page of this.queryGenerator(decoders, options)) {
const _promises = page.map(async (msg) => {
if (!abort) {
abort = Boolean(await callback(msg));
}
const _promises = page.map(async (msgPromise) => {
if (abort) return;
abort = Boolean(await callback(msgPromise));
});

promises = promises.concat(_promises);
await Promise.all(_promises);
if (abort) break;
}
await Promise.all(promises);
}

/**
Expand All @@ -183,9 +202,6 @@ class Store extends BaseProtocol implements IStore {
* as follows:
* - within a page, messages SHOULD be ordered from oldest to most recent
* - pages direction depends on { @link QueryOptions.pageDirection }
*
* However, there is no way to guarantee the behavior of the remote node.
*
* @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
Expand Down
4 changes: 2 additions & 2 deletions packages/interfaces/src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ export type StoreQueryOptions = {
} & ProtocolOptions;

export interface IStore extends IBaseProtocol {
queryOrderedCallback: <T extends IDecodedMessage>(
queryWithOrderedCallback: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: StoreQueryOptions
) => Promise<void>;
queryCallbackOnPromise: <T extends IDecodedMessage>(
queryWithPromiseCallback: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (
message: Promise<T | undefined>
Expand Down
14 changes: 7 additions & 7 deletions packages/tests/tests/store.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ describe("Waku Store", () => {
await waitForRemotePeer(waku, [Protocols.Store]);

const messages: IMessage[] = [];
await waku.store.queryCallbackOnPromise(
await waku.store.queryWithPromiseCallback(
[TestDecoder],
async (msgPromise) => {
const msg = await msgPromise;
Expand Down Expand Up @@ -246,7 +246,7 @@ describe("Waku Store", () => {

const desiredMsgs = 14;
const messages: IMessage[] = [];
await waku.store.queryCallbackOnPromise(
await waku.store.queryWithPromiseCallback(
[TestDecoder],
async (msgPromise) => {
const msg = await msgPromise;
Expand Down Expand Up @@ -285,7 +285,7 @@ describe("Waku Store", () => {
await waitForRemotePeer(waku, [Protocols.Store]);

const messages: IMessage[] = [];
await waku.store.queryOrderedCallback(
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
messages.push(msg);
Expand Down Expand Up @@ -324,7 +324,7 @@ describe("Waku Store", () => {
await waitForRemotePeer(waku, [Protocols.Store]);

let messages: IMessage[] = [];
await waku.store.queryOrderedCallback(
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
messages.push(msg);
Expand Down Expand Up @@ -491,7 +491,7 @@ describe("Waku Store", () => {
const nwakuPeerId = await nwaku.getPeerId();

const firstMessages: IMessage[] = [];
await waku.store.queryOrderedCallback(
await waku.store.queryWithOrderedCallback(
[TestDecoder],
(msg) => {
if (msg) {
Expand All @@ -505,7 +505,7 @@ describe("Waku Store", () => {
);

const bothMessages: IMessage[] = [];
await waku.store.queryOrderedCallback(
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
bothMessages.push(msg);
Expand Down Expand Up @@ -552,7 +552,7 @@ describe("Waku Store", () => {

const desiredMsgs = 14;
const messages: IMessage[] = [];
await waku.store.queryOrderedCallback(
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
messages.push(msg);
Expand Down
Loading