From 0132a2e39725174fb43d5178494d447dde6881cb Mon Sep 17 00:00:00 2001 From: maan2003 Date: Wed, 30 Oct 2024 21:13:43 +0530 Subject: [PATCH] feat: rpc ng --- packages/core-web/src/FedimintWallet.ts | 37 ++- packages/core-web/src/rpc.ts | 207 +++++++++++++++ .../core-web/src/services/BalanceService.ts | 4 +- .../src/services/FederationService.ts | 4 +- .../core-web/src/services/LightningService.ts | 4 +- packages/core-web/src/services/MintService.ts | 8 +- .../core-web/src/services/RecoveryService.ts | 4 +- .../core-web/src/test/TestFedimintWallet.ts | 4 +- packages/core-web/src/test/TestingService.ts | 4 +- packages/core-web/src/test/fixtures.ts | 2 +- packages/core-web/src/types/index.ts | 2 +- packages/core-web/src/types/rpc.ts | 49 ++++ packages/core-web/src/types/worker.ts | 13 - .../core-web/src/worker/WorkerClient.test.ts | 6 - packages/core-web/src/worker/WorkerClient.ts | 236 ------------------ .../core-web/src/worker/WorkerTransport.ts | 47 ++++ packages/core-web/src/worker/index.ts | 2 +- packages/core-web/src/worker/worker.js | 105 ++------ packages/core-web/src/worker/worker.test.ts | 90 ------- 19 files changed, 352 insertions(+), 476 deletions(-) create mode 100644 packages/core-web/src/rpc.ts create mode 100644 packages/core-web/src/types/rpc.ts delete mode 100644 packages/core-web/src/types/worker.ts delete mode 100644 packages/core-web/src/worker/WorkerClient.test.ts delete mode 100644 packages/core-web/src/worker/WorkerClient.ts create mode 100644 packages/core-web/src/worker/WorkerTransport.ts delete mode 100644 packages/core-web/src/worker/worker.test.ts diff --git a/packages/core-web/src/FedimintWallet.ts b/packages/core-web/src/FedimintWallet.ts index a06db39..e53ed99 100644 --- a/packages/core-web/src/FedimintWallet.ts +++ b/packages/core-web/src/FedimintWallet.ts @@ -1,4 +1,5 @@ -import { WorkerClient } from './worker' +import { RpcClient } from './rpc' +import { WebWorkerTransportInit } from './worker/WorkerTransport' import { BalanceService, MintService, @@ -11,7 +12,7 @@ import { logger, type LogLevel } from './utils/logger' const DEFAULT_CLIENT_NAME = 'fm-default' as const export class FedimintWallet { - private _client: WorkerClient + private _client: RpcClient public balance: BalanceService public mint: MintService @@ -56,7 +57,7 @@ export class FedimintWallet { this._openPromise = new Promise((resolve) => { this._resolveOpen = resolve }) - this._client = new WorkerClient() + this._client = new RpcClient(new WebWorkerTransportInit()) this.mint = new MintService(this._client) this.lightning = new LightningService(this._client) this.balance = new BalanceService(this._client) @@ -71,9 +72,9 @@ export class FedimintWallet { } async initialize() { - logger.info('Initializing WorkerClient') + logger.info('Initializing RpcClient') await this._client.initialize() - logger.info('WorkerClient initialized') + logger.info('RpcClient initialized') } async waitForOpen() { @@ -85,14 +86,15 @@ export class FedimintWallet { await this._client.initialize() // TODO: Determine if this should be safe or throw if (this._isOpen) throw new Error('The FedimintWallet is already open.') - const { success } = await this._client.sendSingleMessage<{ - success: boolean - }>('open', { clientName }) - if (success) { - this._isOpen = !!success + try { + await this._client.openClient(clientName) + this._isOpen = true this._resolveOpen() + return true + } catch (e) { + logger.error('Error opening client', e) + return false } - return success } async joinFederation( @@ -106,15 +108,10 @@ export class FedimintWallet { 'The FedimintWallet is already open. You can only call `joinFederation` on closed clients.', ) try { - const response = await this._client.sendSingleMessage<{ - success: boolean - }>('join', { inviteCode, clientName }) - if (response.success) { - this._isOpen = true - this._resolveOpen() - } - - return response.success + await this._client.joinFederation(inviteCode, clientName) + this._isOpen = true + this._resolveOpen() + return true } catch (e) { logger.error('Error joining federation', e) return false diff --git a/packages/core-web/src/rpc.ts b/packages/core-web/src/rpc.ts new file mode 100644 index 0000000..55933ba --- /dev/null +++ b/packages/core-web/src/rpc.ts @@ -0,0 +1,207 @@ +import type { + CancelFunction, + JSONValue, + ModuleKind, + RpcRequest, + RpcResponse, + RpcRequestFull, + RpcResponseFull, +} from './types' +import { logger } from './utils/logger' + +export interface RpcTransport { + sendRequest(request: RpcRequestFull): void + destroy(): void +} + +export interface RpcTransportInit { + init( + onRpcResponse: (response: RpcResponseFull) => void, + ): Promise +} + +// Handles communication with the wasm worker +// TODO: Move rpc stream management to a separate "SubscriptionManager" class +export class RpcClient { + private transport?: RpcTransport + private transportInit: RpcTransportInit + private requestCounter = 0 + private requestCallbacks = new Map void>() + private initPromise?: Promise + private clientName: string | undefined + + constructor(transportInit: RpcTransportInit) { + this.transportInit = transportInit + } + + private async initializeInner(): Promise { + this.transport = await this.transportInit.init( + this.handleWorkerMessage.bind(this), + ) + } + + async initialize() { + if (this.initPromise) { + return this.initPromise + } + + this.initPromise = this.initializeInner() + return this.initPromise + } + + private handleWorkerMessage = (response: RpcResponseFull) => { + const callback = this.requestCallbacks.get(response.request_id) + + if (callback) { + callback(response) + } else { + logger.warn( + 'RpcClient - handleWorkerMessage - received message with no callback', + response.request_id, + response, + ) + } + } + + async joinFederation(inviteCode: string, clientName: string) { + await this.internalRpcSingle({ + type: 'join_federation', + invite_code: inviteCode, + client_name: clientName, + }) + } + + async openClient(clientName: string) { + await this.internalRpcSingle({ + type: 'open_client', + client_name: clientName, + }) + this.clientName = clientName + } + + async closeClient(clientName: string) { + await this.internalRpcSingle({ + type: 'close_client', + client_name: clientName, + }) + this.clientName = undefined + } + + private internalRpcStream( + request: RpcRequest, + onData: (data: Response) => void, + onError: (error: string) => void, + onEnd: () => void = () => {}, + ): CancelFunction { + const requestId = ++this.requestCounter + logger.debug('RpcClient - rpcStream', requestId, request) + let unsubscribe = () => { + const cancelRequest: RpcRequestFull = { + request_id: ++this.requestCounter, + type: 'cancel_rpc', + cancel_request_id: requestId, + } + this.transport?.sendRequest(cancelRequest) + } + + const requestFull: RpcRequestFull = { + ...request, + request_id: requestId, + } + + this.requestCallbacks.set(requestId, (response: RpcResponse) => { + switch (response.type) { + case 'data': + onData(response.data) + break + case 'error': + onError(response.error) + break + case 'end': + this.requestCallbacks.delete(requestId) + onEnd() + break + case 'aborted': + this.requestCallbacks.delete(requestId) + onEnd() + break + } + }) + this.transport?.sendRequest(requestFull) + return unsubscribe + } + + private internalRpcSingle( + request: RpcRequest, + ) { + return new Promise((resolve, reject) => { + const unsubscribe = this.internalRpcStream( + request, + (data) => resolve(data), + (error) => reject(new Error(error)), + () => {}, + ) + // No need to unsubscribe for single requests as they auto-complete + }) + } + + rpcStream< + Response extends JSONValue = JSONValue, + Body extends JSONValue = JSONValue, + >( + module: ModuleKind, + method: string, + body: Body, + onData: (data: Response) => void, + onError: (error: string) => void, + onEnd: () => void = () => {}, + ): CancelFunction { + if (this.clientName === undefined) { + throw new Error('Wallet is not open') + } + return this.internalRpcStream( + { + type: 'client_rpc', + client_name: this.clientName, + module, + method, + payload: body, + }, + onData, + onError, + onEnd, + ) + } + + rpcSingle( + module: string, + method: string, + payload: P, + ): Promise { + if (this.clientName === undefined) { + throw new Error('Wallet is not open') + } + return this.internalRpcSingle({ + type: 'client_rpc', + client_name: this.clientName, + module, + method, + payload, + }) + } + + async cleanup() { + this.transport?.destroy() + this.requestCounter = 0 + this.initPromise = undefined + this.requestCallbacks.clear() + } + + // For Testing + _getRequestCounter() { + return this.requestCounter + } + _getRequestCallbackMap() { + return this.requestCallbacks + } +} diff --git a/packages/core-web/src/services/BalanceService.ts b/packages/core-web/src/services/BalanceService.ts index 5bba0ff..469ab85 100644 --- a/packages/core-web/src/services/BalanceService.ts +++ b/packages/core-web/src/services/BalanceService.ts @@ -1,4 +1,4 @@ -import { WorkerClient } from '../worker' +import { RpcClient } from '../rpc' /** * Balance Service @@ -6,7 +6,7 @@ import { WorkerClient } from '../worker' * The Balance Service provides methods to interact with the balance of a Fedimint wallet. */ export class BalanceService { - constructor(private client: WorkerClient) {} + constructor(private client: RpcClient) {} /** https://web.fedimint.org/core/FedimintWallet/BalanceService/getBalance */ async getBalance() { diff --git a/packages/core-web/src/services/FederationService.ts b/packages/core-web/src/services/FederationService.ts index bbbcf43..3dd19aa 100644 --- a/packages/core-web/src/services/FederationService.ts +++ b/packages/core-web/src/services/FederationService.ts @@ -1,8 +1,8 @@ import type { JSONValue } from '../types' -import { WorkerClient } from '../worker' +import { RpcClient } from '../rpc' export class FederationService { - constructor(private client: WorkerClient) {} + constructor(private client: RpcClient) {} async getConfig() { return await this.client.rpcSingle('', 'get_config', {}) diff --git a/packages/core-web/src/services/LightningService.ts b/packages/core-web/src/services/LightningService.ts index bf91e57..9a56b2b 100644 --- a/packages/core-web/src/services/LightningService.ts +++ b/packages/core-web/src/services/LightningService.ts @@ -1,4 +1,4 @@ -import { WorkerClient } from '../worker' +import { RpcClient } from '../rpc' import type { CreateBolt11Response, GatewayInfo, @@ -10,7 +10,7 @@ import type { } from '../types' export class LightningService { - constructor(private client: WorkerClient) {} + constructor(private client: RpcClient) {} /** https://web.fedimint.org/core/FedimintWallet/LightningService/createInvoice#lightning-createinvoice */ async createInvoice( diff --git a/packages/core-web/src/services/MintService.ts b/packages/core-web/src/services/MintService.ts index d1ab0d8..da7dd22 100644 --- a/packages/core-web/src/services/MintService.ts +++ b/packages/core-web/src/services/MintService.ts @@ -1,4 +1,4 @@ -import { WorkerClient } from '../worker' +import { RpcClient } from '../rpc' import type { Duration, JSONObject, @@ -10,7 +10,7 @@ import type { } from '../types' export class MintService { - constructor(private client: WorkerClient) {} + constructor(private client: RpcClient) {} /** https://web.fedimint.org/core/FedimintWallet/MintService/redeemEcash */ async redeemEcash(notes: string) { @@ -66,7 +66,7 @@ export class MintService { ? { nanos: 0, secs: tryCancelAfter } : tryCancelAfter - const res = await this.client.rpcSingle( + const [operationId, notes] = await this.client.rpcSingle<[string, string]>( 'mint', 'spend_notes', { @@ -76,8 +76,6 @@ export class MintService { extra_meta: extraMeta, }, ) - const notes = res[1] - const operationId = res[0] return { notes, diff --git a/packages/core-web/src/services/RecoveryService.ts b/packages/core-web/src/services/RecoveryService.ts index 876165b..55b9d98 100644 --- a/packages/core-web/src/services/RecoveryService.ts +++ b/packages/core-web/src/services/RecoveryService.ts @@ -1,8 +1,8 @@ import type { JSONValue } from '../types' -import { WorkerClient } from '../worker' +import { RpcClient } from '../rpc' export class RecoveryService { - constructor(private client: WorkerClient) {} + constructor(private client: RpcClient) {} async hasPendingRecoveries() { return await this.client.rpcSingle( diff --git a/packages/core-web/src/test/TestFedimintWallet.ts b/packages/core-web/src/test/TestFedimintWallet.ts index 3ebd244..e0b9669 100644 --- a/packages/core-web/src/test/TestFedimintWallet.ts +++ b/packages/core-web/src/test/TestFedimintWallet.ts @@ -1,5 +1,5 @@ import { FedimintWallet } from '../FedimintWallet' -import { WorkerClient } from '../worker/WorkerClient' +import { RpcClient } from '../rpc' import { TestingService } from './TestingService' export class TestFedimintWallet extends FedimintWallet { @@ -20,7 +20,7 @@ export class TestFedimintWallet extends FedimintWallet { } // Method to expose the WorkerClient - getWorkerClient(): WorkerClient { + getWorkerClient(): RpcClient { return this['_client'] } } diff --git a/packages/core-web/src/test/TestingService.ts b/packages/core-web/src/test/TestingService.ts index e033c98..b49ab4d 100644 --- a/packages/core-web/src/test/TestingService.ts +++ b/packages/core-web/src/test/TestingService.ts @@ -1,5 +1,5 @@ import { LightningService } from '../services' -import { WorkerClient } from '../worker' +import { RpcClient } from '../rpc' export const TESTING_INVITE = 'fed11qgqrsdnhwden5te0v9cxjtt4dekxzamxw4kz6mmjvvkhydted9ukg6r9xfsnx7th0fhn26tf093juamwv4u8gtnpwpcz7qqpyz0e327ua8geceutfrcaezwt22mk6s2rdy09kg72jrcmncng2gn0kp2m5sk' @@ -9,7 +9,7 @@ export const TESTING_INVITE = export class TestingService { public TESTING_INVITE: string constructor( - private client: WorkerClient, + private client: RpcClient, private lightning: LightningService, ) { // Solo Mint on mutinynet diff --git a/packages/core-web/src/test/fixtures.ts b/packages/core-web/src/test/fixtures.ts index a809152..dcaceb9 100644 --- a/packages/core-web/src/test/fixtures.ts +++ b/packages/core-web/src/test/fixtures.ts @@ -58,7 +58,7 @@ export const workerTest = test.extend<{ await use(randomTestingId) }, workerClient: async ({}, use) => { - const workerClient = new WorkerClient() + const workerClient = new RpcClient() await use(workerClient) }, }) diff --git a/packages/core-web/src/types/index.ts b/packages/core-web/src/types/index.ts index a2b9e6e..9825a9f 100644 --- a/packages/core-web/src/types/index.ts +++ b/packages/core-web/src/types/index.ts @@ -1,3 +1,3 @@ export * from './wallet' export * from './utils' -export * from './worker' +export * from './rpc' diff --git a/packages/core-web/src/types/rpc.ts b/packages/core-web/src/types/rpc.ts new file mode 100644 index 0000000..c65a62b --- /dev/null +++ b/packages/core-web/src/types/rpc.ts @@ -0,0 +1,49 @@ +export type RpcRequestFull = { + request_id: number +} & RpcRequest + +export type RpcRequest = + | { + type: 'join_federation' + invite_code: string + client_name: string + } + | { + type: 'open_client' + client_name: string + } + | { + type: 'close_client' + client_name: string + } + | { + type: 'client_rpc' + client_name: string + module: string + method: string + payload: any + } + | { + type: 'cancel_rpc' + cancel_request_id: number + } + +export type RpcResponseFull = { + request_id: number +} & RpcResponse + +export type RpcResponse = + | { + type: 'data' + data: any + } + | { + type: 'error' + error: string + } + | { + type: 'aborted' + } + | { + type: 'end' + } diff --git a/packages/core-web/src/types/worker.ts b/packages/core-web/src/types/worker.ts deleted file mode 100644 index cc25e61..0000000 --- a/packages/core-web/src/types/worker.ts +++ /dev/null @@ -1,13 +0,0 @@ -const WorkerMessageTypes = [ - 'init', - 'initialized', - 'rpc', - 'log', - 'open', - 'join', - 'error', - 'unsubscribe', - 'cleanup', -] as const - -export type WorkerMessageType = (typeof WorkerMessageTypes)[number] diff --git a/packages/core-web/src/worker/WorkerClient.test.ts b/packages/core-web/src/worker/WorkerClient.test.ts deleted file mode 100644 index 369d603..0000000 --- a/packages/core-web/src/worker/WorkerClient.test.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { expect } from 'vitest' -import { workerTest } from '../test/fixtures' - -workerTest('should initialize', async ({ workerClient }) => { - expect(workerClient).toBeDefined() -}) diff --git a/packages/core-web/src/worker/WorkerClient.ts b/packages/core-web/src/worker/WorkerClient.ts deleted file mode 100644 index bd169d3..0000000 --- a/packages/core-web/src/worker/WorkerClient.ts +++ /dev/null @@ -1,236 +0,0 @@ -import type { - CancelFunction, - JSONValue, - ModuleKind, - StreamError, - StreamResult, - WorkerMessageType, -} from '../types' -import { logger } from '../utils/logger' - -// Handles communication with the wasm worker -// TODO: Move rpc stream management to a separate "SubscriptionManager" class -export class WorkerClient { - private worker: Worker - private requestCounter = 0 - private requestCallbacks = new Map void>() - private initPromise: Promise | undefined = undefined - - constructor() { - // Must create the URL inside the constructor for vite - this.worker = new Worker(new URL('./worker.js', import.meta.url), { - type: 'module', - }) - this.worker.onmessage = this.handleWorkerMessage.bind(this) - this.worker.onerror = this.handleWorkerError.bind(this) - logger.info('WorkerClient instantiated') - logger.debug('WorkerClient', this.worker) - } - - // Idempotent setup - Loads the wasm module - initialize() { - if (this.initPromise) return this.initPromise - this.initPromise = this.sendSingleMessage('init') - return this.initPromise - } - - private handleWorkerLogs(event: MessageEvent) { - const { type, level, message, ...data } = event.data - logger.log(level, message, ...data) - } - - private handleWorkerError(event: ErrorEvent) { - logger.error('Worker error', event) - } - - private handleWorkerMessage(event: MessageEvent) { - const { type, requestId, ...data } = event.data - if (type === 'log') { - this.handleWorkerLogs(event.data) - } - const streamCallback = this.requestCallbacks.get(requestId) - // TODO: Handle errors... maybe have another callbacks list for errors? - logger.debug('WorkerClient - handleWorkerMessage', event.data) - if (streamCallback) { - streamCallback(data) // {data: something} OR {error: something} - } else { - logger.warn( - 'WorkerClient - handleWorkerMessage - received message with no callback', - requestId, - event.data, - ) - } - } - - // TODO: Handle errors... maybe have another callbacks list for errors? - // TODO: Handle timeouts - // TODO: Handle multiple errors - - sendSingleMessage< - Response extends JSONValue = JSONValue, - Payload extends JSONValue = JSONValue, - >(type: WorkerMessageType, payload?: Payload) { - return new Promise((resolve, reject) => { - const requestId = ++this.requestCounter - logger.debug('WorkerClient - sendSingleMessage', requestId, type, payload) - this.requestCallbacks.set( - requestId, - (response: StreamResult) => { - this.requestCallbacks.delete(requestId) - logger.debug( - 'WorkerClient - sendSingleMessage - response', - requestId, - response, - ) - if (response.data) resolve(response.data) - else if (response.error) reject(response.error) - else - logger.warn( - 'WorkerClient - sendSingleMessage - malformed response', - requestId, - response, - ) - }, - ) - this.worker.postMessage({ type, payload, requestId }) - }) - } - - /** - * @summary Initiates an RPC stream with the specified module and method. - * - * @description - * This function sets up an RPC stream by sending a request to a worker and - * handling responses asynchronously. It ensures that unsubscription is handled - * correctly, even if the unsubscribe function is called before the subscription - * is fully established, by deferring the unsubscription attempt using `setTimeout`. - * - * The function operates in a non-blocking manner, leveraging Promises to manage - * asynchronous operations and callbacks to handle responses. - * - * - * @template Response - The expected type of the successful response. - * @template Body - The type of the request body. - * @param module - The module kind to interact with. - * @param method - The method name to invoke on the module. - * @param body - The request payload. - * @param onSuccess - Callback invoked with the response data on success. - * @param onError - Callback invoked with error information if an error occurs. - * @param onEnd - Optional callback invoked when the stream ends. - * @returns A function that can be called to cancel the subscription. - * - */ - rpcStream< - Response extends JSONValue = JSONValue, - Body extends JSONValue = JSONValue, - >( - module: ModuleKind, - method: string, - body: Body, - onSuccess: (res: Response) => void, - onError: (res: StreamError['error']) => void, - onEnd: () => void = () => {}, - ): CancelFunction { - const requestId = ++this.requestCounter - logger.debug('WorkerClient - rpcStream', requestId, module, method, body) - let unsubscribe: (value: void) => void = () => {} - let isSubscribed = false - - const unsubscribePromise = new Promise((resolve) => { - unsubscribe = () => { - if (isSubscribed) { - // If already subscribed, resolve immediately to trigger unsubscription - resolve() - } else { - // If not yet subscribed, defer the unsubscribe attempt to the next event loop tick - // This ensures that subscription setup has time to complete - setTimeout(() => unsubscribe(), 0) - } - } - }) - - // Initiate the inner RPC stream setup asynchronously - this._rpcStreamInner( - requestId, - module, - method, - body, - onSuccess, - onError, - onEnd, - unsubscribePromise, - ).then(() => { - isSubscribed = true - }) - - return unsubscribe - } - - private async _rpcStreamInner< - Response extends JSONValue = JSONValue, - Body extends JSONValue = JSONValue, - >( - requestId: number, - module: ModuleKind, - method: string, - body: Body, - onSuccess: (res: Response) => void, - onError: (res: StreamError['error']) => void, - onEnd: () => void = () => {}, - unsubscribePromise: Promise, - // Unsubscribe function - ) { - // await this.openPromise - // if (!this.worker || !this._isOpen) - // throw new Error('FedimintWallet is not open') - - this.requestCallbacks.set(requestId, (response: StreamResult) => { - if (response.error !== undefined) { - onError(response.error) - } else if (response.data !== undefined) { - onSuccess(response.data) - } else if (response.end !== undefined) { - this.requestCallbacks.delete(requestId) - onEnd() - } - }) - this.worker.postMessage({ - type: 'rpc', - payload: { module, method, body }, - requestId, - }) - - unsubscribePromise.then(() => { - this.worker?.postMessage({ - type: 'unsubscribe', - requestId, - }) - this.requestCallbacks.delete(requestId) - }) - } - - rpcSingle< - Response extends JSONValue = JSONValue, - Error extends string = string, - >(module: ModuleKind, method: string, body: JSONValue) { - logger.debug('WorkerClient - rpcSingle', module, method, body) - return new Promise((resolve, reject) => { - this.rpcStream(module, method, body, resolve, reject) - }) - } - - async cleanup() { - await this.sendSingleMessage('cleanup') - this.requestCounter = 0 - this.initPromise = undefined - this.requestCallbacks.clear() - } - - // For Testing - _getRequestCounter() { - return this.requestCounter - } - _getRequestCallbackMap() { - return this.requestCallbacks - } -} diff --git a/packages/core-web/src/worker/WorkerTransport.ts b/packages/core-web/src/worker/WorkerTransport.ts new file mode 100644 index 0000000..3a0ae75 --- /dev/null +++ b/packages/core-web/src/worker/WorkerTransport.ts @@ -0,0 +1,47 @@ +import type { RpcResponseFull, RpcRequestFull } from '../types' +import { logger } from '../utils/logger' +import type { RpcTransport, RpcTransportInit } from '../rpc' + +class WebWorkerTransport implements RpcTransport { + constructor(private worker: Worker) {} + + sendRequest(request: RpcRequestFull): void { + this.worker.postMessage(request) + } + + destroy(): void { + this.worker.terminate() + } +} + +export class WebWorkerTransportInit implements RpcTransportInit { + async init( + onRpcResponse: (response: RpcResponseFull) => void, + ): Promise { + const worker = new Worker(new URL('../worker.js', import.meta.url), { + type: 'module', + }) + + await new Promise((resolve, reject) => { + const handleInit = (event: MessageEvent) => { + const response = event.data + if (response.type === 'init_error') { + reject(new Error(response.error)) + } else if (response.type === 'init_success') { + logger.info('WebWorker instantiated') + resolve() + } + } + + worker.onmessage = handleInit + worker.postMessage({ + type: 'init', + }) + }) + + worker.onmessage = (event: MessageEvent) => { + onRpcResponse(event.data as RpcResponseFull) + } + return new WebWorkerTransport(worker) + } +} diff --git a/packages/core-web/src/worker/index.ts b/packages/core-web/src/worker/index.ts index cf57f44..19c1421 100644 --- a/packages/core-web/src/worker/index.ts +++ b/packages/core-web/src/worker/index.ts @@ -1 +1 @@ -export { WorkerClient } from './WorkerClient' +export { WebWorkerTransportInit } from './WorkerTransport' diff --git a/packages/core-web/src/worker/worker.js b/packages/core-web/src/worker/worker.js index 5fe9d4d..e99dc3f 100644 --- a/packages/core-web/src/worker/worker.js +++ b/packages/core-web/src/worker/worker.js @@ -4,102 +4,25 @@ // TODO: remove once https://github.com/vitest-dev/vitest/pull/6569 lands in a release globalThis.__vitest_browser_runner__ = { wrapDynamicImport: (foo) => foo() } -// dynamically imported Constructor for WasmClient -let WasmClient = null -// client instance -let client = null - -const streamCancelMap = new Map() - -const handleFree = (requestId) => { - streamCancelMap.delete(requestId) -} +let rpcHandler = null console.log('Worker - init') self.onmessage = async (event) => { - const { type, payload, requestId } = event.data - - try { - if (type === 'init') { - WasmClient = (await import('@fedimint/fedimint-client-wasm-bundler')) - .WasmClient - self.postMessage({ type: 'initialized', data: {}, requestId }) - } else if (type === 'open') { - const { clientName } = payload - client = (await WasmClient.open(clientName)) || null - self.postMessage({ - type: 'open', - data: { success: !!client }, - requestId, - }) - } else if (type === 'join') { - const { inviteCode, clientName: joinClientName } = payload - try { - client = await WasmClient.join_federation(joinClientName, inviteCode) - self.postMessage({ - type: 'join', - data: { success: !!client }, - requestId, - }) - } catch (e) { - self.postMessage({ type: 'error', error: e.message, requestId }) - } - } else if (type === 'rpc') { - const { module, method, body } = payload - console.log('RPC received', module, method, body) - if (!client) { - self.postMessage({ - type: 'error', - error: 'WasmClient not initialized', - requestId, - }) - return - } - const rpcHandle = await client.rpc( - module, - method, - JSON.stringify(body), - (res) => { - console.log('RPC response', requestId, res) - const data = JSON.parse(res) - self.postMessage({ type: 'rpcResponse', requestId, ...data }) - - if (data.end !== undefined) { - // Handle stream ending - const handle = streamCancelMap.get(requestId) - handle?.free() - } - }, + if (event.type === 'init') { + try { + const { RpcHandler } = await import( + '@fedimint/fedimint-client-wasm-bundler' ) - streamCancelMap.set(requestId, rpcHandle) - } else if (type === 'unsubscribe') { - const rpcHandle = streamCancelMap.get(requestId) - if (rpcHandle) { - rpcHandle.cancel() - rpcHandle.free() - streamCancelMap.delete(requestId) - } - } else if (type === 'cleanup') { - console.log('cleanup message received') - client?.free() - self.postMessage({ - type: 'cleanup', - data: {}, - requestId, - }) - close() - } else { - self.postMessage({ - type: 'error', - error: 'Unknown message type', - requestId, - }) + rpcHandler = new RpcHandler() + self.postMessage({ type: 'init_success' }) + } catch (err) { + console.error('Worker init failed:', err) + self.postMessage({ type: 'init_error', error: err.toString() }) } - } catch (e) { - console.error('ERROR', e) - self.postMessage({ type: 'error', error: e, requestId }) + } else { + rpcHandler.rpc(JSON.stringify(event.data), (response) => + self.postMessage(JSON.parse(response)), + ) } } - -// self.postMessage({ type: 'init', data: {} }) diff --git a/packages/core-web/src/worker/worker.test.ts b/packages/core-web/src/worker/worker.test.ts deleted file mode 100644 index 866dde3..0000000 --- a/packages/core-web/src/worker/worker.test.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { expect } from 'vitest' -import { TESTING_INVITE } from '../test/TestingService' -import { JSONObject } from '../types' -import { workerTest } from '../test/fixtures' - -// Waits for a message of a given type from the worker -const waitForWorkerResponse = ( - worker: Worker, - messageType: string, -): Promise => { - return new Promise((resolve, reject) => { - worker.onmessage = (event) => { - if (event.data.type === messageType) { - resolve(event.data) - } else if (event.data.type === 'error') { - reject(event.data.error) - } - } - worker.onerror = (error) => { - reject(error.message) - } - }) -} - -workerTest( - 'should initialize WasmClient on init message', - async ({ worker }) => { - worker.postMessage({ type: 'init', requestId: 1 }) - const response = await waitForWorkerResponse(worker, 'initialized') - expect(response.data).toEqual({}) - }, -) - -workerTest( - 'should return false on open for a new client', - async ({ worker, clientName }) => { - worker.postMessage({ type: 'init', requestId: 1 }) - await waitForWorkerResponse(worker, 'initialized') - - worker.postMessage({ - type: 'open', - requestId: 2, - payload: { clientName }, - }) - const response = await waitForWorkerResponse(worker, 'open') - expect(response.data).toEqual({ success: false }) - }, -) - -workerTest( - 'should error on fake federation invitation', - async ({ worker, clientName }) => { - worker.postMessage({ type: 'init', requestId: 1 }) - await waitForWorkerResponse(worker, 'initialized') - - worker.postMessage({ - type: 'join', - requestId: 2, - payload: { inviteCode: 'fakefederationinvitation', clientName }, - }) - try { - await waitForWorkerResponse(worker, 'open') - expect.unreachable() - } catch (e) { - expect(e).toBe('parsing failed') - } - }, -) - -workerTest( - 'should handle joining a federation', - async ({ worker, clientName }) => { - worker.postMessage({ type: 'init', requestId: 1 }) - await waitForWorkerResponse(worker, 'initialized') - - worker.postMessage({ - type: 'join', - requestId: 2, - payload: { inviteCode: TESTING_INVITE, clientName }, - }) - const response = await waitForWorkerResponse(worker, 'join') - expect(response.data).toEqual({ success: true }) - }, -) - -workerTest('should handle unknown message type', async ({ worker }) => { - worker.postMessage({ type: 'unknown', requestId: 2 }) - const response = await waitForWorkerResponse(worker, 'error') - expect(response.error).toBe('Unknown message type') -})