Skip to content

Commit

Permalink
feat: rpc ng
Browse files Browse the repository at this point in the history
  • Loading branch information
maan2003 committed Oct 31, 2024
1 parent 9468ef5 commit 06899de
Show file tree
Hide file tree
Showing 20 changed files with 352 additions and 684 deletions.
37 changes: 17 additions & 20 deletions packages/core-web/src/FedimintWallet.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { WorkerClient } from './worker'
import { RpcClient } from './rpc'
import { WebWorkerTransportInit } from './worker/WorkerTransport'
import {
BalanceService,
MintService,
Expand All @@ -11,7 +12,7 @@ import { logger, type LogLevel } from './utils/logger'
const DEFAULT_CLIENT_NAME = 'fm-default' as const

export class FedimintWallet {
private _client: WorkerClient
private _client: RpcClient

public balance: BalanceService
public mint: MintService
Expand Down Expand Up @@ -56,7 +57,7 @@ export class FedimintWallet {
this._openPromise = new Promise((resolve) => {
this._resolveOpen = resolve
})
this._client = new WorkerClient()
this._client = new RpcClient(new WebWorkerTransportInit())
this.mint = new MintService(this._client)
this.lightning = new LightningService(this._client)
this.balance = new BalanceService(this._client)
Expand All @@ -71,9 +72,9 @@ export class FedimintWallet {
}

async initialize() {
logger.info('Initializing WorkerClient')
logger.info('Initializing RpcClient')
await this._client.initialize()
logger.info('WorkerClient initialized')
logger.info('RpcClient initialized')
}

async waitForOpen() {
Expand All @@ -85,14 +86,15 @@ export class FedimintWallet {
await this._client.initialize()
// TODO: Determine if this should be safe or throw
if (this._isOpen) throw new Error('The FedimintWallet is already open.')
const { success } = await this._client.sendSingleMessage<{
success: boolean
}>('open', { clientName })
if (success) {
this._isOpen = !!success
try {
await this._client.openClient(clientName)
this._isOpen = true
this._resolveOpen()
return true
} catch (e) {
logger.error('Error opening client', e)
return false
}
return success
}

async joinFederation(
Expand All @@ -106,15 +108,10 @@ export class FedimintWallet {
'The FedimintWallet is already open. You can only call `joinFederation` on closed clients.',
)
try {
const response = await this._client.sendSingleMessage<{
success: boolean
}>('join', { inviteCode, clientName })
if (response.success) {
this._isOpen = true
this._resolveOpen()
}

return response.success
await this._client.joinFederation(inviteCode, clientName)
this._isOpen = true
this._resolveOpen()
return true
} catch (e) {
logger.error('Error joining federation', e)
return false
Expand Down
207 changes: 207 additions & 0 deletions packages/core-web/src/rpc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import type {
CancelFunction,
JSONValue,
ModuleKind,
RpcRequest,
RpcResponse,
RpcRequestFull,
RpcResponseFull,
} from './types'
import { logger } from './utils/logger'

export interface RpcTransport {
sendRequest(request: RpcRequestFull): void
destroy(): void
}

export interface RpcTransportInit {
init(
onRpcResponse: (response: RpcResponseFull) => void,
): Promise<RpcTransport>
}

// Handles communication with the wasm worker
// TODO: Move rpc stream management to a separate "SubscriptionManager" class
export class RpcClient {
private transport?: RpcTransport
private transportInit: RpcTransportInit
private requestCounter = 0
private requestCallbacks = new Map<number, (response: RpcResponse) => void>()
private initPromise?: Promise<void>
private clientName: string | undefined

constructor(transportInit: RpcTransportInit) {
this.transportInit = transportInit
}

private async initializeInner(): Promise<void> {
this.transport = await this.transportInit.init(
this.handleWorkerMessage.bind(this),
)
}

async initialize() {
if (this.initPromise) {
return this.initPromise
}

this.initPromise = this.initializeInner()
return this.initPromise
}

private handleWorkerMessage = (response: RpcResponseFull) => {
const callback = this.requestCallbacks.get(response.request_id)

if (callback) {
callback(response)
} else {
logger.warn(
'RpcClient - handleWorkerMessage - received message with no callback',
response.request_id,
response,
)
}
}

async joinFederation(inviteCode: string, clientName: string) {
await this.internalRpcSingle({
type: 'join_federation',
invite_code: inviteCode,
client_name: clientName,
})
}

async openClient(clientName: string) {
await this.internalRpcSingle({
type: 'open_client',
client_name: clientName,
})
this.clientName = clientName
}

async closeClient(clientName: string) {
await this.internalRpcSingle({
type: 'close_client',
client_name: clientName,
})
this.clientName = undefined
}

private internalRpcStream<Response extends JSONValue = JSONValue>(
request: RpcRequest,
onData: (data: Response) => void,
onError: (error: string) => void,
onEnd: () => void = () => {},
): CancelFunction {
const requestId = ++this.requestCounter
logger.debug('RpcClient - rpcStream', requestId, request)
let unsubscribe = () => {
const cancelRequest: RpcRequestFull = {
request_id: ++this.requestCounter,
type: 'cancel_rpc',
cancel_request_id: requestId,
}
this.transport?.sendRequest(cancelRequest)
}

const requestFull: RpcRequestFull = {
...request,
request_id: requestId,
}

this.requestCallbacks.set(requestId, (response: RpcResponse) => {
switch (response.type) {
case 'data':
onData(response.data)
break
case 'error':
onError(response.error)
break
case 'end':
this.requestCallbacks.delete(requestId)
onEnd()
break
case 'aborted':
this.requestCallbacks.delete(requestId)
onEnd()
break
}
})
this.transport?.sendRequest(requestFull)
return unsubscribe
}

private internalRpcSingle<T extends JSONValue = JSONValue>(
request: RpcRequest,
): Promise<T> {
return new Promise((resolve, reject) => {
const unsubscribe = this.internalRpcStream(
request,
(data) => resolve(data as T),
(error) => reject(new Error(error)),
() => {},
)
// No need to unsubscribe for single requests as they auto-complete
})
}

rpcStream<
Response extends JSONValue = JSONValue,
Body extends JSONValue = JSONValue,
>(
module: ModuleKind,
method: string,
body: Body,
onData: (data: Response) => void,
onError: (error: string) => void,
onEnd: () => void = () => {},
): CancelFunction {
if (this.clientName === undefined) {
throw new Error('Wallet is not open')
}
return this.internalRpcStream(
{
type: 'client_rpc',
client_name: this.clientName,
module,
method,
payload: body,
},
onData,
onError,
onEnd,
)
}

rpcSingle<T extends JSONValue = JSONValue, P extends JSONValue = JSONValue>(
module: string,
method: string,
payload: P,
): Promise<T> {
if (this.clientName === undefined) {
throw new Error('Wallet is not open')
}
return this.internalRpcSingle<T>({
type: 'client_rpc',
client_name: this.clientName,
module,
method,
payload,
})
}

async cleanup() {
this.transport?.destroy()
this.requestCounter = 0
this.initPromise = undefined
this.requestCallbacks.clear()
}

// For Testing
_getRequestCounter() {
return this.requestCounter
}
_getRequestCallbackMap() {
return this.requestCallbacks
}
}
4 changes: 2 additions & 2 deletions packages/core-web/src/services/BalanceService.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { WorkerClient } from '../worker'
import { RpcClient } from '../rpc'

/**
* Balance Service
*
* The Balance Service provides methods to interact with the balance of a Fedimint wallet.
*/
export class BalanceService {
constructor(private client: WorkerClient) {}
constructor(private client: RpcClient) {}

/**
* Get the balance of the current wallet in milli-satoshis (MSats)
Expand Down
4 changes: 2 additions & 2 deletions packages/core-web/src/services/FederationService.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { JSONValue } from '../types'
import { WorkerClient } from '../worker'
import { RpcClient } from '../rpc'

export class FederationService {
constructor(private client: WorkerClient) {}
constructor(private client: RpcClient) {}

async getConfig(): Promise<JSONValue> {
return await this.client.rpcSingle('', 'get_config', {})
Expand Down
4 changes: 2 additions & 2 deletions packages/core-web/src/services/LightningService.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { WorkerClient } from '../worker'
import { RpcClient } from '../rpc'
import type {
CreateBolt11Response,
GatewayInfo,
Expand All @@ -11,7 +11,7 @@ import type {
} from '../types'

export class LightningService {
constructor(private client: WorkerClient) {}
constructor(private client: RpcClient) {}

async createInvoice(
amountMsats: number,
Expand Down
8 changes: 3 additions & 5 deletions packages/core-web/src/services/MintService.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { WorkerClient } from '../worker'
import { RpcClient } from '../rpc'
import type {
Duration,
JSONObject,
Expand All @@ -9,7 +9,7 @@ import type {
} from '../types'

export class MintService {
constructor(private client: WorkerClient) {}
constructor(private client: RpcClient) {}

async redeemEcash(notes: string): Promise<void> {
await this.client.rpcSingle('mint', 'reissue_external_notes', {
Expand Down Expand Up @@ -58,7 +58,7 @@ export class MintService {
? { nanos: 0, secs: tryCancelAfter }
: tryCancelAfter

const res = await this.client.rpcSingle<Array<string>>(
const [operationId, notes] = await this.client.rpcSingle<[string, string]>(
'mint',
'spend_notes',
{
Expand All @@ -68,8 +68,6 @@ export class MintService {
extra_meta: extraMeta,
},
)
const notes = res[1]
const operationId = res[0]

return {
notes,
Expand Down
4 changes: 2 additions & 2 deletions packages/core-web/src/services/RecoveryService.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { JSONValue } from '../types'
import { WorkerClient } from '../worker'
import { RpcClient } from '../rpc'

export class RecoveryService {
constructor(private client: WorkerClient) {}
constructor(private client: RpcClient) {}

async hasPendingRecoveries(): Promise<boolean> {
return await this.client.rpcSingle('', 'has_pending_recoveries', {})
Expand Down
4 changes: 2 additions & 2 deletions packages/core-web/src/test/TestFedimintWallet.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { FedimintWallet } from '../FedimintWallet'
import { WorkerClient } from '../worker/WorkerClient'
import { RpcClient } from '../rpc'
import { TestingService } from './TestingService'

export class TestFedimintWallet extends FedimintWallet {
Expand All @@ -20,7 +20,7 @@ export class TestFedimintWallet extends FedimintWallet {
}

// Method to expose the WorkerClient
getWorkerClient(): WorkerClient {
getWorkerClient(): RpcClient {
return this['_client']
}
}
Loading

0 comments on commit 06899de

Please sign in to comment.