diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index bca6cd9ea9..a0259b3d12 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -38,37 +38,70 @@ class LightPush extends BaseProtocol implements ILightPush { this.options = options || {}; } + private async preparePushMessage( + encoder: IEncoder, + message: IMessage, + pubSubTopic: string + ): Promise<{ + query: PushRpc | null; + error?: SendError; + }> { + if (!isSizeValid(message.payload)) { + log("Failed to send waku light push: message is bigger than 1MB"); + return { query: null, error: SendError.SIZE_TOO_BIG }; + } + + const protoMessage = await encoder.toProtoObj(message); + if (!protoMessage) { + log("Failed to encode to protoMessage, aborting push"); + return { + query: null, + error: SendError.ENCODE_FAILED + }; + } + + const query = PushRpc.createRequest(protoMessage, pubSubTopic); + return { query }; + } + async send( encoder: IEncoder, message: IMessage, opts?: ProtocolOptions ): Promise { const { pubSubTopic = DefaultPubSubTopic } = this.options; - - const peer = await this.getPeer(opts?.peerId); - const stream = await this.newStream(peer); - const recipients: PeerId[] = []; let error: undefined | SendError = undefined; + let query: PushRpc | null = null; + try { - if (!isSizeValid(message.payload)) { - log("Failed to send waku light push: message is bigger that 1MB"); - return { - recipients, - error: SendError.SIZE_TOO_BIG - }; - } + const { query: preparedQuery, error: preparationError } = + await this.preparePushMessage(encoder, message, pubSubTopic); - const protoMessage = await encoder.toProtoObj(message); - if (!protoMessage) { - log("Failed to encode to protoMessage, aborting push"); + if (preparationError) { return { recipients, - error: SendError.ENCODE_FAILED + error: preparationError }; } - const query = PushRpc.createRequest(protoMessage, pubSubTopic); + + query = preparedQuery; + } catch (error) { + log("Failed to prepare push message", error); + } + + if (!query) { + return { + recipients, + error: SendError.GENERIC_FAIL + }; + } + + const peer = await this.getPeer(opts?.peerId); + const stream = await this.newStream(peer); + + try { const res = await pipe( [query.encode()], lp.encode, @@ -76,6 +109,7 @@ class LightPush extends BaseProtocol implements ILightPush { lp.decode, async (source) => await all(source) ); + try { const bytes = new Uint8ArrayList(); res.forEach((chunk) => { @@ -98,9 +132,10 @@ class LightPush extends BaseProtocol implements ILightPush { log("Failed to send waku light push request", err); error = SendError.GENERIC_FAIL; } + return { - error, - recipients + recipients, + error }; } }