From 050b01f05265eccc0d4cd9e0bd5706852d8d142b Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Thu, 7 Nov 2024 11:09:35 +0000 Subject: [PATCH] feat: add latency option to memory transport (#2810) Allows slowing all network traffic by the passed amount of ms. --- packages/transport-memory/package.json | 5 +- packages/transport-memory/src/connections.ts | 12 ++-- packages/transport-memory/src/index.ts | 7 ++ packages/transport-memory/src/listener.ts | 19 +++++ packages/transport-memory/src/memory.ts | 44 ------------ .../transport-memory/test/compliance.spec.ts | 69 ------------------- packages/transport-memory/test/index.spec.ts | 68 ++++++++++++++++++ 7 files changed, 100 insertions(+), 124 deletions(-) delete mode 100644 packages/transport-memory/test/compliance.spec.ts create mode 100644 packages/transport-memory/test/index.spec.ts diff --git a/packages/transport-memory/package.json b/packages/transport-memory/package.json index 0269be5766..81cdd6cc74 100644 --- a/packages/transport-memory/package.json +++ b/packages/transport-memory/package.json @@ -62,11 +62,10 @@ "uint8arraylist": "^2.4.8" }, "devDependencies": { - "@libp2p/crypto": "^5.0.6", - "@libp2p/interface-compliance-tests": "^6.1.8", "@libp2p/logger": "^5.1.3", "@libp2p/peer-id": "^5.0.7", - "aegir": "^44.0.1" + "aegir": "^44.0.1", + "sinon-ts": "^2.0.0" }, "browser": { "./dist/src/tcp.js": "./dist/src/tcp.browser.js" diff --git a/packages/transport-memory/src/connections.ts b/packages/transport-memory/src/connections.ts index a28ace40a8..6c2f658493 100644 --- a/packages/transport-memory/src/connections.ts +++ b/packages/transport-memory/src/connections.ts @@ -47,7 +47,7 @@ import { multiaddr } from '@multiformats/multiaddr' import delay from 'delay' import map from 'it-map' import { pushable } from 'it-pushable' -import type { MemoryTransportComponents } from './index.js' +import type { MemoryTransportComponents, MemoryTransportInit } from './index.js' import type { MultiaddrConnection, PeerId } from '@libp2p/interface' import type { Uint8ArrayList } from 'uint8arraylist' @@ -57,7 +57,7 @@ interface MemoryConnectionHandler { (maConn: MultiaddrConnection): void } -interface MemoryConnectionInit { +interface MemoryConnectionInit extends MemoryTransportInit { onConnection: MemoryConnectionHandler address: string } @@ -66,13 +66,13 @@ export class MemoryConnection { private readonly components: MemoryTransportComponents private readonly init: MemoryConnectionInit private readonly connections: Set - private latency: number + private readonly latency: number constructor (components: MemoryTransportComponents, init: MemoryConnectionInit) { this.components = components this.init = init this.connections = new Set() - this.latency = 0 + this.latency = init.latency ?? 0 } async dial (dialingPeerId: PeerId): Promise { @@ -169,8 +169,4 @@ export class MemoryConnection { maConn.abort(new ConnectionFailedError('Memory Connection closed')) }) } - - setLatency (ms: number): void { - this.latency = ms - } } diff --git a/packages/transport-memory/src/index.ts b/packages/transport-memory/src/index.ts index cab3478b87..3d77fff2d0 100644 --- a/packages/transport-memory/src/index.ts +++ b/packages/transport-memory/src/index.ts @@ -53,6 +53,13 @@ export interface MemoryTransportComponents { export interface MemoryTransportInit { upgraderOptions?: UpgraderOptions inboundUpgradeTimeout?: number + + /** + * Add this much latency in ms to every buffer sent over the transport + * + * @default 0 + */ + latency?: number } export function memory (init?: MemoryTransportInit): (components: MemoryTransportComponents) => Transport { diff --git a/packages/transport-memory/src/listener.ts b/packages/transport-memory/src/listener.ts index ffedea10dc..9320b3c2a8 100644 --- a/packages/transport-memory/src/listener.ts +++ b/packages/transport-memory/src/listener.ts @@ -40,6 +40,24 @@ * * // use connection... * ``` + * + * @example Simulating slow connections + * + * A `latency` argument can be passed to the factory. Each byte array that + * passes through the transport will be delayed by this many ms. + * + * ```TypeScript + * import { createLibp2p } from 'libp2p' + * import { memory } from '@libp2p/memory' + * + * const dialer = await createLibp2p({ + * transports: [ + * memory({ + * latency: 100 + * }) + * ] + * }) + * ``` */ import { ListenError, TypedEventEmitter } from '@libp2p/interface' @@ -77,6 +95,7 @@ export class MemoryTransportListener extends TypedEventEmitter i } this.connection = new MemoryConnection(this.components, { + ...this.init, onConnection: this.onConnection.bind(this), address }) diff --git a/packages/transport-memory/src/memory.ts b/packages/transport-memory/src/memory.ts index 315509cba5..8783e955fb 100644 --- a/packages/transport-memory/src/memory.ts +++ b/packages/transport-memory/src/memory.ts @@ -1,47 +1,3 @@ -/** - * @packageDocumentation - * - * A [libp2p transport](https://docs.libp2p.io/concepts/transports/overview/) - * that operates in-memory only. - * - * This is intended for testing and can only be used to connect two libp2p nodes - * that are running in the same process. - * - * @example - * - * ```TypeScript - * import { createLibp2p } from 'libp2p' - * import { memory } from '@libp2p/memory' - * import { multiaddr } from '@multiformats/multiaddr' - * - * const listener = await createLibp2p({ - * addresses: { - * listen: [ - * '/memory/node-a' - * ] - * }, - * transports: [ - * memory() - * ] - * }) - * - * const dialer = await createLibp2p({ - * transports: [ - * memory() - * ] - * }) - * - * const ma = multiaddr('/memory/node-a') - * - * // dial the listener, timing out after 10s - * const connection = await dialer.dial(ma, { - * signal: AbortSignal.timeout(10_000) - * }) - * - * // use connection... - * ``` - */ - import { ConnectionFailedError, serviceCapabilities, transportSymbol } from '@libp2p/interface' import { Memory } from '@multiformats/multiaddr-matcher' import { connections } from './connections.js' diff --git a/packages/transport-memory/test/compliance.spec.ts b/packages/transport-memory/test/compliance.spec.ts deleted file mode 100644 index 6620843256..0000000000 --- a/packages/transport-memory/test/compliance.spec.ts +++ /dev/null @@ -1,69 +0,0 @@ -import { generateKeyPair } from '@libp2p/crypto/keys' -import tests from '@libp2p/interface-compliance-tests/transport' -import { defaultLogger } from '@libp2p/logger' -import { peerIdFromPrivateKey } from '@libp2p/peer-id' -import { multiaddr } from '@multiformats/multiaddr' -import { connections } from '../src/connections.js' -import { memory } from '../src/index.js' -import type { MemoryTransportListener } from '../src/listener.js' -import type { Listener } from '@libp2p/interface' - -describe('transport compliance tests', () => { - tests({ - async setup () { - const privateKey = await generateKeyPair('Ed25519') - - const transport = memory()({ - logger: defaultLogger(), - peerId: peerIdFromPrivateKey(privateKey) - }) - const addrs = [ - multiaddr('/memory/addr-1'), - multiaddr('/memory/addr-2'), - multiaddr('/memory/addr-3'), - multiaddr('/memory/addr-4') - ] - - let delayMs = 0 - const delayedCreateListener = (options: any): Listener => { - const listener = transport.createListener(options) as unknown as MemoryTransportListener - - const onConnection = listener.onConnection.bind(listener) - - listener.onConnection = (maConn: any) => { - setTimeout(() => { - onConnection(maConn) - }, delayMs) - } - - return listener - } - - const transportProxy = new Proxy(transport, { - // @ts-expect-error cannot access props with a string - get: (_, prop) => prop === 'createListener' ? delayedCreateListener : transport[prop] - }) - - // Used by the dial tests to simulate a delayed connect - const connector = { - delay (ms: number) { - delayMs = ms - connections.get('/memory/addr-1')?.setLatency(ms) - connections.get('/memory/addr-2')?.setLatency(ms) - connections.get('/memory/addr-3')?.setLatency(ms) - connections.get('/memory/addr-4')?.setLatency(ms) - }, - restore () { - delayMs = 0 - connections.get('/memory/addr-1')?.setLatency(0) - connections.get('/memory/addr-2')?.setLatency(0) - connections.get('/memory/addr-3')?.setLatency(0) - connections.get('/memory/addr-4')?.setLatency(0) - } - } - - return { dialer: transportProxy, listener: transportProxy, listenAddrs: addrs, dialAddrs: addrs, connector } - }, - async teardown () {} - }) -}) diff --git a/packages/transport-memory/test/index.spec.ts b/packages/transport-memory/test/index.spec.ts new file mode 100644 index 0000000000..b753cfc47d --- /dev/null +++ b/packages/transport-memory/test/index.spec.ts @@ -0,0 +1,68 @@ +import { defaultLogger } from '@libp2p/logger' +import { peerIdFromString } from '@libp2p/peer-id' +import { multiaddr } from '@multiformats/multiaddr' +import { expect } from 'aegir/chai' +import { stubInterface } from 'sinon-ts' +import { memory } from '../src/index.js' +import type { Upgrader, Connection } from '@libp2p/interface' + +describe('memory', () => { + let upgrader: Upgrader + + beforeEach(async () => { + upgrader = stubInterface({ + upgradeInbound: async (maConn) => { + return stubInterface() + }, + upgradeOutbound: async (maConn) => { + return stubInterface() + } + }) + }) + + it('should dial', async () => { + const transport = memory()({ + peerId: peerIdFromString('12D3KooWJRSrypvnpHgc6ZAgyCni4KcSmbV7uGRaMw5LgMKT18fq'), + logger: defaultLogger() + }) + const ma = multiaddr('/memory/address-1') + const listener = transport.createListener({ + upgrader + }) + await listener.listen(ma) + + const conn = await transport.dial(ma, { + upgrader + }) + + await conn.close() + await listener.close() + }) + + it('should dial with latency', async () => { + const latency = 1000 + const transport = memory({ + latency + })({ + peerId: peerIdFromString('12D3KooWJRSrypvnpHgc6ZAgyCni4KcSmbV7uGRaMw5LgMKT18fq'), + logger: defaultLogger() + }) + const ma = multiaddr('/memory/address-1') + const listener = transport.createListener({ + upgrader + }) + await listener.listen(ma) + + const start = Date.now() + const conn = await transport.dial(ma, { + upgrader + }) + const end = Date.now() + + // +/- a bit + expect(end - start).to.be.greaterThan(latency / 1.1) + + await conn.close() + await listener.close() + }) +})