From 06899de1fbb87378be7daf8bb2a5c08cfc5276cd Mon Sep 17 00:00:00 2001 From: maan2003 Date: Wed, 30 Oct 2024 21:13:43 +0530 Subject: [PATCH] feat: rpc ng --- packages/core-web/src/FedimintWallet.ts | 37 ++- packages/core-web/src/rpc.ts | 207 +++++++++++++++ .../core-web/src/services/BalanceService.ts | 4 +- .../src/services/FederationService.ts | 4 +- .../core-web/src/services/LightningService.ts | 4 +- packages/core-web/src/services/MintService.ts | 8 +- .../core-web/src/services/RecoveryService.ts | 4 +- .../core-web/src/test/TestFedimintWallet.ts | 4 +- packages/core-web/src/test/TestingService.ts | 4 +- packages/core-web/src/test/fixtures.ts | 2 +- packages/core-web/src/types/index.ts | 2 +- packages/core-web/src/types/rpc.ts | 49 ++++ packages/core-web/src/types/worker.ts | 13 - .../core-web/src/worker/WorkerClient.test.ts | 6 - packages/core-web/src/worker/WorkerClient.ts | 237 ------------------ .../core-web/src/worker/WorkerTransport.ts | 47 ++++ packages/core-web/src/worker/index.ts | 2 +- packages/core-web/src/worker/worker.js | 105 ++------ packages/core-web/src/worker/worker.test.ts | 90 ------- pnpm-lock.yaml | 207 --------------- 20 files changed, 352 insertions(+), 684 deletions(-) create mode 100644 packages/core-web/src/rpc.ts create mode 100644 packages/core-web/src/types/rpc.ts delete mode 100644 packages/core-web/src/types/worker.ts delete mode 100644 packages/core-web/src/worker/WorkerClient.test.ts delete mode 100644 packages/core-web/src/worker/WorkerClient.ts create mode 100644 packages/core-web/src/worker/WorkerTransport.ts delete mode 100644 packages/core-web/src/worker/worker.test.ts diff --git a/packages/core-web/src/FedimintWallet.ts b/packages/core-web/src/FedimintWallet.ts index a06db39..e53ed99 100644 --- a/packages/core-web/src/FedimintWallet.ts +++ b/packages/core-web/src/FedimintWallet.ts @@ -1,4 +1,5 @@ -import { WorkerClient } from './worker' +import { RpcClient } from './rpc' +import { WebWorkerTransportInit } from './worker/WorkerTransport' import { BalanceService, MintService, @@ -11,7 +12,7 @@ import { logger, type LogLevel } from './utils/logger' const DEFAULT_CLIENT_NAME = 'fm-default' as const export class FedimintWallet { - private _client: WorkerClient + private _client: RpcClient public balance: BalanceService public mint: MintService @@ -56,7 +57,7 @@ export class FedimintWallet { this._openPromise = new Promise((resolve) => { this._resolveOpen = resolve }) - this._client = new WorkerClient() + this._client = new RpcClient(new WebWorkerTransportInit()) this.mint = new MintService(this._client) this.lightning = new LightningService(this._client) this.balance = new BalanceService(this._client) @@ -71,9 +72,9 @@ export class FedimintWallet { } async initialize() { - logger.info('Initializing WorkerClient') + logger.info('Initializing RpcClient') await this._client.initialize() - logger.info('WorkerClient initialized') + logger.info('RpcClient initialized') } async waitForOpen() { @@ -85,14 +86,15 @@ export class FedimintWallet { await this._client.initialize() // TODO: Determine if this should be safe or throw if (this._isOpen) throw new Error('The FedimintWallet is already open.') - const { success } = await this._client.sendSingleMessage<{ - success: boolean - }>('open', { clientName }) - if (success) { - this._isOpen = !!success + try { + await this._client.openClient(clientName) + this._isOpen = true this._resolveOpen() + return true + } catch (e) { + logger.error('Error opening client', e) + return false } - return success } async joinFederation( @@ -106,15 +108,10 @@ export class FedimintWallet { 'The FedimintWallet is already open. You can only call `joinFederation` on closed clients.', ) try { - const response = await this._client.sendSingleMessage<{ - success: boolean - }>('join', { inviteCode, clientName }) - if (response.success) { - this._isOpen = true - this._resolveOpen() - } - - return response.success + await this._client.joinFederation(inviteCode, clientName) + this._isOpen = true + this._resolveOpen() + return true } catch (e) { logger.error('Error joining federation', e) return false diff --git a/packages/core-web/src/rpc.ts b/packages/core-web/src/rpc.ts new file mode 100644 index 0000000..d2d9cd2 --- /dev/null +++ b/packages/core-web/src/rpc.ts @@ -0,0 +1,207 @@ +import type { + CancelFunction, + JSONValue, + ModuleKind, + RpcRequest, + RpcResponse, + RpcRequestFull, + RpcResponseFull, +} from './types' +import { logger } from './utils/logger' + +export interface RpcTransport { + sendRequest(request: RpcRequestFull): void + destroy(): void +} + +export interface RpcTransportInit { + init( + onRpcResponse: (response: RpcResponseFull) => void, + ): Promise +} + +// Handles communication with the wasm worker +// TODO: Move rpc stream management to a separate "SubscriptionManager" class +export class RpcClient { + private transport?: RpcTransport + private transportInit: RpcTransportInit + private requestCounter = 0 + private requestCallbacks = new Map void>() + private initPromise?: Promise + private clientName: string | undefined + + constructor(transportInit: RpcTransportInit) { + this.transportInit = transportInit + } + + private async initializeInner(): Promise { + this.transport = await this.transportInit.init( + this.handleWorkerMessage.bind(this), + ) + } + + async initialize() { + if (this.initPromise) { + return this.initPromise + } + + this.initPromise = this.initializeInner() + return this.initPromise + } + + private handleWorkerMessage = (response: RpcResponseFull) => { + const callback = this.requestCallbacks.get(response.request_id) + + if (callback) { + callback(response) + } else { + logger.warn( + 'RpcClient - handleWorkerMessage - received message with no callback', + response.request_id, + response, + ) + } + } + + async joinFederation(inviteCode: string, clientName: string) { + await this.internalRpcSingle({ + type: 'join_federation', + invite_code: inviteCode, + client_name: clientName, + }) + } + + async openClient(clientName: string) { + await this.internalRpcSingle({ + type: 'open_client', + client_name: clientName, + }) + this.clientName = clientName + } + + async closeClient(clientName: string) { + await this.internalRpcSingle({ + type: 'close_client', + client_name: clientName, + }) + this.clientName = undefined + } + + private internalRpcStream( + request: RpcRequest, + onData: (data: Response) => void, + onError: (error: string) => void, + onEnd: () => void = () => {}, + ): CancelFunction { + const requestId = ++this.requestCounter + logger.debug('RpcClient - rpcStream', requestId, request) + let unsubscribe = () => { + const cancelRequest: RpcRequestFull = { + request_id: ++this.requestCounter, + type: 'cancel_rpc', + cancel_request_id: requestId, + } + this.transport?.sendRequest(cancelRequest) + } + + const requestFull: RpcRequestFull = { + ...request, + request_id: requestId, + } + + this.requestCallbacks.set(requestId, (response: RpcResponse) => { + switch (response.type) { + case 'data': + onData(response.data) + break + case 'error': + onError(response.error) + break + case 'end': + this.requestCallbacks.delete(requestId) + onEnd() + break + case 'aborted': + this.requestCallbacks.delete(requestId) + onEnd() + break + } + }) + this.transport?.sendRequest(requestFull) + return unsubscribe + } + + private internalRpcSingle( + request: RpcRequest, + ): Promise { + return new Promise((resolve, reject) => { + const unsubscribe = this.internalRpcStream( + request, + (data) => resolve(data as T), + (error) => reject(new Error(error)), + () => {}, + ) + // No need to unsubscribe for single requests as they auto-complete + }) + } + + rpcStream< + Response extends JSONValue = JSONValue, + Body extends JSONValue = JSONValue, + >( + module: ModuleKind, + method: string, + body: Body, + onData: (data: Response) => void, + onError: (error: string) => void, + onEnd: () => void = () => {}, + ): CancelFunction { + if (this.clientName === undefined) { + throw new Error('Wallet is not open') + } + return this.internalRpcStream( + { + type: 'client_rpc', + client_name: this.clientName, + module, + method, + payload: body, + }, + onData, + onError, + onEnd, + ) + } + + rpcSingle( + module: string, + method: string, + payload: P, + ): Promise { + if (this.clientName === undefined) { + throw new Error('Wallet is not open') + } + return this.internalRpcSingle({ + type: 'client_rpc', + client_name: this.clientName, + module, + method, + payload, + }) + } + + async cleanup() { + this.transport?.destroy() + this.requestCounter = 0 + this.initPromise = undefined + this.requestCallbacks.clear() + } + + // For Testing + _getRequestCounter() { + return this.requestCounter + } + _getRequestCallbackMap() { + return this.requestCallbacks + } +} diff --git a/packages/core-web/src/services/BalanceService.ts b/packages/core-web/src/services/BalanceService.ts index f1b51e4..e9873c5 100644 --- a/packages/core-web/src/services/BalanceService.ts +++ b/packages/core-web/src/services/BalanceService.ts @@ -1,4 +1,4 @@ -import { WorkerClient } from '../worker' +import { RpcClient } from '../rpc' /** * Balance Service @@ -6,7 +6,7 @@ import { WorkerClient } from '../worker' * The Balance Service provides methods to interact with the balance of a Fedimint wallet. */ export class BalanceService { - constructor(private client: WorkerClient) {} + constructor(private client: RpcClient) {} /** * Get the balance of the current wallet in milli-satoshis (MSats) diff --git a/packages/core-web/src/services/FederationService.ts b/packages/core-web/src/services/FederationService.ts index 8f9f046..773a326 100644 --- a/packages/core-web/src/services/FederationService.ts +++ b/packages/core-web/src/services/FederationService.ts @@ -1,8 +1,8 @@ import type { JSONValue } from '../types' -import { WorkerClient } from '../worker' +import { RpcClient } from '../rpc' export class FederationService { - constructor(private client: WorkerClient) {} + constructor(private client: RpcClient) {} async getConfig(): Promise { return await this.client.rpcSingle('', 'get_config', {}) diff --git a/packages/core-web/src/services/LightningService.ts b/packages/core-web/src/services/LightningService.ts index d270a14..fd8a21d 100644 --- a/packages/core-web/src/services/LightningService.ts +++ b/packages/core-web/src/services/LightningService.ts @@ -1,4 +1,4 @@ -import { WorkerClient } from '../worker' +import { RpcClient } from '../rpc' import type { CreateBolt11Response, GatewayInfo, @@ -11,7 +11,7 @@ import type { } from '../types' export class LightningService { - constructor(private client: WorkerClient) {} + constructor(private client: RpcClient) {} async createInvoice( amountMsats: number, diff --git a/packages/core-web/src/services/MintService.ts b/packages/core-web/src/services/MintService.ts index f9212a8..d22af30 100644 --- a/packages/core-web/src/services/MintService.ts +++ b/packages/core-web/src/services/MintService.ts @@ -1,4 +1,4 @@ -import { WorkerClient } from '../worker' +import { RpcClient } from '../rpc' import type { Duration, JSONObject, @@ -9,7 +9,7 @@ import type { } from '../types' export class MintService { - constructor(private client: WorkerClient) {} + constructor(private client: RpcClient) {} async redeemEcash(notes: string): Promise { await this.client.rpcSingle('mint', 'reissue_external_notes', { @@ -58,7 +58,7 @@ export class MintService { ? { nanos: 0, secs: tryCancelAfter } : tryCancelAfter - const res = await this.client.rpcSingle>( + const [operationId, notes] = await this.client.rpcSingle<[string, string]>( 'mint', 'spend_notes', { @@ -68,8 +68,6 @@ export class MintService { extra_meta: extraMeta, }, ) - const notes = res[1] - const operationId = res[0] return { notes, diff --git a/packages/core-web/src/services/RecoveryService.ts b/packages/core-web/src/services/RecoveryService.ts index 7b702ff..c6350b1 100644 --- a/packages/core-web/src/services/RecoveryService.ts +++ b/packages/core-web/src/services/RecoveryService.ts @@ -1,8 +1,8 @@ import type { JSONValue } from '../types' -import { WorkerClient } from '../worker' +import { RpcClient } from '../rpc' export class RecoveryService { - constructor(private client: WorkerClient) {} + constructor(private client: RpcClient) {} async hasPendingRecoveries(): Promise { return await this.client.rpcSingle('', 'has_pending_recoveries', {}) diff --git a/packages/core-web/src/test/TestFedimintWallet.ts b/packages/core-web/src/test/TestFedimintWallet.ts index 3ebd244..e0b9669 100644 --- a/packages/core-web/src/test/TestFedimintWallet.ts +++ b/packages/core-web/src/test/TestFedimintWallet.ts @@ -1,5 +1,5 @@ import { FedimintWallet } from '../FedimintWallet' -import { WorkerClient } from '../worker/WorkerClient' +import { RpcClient } from '../rpc' import { TestingService } from './TestingService' export class TestFedimintWallet extends FedimintWallet { @@ -20,7 +20,7 @@ export class TestFedimintWallet extends FedimintWallet { } // Method to expose the WorkerClient - getWorkerClient(): WorkerClient { + getWorkerClient(): RpcClient { return this['_client'] } } diff --git a/packages/core-web/src/test/TestingService.ts b/packages/core-web/src/test/TestingService.ts index e033c98..b49ab4d 100644 --- a/packages/core-web/src/test/TestingService.ts +++ b/packages/core-web/src/test/TestingService.ts @@ -1,5 +1,5 @@ import { LightningService } from '../services' -import { WorkerClient } from '../worker' +import { RpcClient } from '../rpc' export const TESTING_INVITE = 'fed11qgqrsdnhwden5te0v9cxjtt4dekxzamxw4kz6mmjvvkhydted9ukg6r9xfsnx7th0fhn26tf093juamwv4u8gtnpwpcz7qqpyz0e327ua8geceutfrcaezwt22mk6s2rdy09kg72jrcmncng2gn0kp2m5sk' @@ -9,7 +9,7 @@ export const TESTING_INVITE = export class TestingService { public TESTING_INVITE: string constructor( - private client: WorkerClient, + private client: RpcClient, private lightning: LightningService, ) { // Solo Mint on mutinynet diff --git a/packages/core-web/src/test/fixtures.ts b/packages/core-web/src/test/fixtures.ts index a809152..dcaceb9 100644 --- a/packages/core-web/src/test/fixtures.ts +++ b/packages/core-web/src/test/fixtures.ts @@ -58,7 +58,7 @@ export const workerTest = test.extend<{ await use(randomTestingId) }, workerClient: async ({}, use) => { - const workerClient = new WorkerClient() + const workerClient = new RpcClient() await use(workerClient) }, }) diff --git a/packages/core-web/src/types/index.ts b/packages/core-web/src/types/index.ts index a2b9e6e..9825a9f 100644 --- a/packages/core-web/src/types/index.ts +++ b/packages/core-web/src/types/index.ts @@ -1,3 +1,3 @@ export * from './wallet' export * from './utils' -export * from './worker' +export * from './rpc' diff --git a/packages/core-web/src/types/rpc.ts b/packages/core-web/src/types/rpc.ts new file mode 100644 index 0000000..c65a62b --- /dev/null +++ b/packages/core-web/src/types/rpc.ts @@ -0,0 +1,49 @@ +export type RpcRequestFull = { + request_id: number +} & RpcRequest + +export type RpcRequest = + | { + type: 'join_federation' + invite_code: string + client_name: string + } + | { + type: 'open_client' + client_name: string + } + | { + type: 'close_client' + client_name: string + } + | { + type: 'client_rpc' + client_name: string + module: string + method: string + payload: any + } + | { + type: 'cancel_rpc' + cancel_request_id: number + } + +export type RpcResponseFull = { + request_id: number +} & RpcResponse + +export type RpcResponse = + | { + type: 'data' + data: any + } + | { + type: 'error' + error: string + } + | { + type: 'aborted' + } + | { + type: 'end' + } diff --git a/packages/core-web/src/types/worker.ts b/packages/core-web/src/types/worker.ts deleted file mode 100644 index cc25e61..0000000 --- a/packages/core-web/src/types/worker.ts +++ /dev/null @@ -1,13 +0,0 @@ -const WorkerMessageTypes = [ - 'init', - 'initialized', - 'rpc', - 'log', - 'open', - 'join', - 'error', - 'unsubscribe', - 'cleanup', -] as const - -export type WorkerMessageType = (typeof WorkerMessageTypes)[number] diff --git a/packages/core-web/src/worker/WorkerClient.test.ts b/packages/core-web/src/worker/WorkerClient.test.ts deleted file mode 100644 index 369d603..0000000 --- a/packages/core-web/src/worker/WorkerClient.test.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { expect } from 'vitest' -import { workerTest } from '../test/fixtures' - -workerTest('should initialize', async ({ workerClient }) => { - expect(workerClient).toBeDefined() -}) diff --git a/packages/core-web/src/worker/WorkerClient.ts b/packages/core-web/src/worker/WorkerClient.ts deleted file mode 100644 index c9cd4cd..0000000 --- a/packages/core-web/src/worker/WorkerClient.ts +++ /dev/null @@ -1,237 +0,0 @@ -import type { - CancelFunction, - JSONValue, - ModuleKind, - StreamError, - StreamResult, - WorkerMessageType, -} from '../types' -import { logger } from '../utils/logger' - -// Handles communication with the wasm worker -// TODO: Move rpc stream management to a separate "SubscriptionManager" class -export class WorkerClient { - private worker: Worker - private requestCounter = 0 - private requestCallbacks = new Map void>() - private initPromise: Promise | undefined = undefined - - constructor() { - // Must create the URL inside the constructor for vite - this.worker = new Worker(new URL('./worker.js', import.meta.url), { - type: 'module', - }) - this.worker.onmessage = this.handleWorkerMessage.bind(this) - this.worker.onerror = this.handleWorkerError.bind(this) - logger.info('WorkerClient instantiated') - logger.debug('WorkerClient', this.worker) - } - - // Idempotent setup - Loads the wasm module - initialize() { - if (this.initPromise) return this.initPromise - this.initPromise = this.sendSingleMessage('init') - return this.initPromise - } - - private handleWorkerLogs(event: MessageEvent) { - const { type, level, message, ...data } = event.data - logger.log(level, message, ...data) - } - - private handleWorkerError(event: ErrorEvent) { - logger.error('Worker error', event) - } - - private handleWorkerMessage(event: MessageEvent) { - const { type, requestId, ...data } = event.data - if (type === 'log') { - this.handleWorkerLogs(event.data) - } - const streamCallback = this.requestCallbacks.get(requestId) - // TODO: Handle errors... maybe have another callbacks list for errors? - logger.debug('WorkerClient - handleWorkerMessage', event.data) - if (streamCallback) { - streamCallback(data) // {data: something} OR {error: something} - } else { - logger.warn( - 'WorkerClient - handleWorkerMessage - received message with no callback', - requestId, - event.data, - ) - } - } - - // TODO: Handle errors... maybe have another callbacks list for errors? - // TODO: Handle timeouts - // TODO: Handle multiple errors - - sendSingleMessage< - Response extends JSONValue = JSONValue, - Payload extends JSONValue = JSONValue, - >(type: WorkerMessageType, payload?: Payload): Promise { - return new Promise((resolve, reject) => { - const requestId = ++this.requestCounter - logger.debug('WorkerClient - sendSingleMessage', requestId, type, payload) - this.requestCallbacks.set( - requestId, - (response: StreamResult) => { - this.requestCallbacks.delete(requestId) - logger.debug( - 'WorkerClient - sendSingleMessage - response', - requestId, - response, - ) - if (response.data) resolve(response.data) - else if (response.error) reject(response.error) - else - logger.warn( - 'WorkerClient - sendSingleMessage - malformed response', - requestId, - response, - ) - }, - ) - this.worker.postMessage({ type, payload, requestId }) - }) - } - - /** - * @summary Initiates an RPC stream with the specified module and method. - * - * @description - * This function sets up an RPC stream by sending a request to a worker and - * handling responses asynchronously. It ensures that unsubscription is handled - * correctly, even if the unsubscribe function is called before the subscription - * is fully established, by deferring the unsubscription attempt using `setTimeout`. - * - * The function operates in a non-blocking manner, leveraging Promises to manage - * asynchronous operations and callbacks to handle responses. - * - * - * @template Response - The expected type of the successful response. - * @template Body - The type of the request body. - * @param module - The module kind to interact with. - * @param method - The method name to invoke on the module. - * @param body - The request payload. - * @param onSuccess - Callback invoked with the response data on success. - * @param onError - Callback invoked with error information if an error occurs. - * @param onEnd - Optional callback invoked when the stream ends. - * @returns A function that can be called to cancel the subscription. - * - */ - rpcStream< - Response extends JSONValue = JSONValue, - Body extends JSONValue = JSONValue, - >( - module: ModuleKind, - method: string, - body: Body, - onSuccess: (res: Response) => void, - onError: (res: StreamError['error']) => void, - onEnd: () => void = () => {}, - ): CancelFunction { - const requestId = ++this.requestCounter - logger.debug('WorkerClient - rpcStream', requestId, module, method, body) - let unsubscribe: (value: void) => void = () => {} - let isSubscribed = false - - const unsubscribePromise = new Promise((resolve) => { - unsubscribe = () => { - if (isSubscribed) { - // If already subscribed, resolve immediately to trigger unsubscription - resolve() - } else { - // If not yet subscribed, defer the unsubscribe attempt to the next event loop tick - // This ensures that subscription setup has time to complete - setTimeout(() => unsubscribe(), 0) - } - } - }) - - // Initiate the inner RPC stream setup asynchronously - this._rpcStreamInner( - requestId, - module, - method, - body, - onSuccess, - onError, - onEnd, - unsubscribePromise, - ).then(() => { - isSubscribed = true - }) - - return unsubscribe - } - - private async _rpcStreamInner< - Response extends JSONValue = JSONValue, - Body extends JSONValue = JSONValue, - >( - requestId: number, - module: ModuleKind, - method: string, - body: Body, - onSuccess: (res: Response) => void, - onError: (res: StreamError['error']) => void, - onEnd: () => void = () => {}, - unsubscribePromise: Promise, - // Unsubscribe function - ): Promise { - // await this.openPromise - // if (!this.worker || !this._isOpen) - // throw new Error('FedimintWallet is not open') - - this.requestCallbacks.set(requestId, (response: StreamResult) => { - if (response.error !== undefined) { - onError(response.error) - } else if (response.data !== undefined) { - onSuccess(response.data) - } else if (response.end !== undefined) { - this.requestCallbacks.delete(requestId) - onEnd() - } - }) - this.worker.postMessage({ - type: 'rpc', - payload: { module, method, body }, - requestId, - }) - - unsubscribePromise.then(() => { - this.worker?.postMessage({ - type: 'unsubscribe', - requestId, - }) - this.requestCallbacks.delete(requestId) - }) - } - - rpcSingle( - module: ModuleKind, - method: string, - body: JSONValue, - ): Promise { - logger.debug('WorkerClient - rpcSingle', module, method, body) - return new Promise((resolve, reject) => { - this.rpcStream(module, method, body, resolve, reject) - }) - } - - async cleanup() { - await this.sendSingleMessage('cleanup') - this.requestCounter = 0 - this.initPromise = undefined - this.requestCallbacks.clear() - } - - // For Testing - _getRequestCounter() { - return this.requestCounter - } - _getRequestCallbackMap() { - return this.requestCallbacks - } -} diff --git a/packages/core-web/src/worker/WorkerTransport.ts b/packages/core-web/src/worker/WorkerTransport.ts new file mode 100644 index 0000000..3a0ae75 --- /dev/null +++ b/packages/core-web/src/worker/WorkerTransport.ts @@ -0,0 +1,47 @@ +import type { RpcResponseFull, RpcRequestFull } from '../types' +import { logger } from '../utils/logger' +import type { RpcTransport, RpcTransportInit } from '../rpc' + +class WebWorkerTransport implements RpcTransport { + constructor(private worker: Worker) {} + + sendRequest(request: RpcRequestFull): void { + this.worker.postMessage(request) + } + + destroy(): void { + this.worker.terminate() + } +} + +export class WebWorkerTransportInit implements RpcTransportInit { + async init( + onRpcResponse: (response: RpcResponseFull) => void, + ): Promise { + const worker = new Worker(new URL('../worker.js', import.meta.url), { + type: 'module', + }) + + await new Promise((resolve, reject) => { + const handleInit = (event: MessageEvent) => { + const response = event.data + if (response.type === 'init_error') { + reject(new Error(response.error)) + } else if (response.type === 'init_success') { + logger.info('WebWorker instantiated') + resolve() + } + } + + worker.onmessage = handleInit + worker.postMessage({ + type: 'init', + }) + }) + + worker.onmessage = (event: MessageEvent) => { + onRpcResponse(event.data as RpcResponseFull) + } + return new WebWorkerTransport(worker) + } +} diff --git a/packages/core-web/src/worker/index.ts b/packages/core-web/src/worker/index.ts index cf57f44..19c1421 100644 --- a/packages/core-web/src/worker/index.ts +++ b/packages/core-web/src/worker/index.ts @@ -1 +1 @@ -export { WorkerClient } from './WorkerClient' +export { WebWorkerTransportInit } from './WorkerTransport' diff --git a/packages/core-web/src/worker/worker.js b/packages/core-web/src/worker/worker.js index 5fe9d4d..e99dc3f 100644 --- a/packages/core-web/src/worker/worker.js +++ b/packages/core-web/src/worker/worker.js @@ -4,102 +4,25 @@ // TODO: remove once https://github.com/vitest-dev/vitest/pull/6569 lands in a release globalThis.__vitest_browser_runner__ = { wrapDynamicImport: (foo) => foo() } -// dynamically imported Constructor for WasmClient -let WasmClient = null -// client instance -let client = null - -const streamCancelMap = new Map() - -const handleFree = (requestId) => { - streamCancelMap.delete(requestId) -} +let rpcHandler = null console.log('Worker - init') self.onmessage = async (event) => { - const { type, payload, requestId } = event.data - - try { - if (type === 'init') { - WasmClient = (await import('@fedimint/fedimint-client-wasm-bundler')) - .WasmClient - self.postMessage({ type: 'initialized', data: {}, requestId }) - } else if (type === 'open') { - const { clientName } = payload - client = (await WasmClient.open(clientName)) || null - self.postMessage({ - type: 'open', - data: { success: !!client }, - requestId, - }) - } else if (type === 'join') { - const { inviteCode, clientName: joinClientName } = payload - try { - client = await WasmClient.join_federation(joinClientName, inviteCode) - self.postMessage({ - type: 'join', - data: { success: !!client }, - requestId, - }) - } catch (e) { - self.postMessage({ type: 'error', error: e.message, requestId }) - } - } else if (type === 'rpc') { - const { module, method, body } = payload - console.log('RPC received', module, method, body) - if (!client) { - self.postMessage({ - type: 'error', - error: 'WasmClient not initialized', - requestId, - }) - return - } - const rpcHandle = await client.rpc( - module, - method, - JSON.stringify(body), - (res) => { - console.log('RPC response', requestId, res) - const data = JSON.parse(res) - self.postMessage({ type: 'rpcResponse', requestId, ...data }) - - if (data.end !== undefined) { - // Handle stream ending - const handle = streamCancelMap.get(requestId) - handle?.free() - } - }, + if (event.type === 'init') { + try { + const { RpcHandler } = await import( + '@fedimint/fedimint-client-wasm-bundler' ) - streamCancelMap.set(requestId, rpcHandle) - } else if (type === 'unsubscribe') { - const rpcHandle = streamCancelMap.get(requestId) - if (rpcHandle) { - rpcHandle.cancel() - rpcHandle.free() - streamCancelMap.delete(requestId) - } - } else if (type === 'cleanup') { - console.log('cleanup message received') - client?.free() - self.postMessage({ - type: 'cleanup', - data: {}, - requestId, - }) - close() - } else { - self.postMessage({ - type: 'error', - error: 'Unknown message type', - requestId, - }) + rpcHandler = new RpcHandler() + self.postMessage({ type: 'init_success' }) + } catch (err) { + console.error('Worker init failed:', err) + self.postMessage({ type: 'init_error', error: err.toString() }) } - } catch (e) { - console.error('ERROR', e) - self.postMessage({ type: 'error', error: e, requestId }) + } else { + rpcHandler.rpc(JSON.stringify(event.data), (response) => + self.postMessage(JSON.parse(response)), + ) } } - -// self.postMessage({ type: 'init', data: {} }) diff --git a/packages/core-web/src/worker/worker.test.ts b/packages/core-web/src/worker/worker.test.ts deleted file mode 100644 index 866dde3..0000000 --- a/packages/core-web/src/worker/worker.test.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { expect } from 'vitest' -import { TESTING_INVITE } from '../test/TestingService' -import { JSONObject } from '../types' -import { workerTest } from '../test/fixtures' - -// Waits for a message of a given type from the worker -const waitForWorkerResponse = ( - worker: Worker, - messageType: string, -): Promise => { - return new Promise((resolve, reject) => { - worker.onmessage = (event) => { - if (event.data.type === messageType) { - resolve(event.data) - } else if (event.data.type === 'error') { - reject(event.data.error) - } - } - worker.onerror = (error) => { - reject(error.message) - } - }) -} - -workerTest( - 'should initialize WasmClient on init message', - async ({ worker }) => { - worker.postMessage({ type: 'init', requestId: 1 }) - const response = await waitForWorkerResponse(worker, 'initialized') - expect(response.data).toEqual({}) - }, -) - -workerTest( - 'should return false on open for a new client', - async ({ worker, clientName }) => { - worker.postMessage({ type: 'init', requestId: 1 }) - await waitForWorkerResponse(worker, 'initialized') - - worker.postMessage({ - type: 'open', - requestId: 2, - payload: { clientName }, - }) - const response = await waitForWorkerResponse(worker, 'open') - expect(response.data).toEqual({ success: false }) - }, -) - -workerTest( - 'should error on fake federation invitation', - async ({ worker, clientName }) => { - worker.postMessage({ type: 'init', requestId: 1 }) - await waitForWorkerResponse(worker, 'initialized') - - worker.postMessage({ - type: 'join', - requestId: 2, - payload: { inviteCode: 'fakefederationinvitation', clientName }, - }) - try { - await waitForWorkerResponse(worker, 'open') - expect.unreachable() - } catch (e) { - expect(e).toBe('parsing failed') - } - }, -) - -workerTest( - 'should handle joining a federation', - async ({ worker, clientName }) => { - worker.postMessage({ type: 'init', requestId: 1 }) - await waitForWorkerResponse(worker, 'initialized') - - worker.postMessage({ - type: 'join', - requestId: 2, - payload: { inviteCode: TESTING_INVITE, clientName }, - }) - const response = await waitForWorkerResponse(worker, 'join') - expect(response.data).toEqual({ success: true }) - }, -) - -workerTest('should handle unknown message type', async ({ worker }) => { - worker.postMessage({ type: 'unknown', requestId: 2 }) - const response = await waitForWorkerResponse(worker, 'error') - expect(response.error).toBe('Unknown message type') -}) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5b2f102..2427bfc 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -106,49 +106,6 @@ importers: specifier: ^3.3.0 version: 3.3.0(vite@5.4.8(@types/node@22.5.5)(terser@5.32.0)) - examples/vite-react: - dependencies: - react: - specifier: ^18.3.1 - version: 18.3.1 - react-dom: - specifier: '>=18.3.1' - version: 18.3.1(react@18.3.1) - devDependencies: - '@eslint/js': - specifier: ^9.11.1 - version: 9.13.0 - '@types/react': - specifier: '>=18.3.11' - version: 18.3.11 - '@types/react-dom': - specifier: ^18.3.0 - version: 18.3.0 - '@vitejs/plugin-react': - specifier: ^4.3.2 - version: 4.3.2(vite@5.4.8(@types/node@22.5.5)(terser@5.32.0)) - eslint: - specifier: ^9.11.1 - version: 9.13.0 - eslint-plugin-react-hooks: - specifier: ^4.6.2 - version: 4.6.2(eslint@9.13.0) - eslint-plugin-react-refresh: - specifier: ^0.4.12 - version: 0.4.12(eslint@9.13.0) - globals: - specifier: ^15.9.0 - version: 15.11.0 - typescript: - specifier: ^5.6.2 - version: 5.6.2 - typescript-eslint: - specifier: ^8.7.0 - version: 8.10.0(eslint@9.13.0)(typescript@5.6.2) - vite: - specifier: ^5.4.8 - version: 5.4.8(@types/node@22.5.5)(terser@5.32.0) - packages/core-web: dependencies: '@fedimint/fedimint-client-wasm-bundler': @@ -1078,17 +1035,6 @@ packages: typescript: optional: true - '@typescript-eslint/eslint-plugin@8.10.0': - resolution: {integrity: sha512-phuB3hoP7FFKbRXxjl+DRlQDuJqhpOnm5MmtROXyWi3uS/Xg2ZXqiQfcG2BJHiN4QKyzdOJi3NEn/qTnjUlkmQ==} - engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} - peerDependencies: - '@typescript-eslint/parser': ^8.0.0 || ^8.0.0-alpha.0 - eslint: ^8.57.0 || ^9.0.0 - typescript: '*' - peerDependenciesMeta: - typescript: - optional: true - '@typescript-eslint/parser@7.18.0': resolution: {integrity: sha512-4Z+L8I2OqhZV8qA132M4wNL30ypZGYOQVBfMgxDH/K5UX0PNqTu1c6za9ST5r9+tavvHiTWmBnKzpCJ/GlVFtg==} engines: {node: ^18.18.0 || >=20.0.0} @@ -1099,24 +1045,10 @@ packages: typescript: optional: true - '@typescript-eslint/parser@8.10.0': - resolution: {integrity: sha512-E24l90SxuJhytWJ0pTQydFT46Nk0Z+bsLKo/L8rtQSL93rQ6byd1V/QbDpHUTdLPOMsBCcYXZweADNCfOCmOAg==} - engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} - peerDependencies: - eslint: ^8.57.0 || ^9.0.0 - typescript: '*' - peerDependenciesMeta: - typescript: - optional: true - '@typescript-eslint/scope-manager@7.18.0': resolution: {integrity: sha512-jjhdIE/FPF2B7Z1uzc6i3oWKbGcHb87Qw7AWj6jmEqNOfDFbJWtjt/XfwCpvNkpGWlcJaog5vTR+VV8+w9JflA==} engines: {node: ^18.18.0 || >=20.0.0} - '@typescript-eslint/scope-manager@8.10.0': - resolution: {integrity: sha512-AgCaEjhfql9MDKjMUxWvH7HjLeBqMCBfIaBbzzIcBbQPZE7CPh1m6FF+L75NUMJFMLYhCywJXIDEMa3//1A0dw==} - engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} - '@typescript-eslint/type-utils@7.18.0': resolution: {integrity: sha512-XL0FJXuCLaDuX2sYqZUUSOJ2sG5/i1AAze+axqmLnSkNEVMVYLF+cbwlB2w8D1tinFuSikHmFta+P+HOofrLeA==} engines: {node: ^18.18.0 || >=20.0.0} @@ -1127,23 +1059,10 @@ packages: typescript: optional: true - '@typescript-eslint/type-utils@8.10.0': - resolution: {integrity: sha512-PCpUOpyQSpxBn230yIcK+LeCQaXuxrgCm2Zk1S+PTIRJsEfU6nJ0TtwyH8pIwPK/vJoA+7TZtzyAJSGBz+s/dg==} - engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} - peerDependencies: - typescript: '*' - peerDependenciesMeta: - typescript: - optional: true - '@typescript-eslint/types@7.18.0': resolution: {integrity: sha512-iZqi+Ds1y4EDYUtlOOC+aUmxnE9xS/yCigkjA7XpTKV6nCBd3Hp/PRGGmdwnfkV2ThMyYldP1wRpm/id99spTQ==} engines: {node: ^18.18.0 || >=20.0.0} - '@typescript-eslint/types@8.10.0': - resolution: {integrity: sha512-k/E48uzsfJCRRbGLapdZgrX52csmWJ2rcowwPvOZ8lwPUv3xW6CcFeJAXgx4uJm+Ge4+a4tFOkdYvSpxhRhg1w==} - engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} - '@typescript-eslint/typescript-estree@7.18.0': resolution: {integrity: sha512-aP1v/BSPnnyhMHts8cf1qQ6Q1IFwwRvAQGRvBFkWlo3/lH29OXA3Pts+c10nxRxIBrDnoMqzhgdwVe5f2D6OzA==} engines: {node: ^18.18.0 || >=20.0.0} @@ -1153,35 +1072,16 @@ packages: typescript: optional: true - '@typescript-eslint/typescript-estree@8.10.0': - resolution: {integrity: sha512-3OE0nlcOHaMvQ8Xu5gAfME3/tWVDpb/HxtpUZ1WeOAksZ/h/gwrBzCklaGzwZT97/lBbbxJ16dMA98JMEngW4w==} - engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} - peerDependencies: - typescript: '*' - peerDependenciesMeta: - typescript: - optional: true - '@typescript-eslint/utils@7.18.0': resolution: {integrity: sha512-kK0/rNa2j74XuHVcoCZxdFBMF+aq/vH83CXAOHieC+2Gis4mF8jJXT5eAfyD3K0sAxtPuwxaIOIOvhwzVDt/kw==} engines: {node: ^18.18.0 || >=20.0.0} peerDependencies: eslint: ^8.56.0 - '@typescript-eslint/utils@8.10.0': - resolution: {integrity: sha512-Oq4uZ7JFr9d1ZunE/QKy5egcDRXT/FrS2z/nlxzPua2VHFtmMvFNDvpq1m/hq0ra+T52aUezfcjGRIB7vNJF9w==} - engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} - peerDependencies: - eslint: ^8.57.0 || ^9.0.0 - '@typescript-eslint/visitor-keys@7.18.0': resolution: {integrity: sha512-cDF0/Gf81QpY3xYyJKDV14Zwdmid5+uuENhjH2EqFaF0ni+yAyq/LzMaIJdhNJXZI7uLzwIlA+V7oWoyn6Curg==} engines: {node: ^18.18.0 || >=20.0.0} - '@typescript-eslint/visitor-keys@8.10.0': - resolution: {integrity: sha512-k8nekgqwr7FadWk548Lfph6V3r9OVqjzAIVskE7orMZR23cGJjAOVazsZSJW+ElyjfTM4wx/1g88Mi70DDtG9A==} - engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} - '@typescript/vfs@1.6.0': resolution: {integrity: sha512-hvJUjNVeBMp77qPINuUvYXj4FyWeeMMKZkxEATEU3hqBAQ7qdTBCUFT7Sp0Zu0faeEtFf+ldXxMEDr/bk73ISg==} peerDependencies: @@ -1947,10 +1847,6 @@ packages: resolution: {integrity: sha512-oahGvuMGQlPw/ivIYBjVSrWAfWLBeku5tpPE2fOPLi+WHffIWbuh2tCjhyQhTBPMf5E9jDEH4FOmTYgYwbKwtQ==} engines: {node: '>=18'} - globals@15.11.0: - resolution: {integrity: sha512-yeyNSjdbyVaWurlwCpcA6XNBrHTMIeDdj0/hnvX/OLJ9ekOXYbLsLinH/MucQyGvNnXhidTdNhTtJaffL2sMfw==} - engines: {node: '>=18'} - globby@11.1.0: resolution: {integrity: sha512-jhIXaOzy1sb8IyocaruWSn1TjmnBVs8Ayhcy83rmxNJ8q2uWKCAj3CnJY+KpGSXCueAPc0i05kVvVKtP1t9S3g==} engines: {node: '>=10'} @@ -3016,15 +2912,6 @@ packages: resolution: {integrity: sha512-yOGpmOAL7CkKe/91I5O3gPICmJNLJ1G4zFYVAsRHg7M64biSnPtRj0WNQt++bRkjYOqjWXrhnUw1utzmVErAdg==} engines: {node: '>=16'} - typescript-eslint@8.10.0: - resolution: {integrity: sha512-YIu230PeN7z9zpu/EtqCIuRVHPs4iSlqW6TEvjbyDAE3MZsSl2RXBo+5ag+lbABCG8sFM1WVKEXhlQ8Ml8A3Fw==} - engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} - peerDependencies: - typescript: '*' - peerDependenciesMeta: - typescript: - optional: true - typescript@5.4.2: resolution: {integrity: sha512-+2/g0Fds1ERlP6JsakQQDXjZdZMM+rqpamFZJEKh4kwTIn3iDkgKtby0CeNd5ATNZ4Ry1ax15TMx0W2V+miizQ==} engines: {node: '>=14.17'} @@ -4318,24 +4205,6 @@ snapshots: transitivePeerDependencies: - supports-color - '@typescript-eslint/eslint-plugin@8.10.0(@typescript-eslint/parser@8.10.0(eslint@9.13.0)(typescript@5.6.2))(eslint@9.13.0)(typescript@5.6.2)': - dependencies: - '@eslint-community/regexpp': 4.11.0 - '@typescript-eslint/parser': 8.10.0(eslint@9.13.0)(typescript@5.6.2) - '@typescript-eslint/scope-manager': 8.10.0 - '@typescript-eslint/type-utils': 8.10.0(eslint@9.13.0)(typescript@5.6.2) - '@typescript-eslint/utils': 8.10.0(eslint@9.13.0)(typescript@5.6.2) - '@typescript-eslint/visitor-keys': 8.10.0 - eslint: 9.13.0 - graphemer: 1.4.0 - ignore: 5.3.2 - natural-compare: 1.4.0 - ts-api-utils: 1.3.0(typescript@5.6.2) - optionalDependencies: - typescript: 5.6.2 - transitivePeerDependencies: - - supports-color - '@typescript-eslint/parser@7.18.0(eslint@9.13.0)(typescript@5.6.2)': dependencies: '@typescript-eslint/scope-manager': 7.18.0 @@ -4349,29 +4218,11 @@ snapshots: transitivePeerDependencies: - supports-color - '@typescript-eslint/parser@8.10.0(eslint@9.13.0)(typescript@5.6.2)': - dependencies: - '@typescript-eslint/scope-manager': 8.10.0 - '@typescript-eslint/types': 8.10.0 - '@typescript-eslint/typescript-estree': 8.10.0(typescript@5.6.2) - '@typescript-eslint/visitor-keys': 8.10.0 - debug: 4.3.6 - eslint: 9.13.0 - optionalDependencies: - typescript: 5.6.2 - transitivePeerDependencies: - - supports-color - '@typescript-eslint/scope-manager@7.18.0': dependencies: '@typescript-eslint/types': 7.18.0 '@typescript-eslint/visitor-keys': 7.18.0 - '@typescript-eslint/scope-manager@8.10.0': - dependencies: - '@typescript-eslint/types': 8.10.0 - '@typescript-eslint/visitor-keys': 8.10.0 - '@typescript-eslint/type-utils@7.18.0(eslint@9.13.0)(typescript@5.6.2)': dependencies: '@typescript-eslint/typescript-estree': 7.18.0(typescript@5.6.2) @@ -4384,22 +4235,8 @@ snapshots: transitivePeerDependencies: - supports-color - '@typescript-eslint/type-utils@8.10.0(eslint@9.13.0)(typescript@5.6.2)': - dependencies: - '@typescript-eslint/typescript-estree': 8.10.0(typescript@5.6.2) - '@typescript-eslint/utils': 8.10.0(eslint@9.13.0)(typescript@5.6.2) - debug: 4.3.6 - ts-api-utils: 1.3.0(typescript@5.6.2) - optionalDependencies: - typescript: 5.6.2 - transitivePeerDependencies: - - eslint - - supports-color - '@typescript-eslint/types@7.18.0': {} - '@typescript-eslint/types@8.10.0': {} - '@typescript-eslint/typescript-estree@7.18.0(typescript@5.6.2)': dependencies: '@typescript-eslint/types': 7.18.0 @@ -4415,21 +4252,6 @@ snapshots: transitivePeerDependencies: - supports-color - '@typescript-eslint/typescript-estree@8.10.0(typescript@5.6.2)': - dependencies: - '@typescript-eslint/types': 8.10.0 - '@typescript-eslint/visitor-keys': 8.10.0 - debug: 4.3.6 - fast-glob: 3.3.2 - is-glob: 4.0.3 - minimatch: 9.0.5 - semver: 7.6.3 - ts-api-utils: 1.3.0(typescript@5.6.2) - optionalDependencies: - typescript: 5.6.2 - transitivePeerDependencies: - - supports-color - '@typescript-eslint/utils@7.18.0(eslint@9.13.0)(typescript@5.6.2)': dependencies: '@eslint-community/eslint-utils': 4.4.0(eslint@9.13.0) @@ -4441,27 +4263,11 @@ snapshots: - supports-color - typescript - '@typescript-eslint/utils@8.10.0(eslint@9.13.0)(typescript@5.6.2)': - dependencies: - '@eslint-community/eslint-utils': 4.4.0(eslint@9.13.0) - '@typescript-eslint/scope-manager': 8.10.0 - '@typescript-eslint/types': 8.10.0 - '@typescript-eslint/typescript-estree': 8.10.0(typescript@5.6.2) - eslint: 9.13.0 - transitivePeerDependencies: - - supports-color - - typescript - '@typescript-eslint/visitor-keys@7.18.0': dependencies: '@typescript-eslint/types': 7.18.0 eslint-visitor-keys: 3.4.3 - '@typescript-eslint/visitor-keys@8.10.0': - dependencies: - '@typescript-eslint/types': 8.10.0 - eslint-visitor-keys: 3.4.3 - '@typescript/vfs@1.6.0(typescript@5.6.2)': dependencies: debug: 4.3.6 @@ -5295,8 +5101,6 @@ snapshots: globals@14.0.0: {} - globals@15.11.0: {} - globby@11.1.0: dependencies: array-union: 2.1.0 @@ -6456,17 +6260,6 @@ snapshots: type-fest@4.26.1: {} - typescript-eslint@8.10.0(eslint@9.13.0)(typescript@5.6.2): - dependencies: - '@typescript-eslint/eslint-plugin': 8.10.0(@typescript-eslint/parser@8.10.0(eslint@9.13.0)(typescript@5.6.2))(eslint@9.13.0)(typescript@5.6.2) - '@typescript-eslint/parser': 8.10.0(eslint@9.13.0)(typescript@5.6.2) - '@typescript-eslint/utils': 8.10.0(eslint@9.13.0)(typescript@5.6.2) - optionalDependencies: - typescript: 5.6.2 - transitivePeerDependencies: - - eslint - - supports-color - typescript@5.4.2: {} typescript@5.5.2: {}