Skip to content

Commit

Permalink
chore: update sc provider
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanleecode committed Jan 28, 2025
1 parent b132f64 commit 48588b2
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 165 deletions.
11 changes: 8 additions & 3 deletions packages/rpc-provider/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,14 @@
"tslib": "^2.8.1"
},
"devDependencies": {
"@substrate/connect": "0.8.11"
"@substrate/connect": "^2.1.2"
},
"optionalDependencies": {
"@substrate/connect": "0.8.11"
"peerDependencies": {
"@substrate/connect": "^2.1.2"
},
"peerDependenciesMeta": {
"@substrate/connect": {
"optional": true
}
}
}
116 changes: 68 additions & 48 deletions packages/rpc-provider/src/substrate-connect/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const wait = (ms: number) =>
setTimeout(resolve, ms)
);

function healthCheckerMock (): MockedHealthChecker {
function healthCheckerMock(): MockedHealthChecker {
let cb: (health: SmoldotHealth) => void = () => undefined;
let sendJsonRpc: (request: string) => void = () => undefined;
let isActive = false;
Expand All @@ -66,7 +66,7 @@ function healthCheckerMock (): MockedHealthChecker {
};
}

function healthCheckerFactory () {
function healthCheckerFactory() {
const _healthCheckers: MockedHealthChecker[] = [];

return {
Expand All @@ -82,13 +82,33 @@ function healthCheckerFactory () {
};
}

function getFakeChain (spec: string, callback: Sc.JsonRpcCallback): MockChain {
function getFakeChain(spec: string): MockChain {
const _receivedRequests: string[] = [];
let _isTerminated = false;

let terminateInterceptor = Function.prototype;
let sendJsonRpcInterceptor = Function.prototype;

const responseQueue: string[] = []

const nextJsonRpcResponse = async () => {
if (responseQueue.length === 0) {
await new Promise((resolve) => setTimeout(resolve, 0));
return nextJsonRpcResponse();
}

return responseQueue.shift()!;
}

async function* jsonRpcResponsesGenerator(): AsyncIterableIterator<string> {
while (true) {
const response = await nextJsonRpcResponse();
if (response) {
yield response;
}
}
}

return {
_getLatestRequest: () => _receivedRequests[_receivedRequests.length - 1],
_isTerminated: () => _isTerminated,
Expand All @@ -101,14 +121,15 @@ function getFakeChain (spec: string, callback: Sc.JsonRpcCallback): MockChain {
},
_spec: () => spec,
_triggerCallback: (response) => {
callback(
typeof response === 'string'
? response
: stringify(response)
);
const message = typeof response === 'string'
? response
: stringify(response)
responseQueue.push(message)

},
addChain: (chainSpec, jsonRpcCallback) =>
Promise.resolve(getFakeChain(chainSpec, jsonRpcCallback ?? noop)),
nextJsonRpcResponse,
jsonRpcResponses: jsonRpcResponsesGenerator(),
addChain: (chainSpec) => Promise.resolve(getFakeChain(chainSpec)),
remove: () => {
terminateInterceptor();
_isTerminated = true;
Expand All @@ -120,11 +141,27 @@ function getFakeChain (spec: string, callback: Sc.JsonRpcCallback): MockChain {
};
}

function getFakeClient () {
function getFakeClient() {
const chains: MockChain[] = [];
let addChainInterceptor: Promise<void> = Promise.resolve();
let addWellKnownChainInterceptor: Promise<void> = Promise.resolve();

const addChain: Sc.AddChain = async (chainSpec) => addChainInterceptor.then(() => {
const result = getFakeChain(chainSpec);

chains.push(result);

return result;
})

const addWellKnownChain: Sc.AddWellKnownChain = async (wellKnownChain) => addWellKnownChainInterceptor.then(() => {
const result = getFakeChain(wellKnownChain);

chains.push(result);

return result;
})

return {
_chains: () => chains,
_setAddChainInterceptor: (interceptor: Promise<void>) => {
Expand All @@ -133,29 +170,12 @@ function getFakeClient () {
_setAddWellKnownChainInterceptor: (interceptor: Promise<void>) => {
addWellKnownChainInterceptor = interceptor;
},
addChain: (chainSpec: string, cb: Sc.JsonRpcCallback): Promise<MockChain> =>
addChainInterceptor.then(() => {
const result = getFakeChain(chainSpec, cb);

chains.push(result);

return result;
}),
addWellKnownChain: (
wellKnownChain: string,
cb: Sc.JsonRpcCallback
): Promise<MockChain> =>
addWellKnownChainInterceptor.then(() => {
const result = getFakeChain(wellKnownChain, cb);

chains.push(result);

return result;
})
addChain,
addWellKnownChain
};
}

function connectorFactory (): MockSc {
function connectorFactory(): MockSc {
const clients: ReturnType<typeof getFakeClient>[] = [];
const latestClient = () => clients[clients.length - 1];

Expand All @@ -175,7 +195,7 @@ function connectorFactory (): MockSc {
} as unknown as MockSc;
}

function setChainSyncyingStatus (isSyncing: boolean): void {
function setChainSyncyingStatus(isSyncing: boolean): void {
getCurrentHealthChecker()._triggerHealthUpdate({
isSyncing,
peers: 1,
Expand Down Expand Up @@ -206,7 +226,7 @@ describe('ScProvider', () => {
expect(onConnected).not.toHaveBeenCalled();
setChainSyncyingStatus(false);
expect(onConnected).toHaveBeenCalled();
});
}, 5000);

it('stops receiving notifications after unsubscribing', async () => {
const provider = new ScProvider(mockSc, '');
Expand All @@ -220,7 +240,7 @@ describe('ScProvider', () => {

setChainSyncyingStatus(false);
expect(onConnected).not.toHaveBeenCalled();
});
}, 5000);

it('synchronously emits connected if the Provider is already `connected`', async () => {
const provider = new ScProvider(mockSc, '');
Expand All @@ -233,7 +253,7 @@ describe('ScProvider', () => {

provider.on('connected', onConnected);
expect(onConnected).toHaveBeenCalled();
});
}, 5000);

it('emits `disconnected` once the chain goes back to syncing', async () => {
const provider = new ScProvider(mockSc, '');
Expand All @@ -256,7 +276,7 @@ describe('ScProvider', () => {

expect(onConnected).not.toHaveBeenCalled();
expect(onDisconnected).toHaveBeenCalled();
});
}, 5000);
});

describe('hasSubscriptions', () => {
Expand Down Expand Up @@ -288,7 +308,7 @@ describe('ScProvider', () => {

await provider.connect(undefined, mockedHealthChecker.healthChecker);
expect(chain).toBe(mockSc.latestChain());
});
}, 5000);

it('throws when trying to connect on an already connected Provider', async () => {
const provider = new ScProvider(mockSc, '');
Expand All @@ -300,7 +320,7 @@ describe('ScProvider', () => {
await expect(
provider.connect(undefined, mockedHealthChecker.healthChecker)
).rejects.toThrow(/Already connected/);
});
}, 5000);
});

describe('disconnect', () => {
Expand All @@ -313,7 +333,7 @@ describe('ScProvider', () => {
await provider.disconnect();

expect(chain._isTerminated()).toBe(true);
});
}, 5000);

// eslint-disable-next-line jest/expect-expect
it('does not throw when disconnecting on an already disconnected Provider', async () => {
Expand Down Expand Up @@ -360,7 +380,7 @@ describe('ScProvider', () => {
const response = await responsePromise;

expect(response).toEqual(result);
});
}, 5000);

it("rejects when the response can't be deserialized", async () => {
const provider = new ScProvider(mockSc, '');
Expand All @@ -378,7 +398,7 @@ describe('ScProvider', () => {
}, 0);

await expect(provider.send('getData', ['foo'])).rejects.toThrow();
});
}, 5000);

it('rejects when the smoldot chain has crashed', async () => {
const provider = new ScProvider(mockSc, '');
Expand All @@ -397,7 +417,7 @@ describe('ScProvider', () => {
provider.send('getData', ['foo'])
).rejects.toThrow(/Disconnected/);
expect(provider.isConnected).toBe(false);
});
}, 5000);
});

describe('subscribe', () => {
Expand Down Expand Up @@ -466,7 +486,7 @@ describe('ScProvider', () => {
});
expect(cb).toHaveBeenCalledTimes(2);
expect(cb).toHaveBeenLastCalledWith(null, 2);
});
}, 5000);

it('ignores subscription messages that were received before the subscription token', async () => {
const provider = new ScProvider(mockSc, '');
Expand Down Expand Up @@ -504,7 +524,7 @@ describe('ScProvider', () => {

expect(token).toBe(unsubscribeToken);
expect(cb).not.toHaveBeenCalled();
});
}, 5000);

it('emits the error when the message has an error', async () => {
const provider = new ScProvider(mockSc, '');
Expand Down Expand Up @@ -545,7 +565,7 @@ describe('ScProvider', () => {
expect(token).toBe(unsubscribeToken);
expect(cb).toHaveBeenCalledTimes(1);
expect(cb).toHaveBeenLastCalledWith(expect.any(Error), undefined);
});
}, 5000);

it('errors when subscribing to an unsupported method', async () => {
const provider = new ScProvider(mockSc, '');
Expand All @@ -558,7 +578,7 @@ describe('ScProvider', () => {
await expect(
provider.subscribe('foo', 'bar', ['baz'], () => undefined)
).rejects.toThrow(/Unsupported subscribe method: bar/);
});
}, 5000);
});

describe('unsubscribe', () => {
Expand All @@ -572,7 +592,7 @@ describe('ScProvider', () => {
await expect(
provider.unsubscribe('', '', '')
).rejects.toThrow(/Unable to find active subscription/);
});
}, 5000);
});

it('cleans up the stale subscriptions once it reconnects', async () => {
Expand Down Expand Up @@ -634,5 +654,5 @@ describe('ScProvider', () => {
`{"id":2,"jsonrpc":"2.0","method":"chain_unsubscribeNewHeads","params":["${token}"]}`,
'{"id":3,"jsonrpc":"2.0","method":"chain_subscribeNewHeads","params":["baz"]}'
]);
});
}, 5000);
});
16 changes: 14 additions & 2 deletions packages/rpc-provider/src/substrate-connect/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class ScProvider implements ProviderInterface {
#chain: Promise<ScType.Chain> | null = null;
#isChainReady = false;

public constructor (Sc: SubstrateConnect, spec: string | ScType.WellKnownChain, sharedSandbox?: ScProvider) {
public constructor (Sc: SubstrateConnect, spec: ScType.WellKnownChain | (string & {}), sharedSandbox?: ScProvider) {
if (!isObject(Sc) || !isObject(Sc.WellKnownChain) || !isFunction(Sc.createScClient)) {
throw new Error('Expected an @substrate/connect interface as first parameter to ScProvider');
}
Expand Down Expand Up @@ -166,7 +166,19 @@ export class ScProvider implements ProviderInterface {
? client.addWellKnownChain
: client.addChain;

this.#chain = addChain(this.#spec as ScType.WellKnownChain, onResponse).then((chain) => {
this.#chain = addChain(this.#spec as ScType.WellKnownChain).then((chain) => {
// Process JSON-RPC responses
void (async () => {
try {
for await (const response of chain.jsonRpcResponses) {
onResponse(response);
}
} catch (error) {
l.error('Error processing JSON-RPC responses:', error);
this.#eventemitter.emit('error', error);
}
})();

hc.setSendJsonRpc(chain.sendJsonRpc);

this.#isChainReady = false;
Expand Down
Loading

0 comments on commit 48588b2

Please sign in to comment.