From 91687998d7ae536549ddcd840aa430098860f0fb Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Wed, 6 Nov 2024 06:25:58 +0000 Subject: [PATCH] chore: remove all sibling test deps (#2806) Converts last set of tests to use stubbing instead of depending on sibling modules in the monorepo. This will reduce release churn in the future. --- packages/libp2p/package.json | 6 +- .../libp2p/test/addresses/addresses.spec.ts | 233 ------ .../connection-gater.spec.ts | 229 +++-- .../test/connection-manager/index.spec.ts | 158 +--- .../test/connection-manager/resolver.spec.ts | 92 --- .../libp2p/test/connection-manager/utils.ts | 35 + packages/libp2p/test/core/core.spec.ts | 13 - packages/libp2p/test/core/listening.spec.ts | 44 - packages/libp2p/test/fixtures/create-peers.ts | 68 -- packages/libp2p/test/fixtures/slow-muxer.ts | 29 - .../test/transports/transport-manager.spec.ts | 128 ++- .../libp2p/test/upgrading/upgrader.spec.ts | 780 +++++++++++------- packages/libp2p/test/upgrading/utils.ts | 39 + 13 files changed, 782 insertions(+), 1072 deletions(-) delete mode 100644 packages/libp2p/test/addresses/addresses.spec.ts delete mode 100644 packages/libp2p/test/connection-manager/resolver.spec.ts create mode 100644 packages/libp2p/test/connection-manager/utils.ts delete mode 100644 packages/libp2p/test/core/listening.spec.ts delete mode 100644 packages/libp2p/test/fixtures/create-peers.ts delete mode 100644 packages/libp2p/test/fixtures/slow-muxer.ts create mode 100644 packages/libp2p/test/upgrading/utils.ts diff --git a/packages/libp2p/package.json b/packages/libp2p/package.json index a3e9406472..46f818a813 100644 --- a/packages/libp2p/package.json +++ b/packages/libp2p/package.json @@ -113,15 +113,11 @@ "uint8arrays": "^5.1.0" }, "devDependencies": { - "@chainsafe/libp2p-yamux": "^7.0.0", - "@libp2p/echo": "^2.1.1", - "@libp2p/memory": "^0.0.0", - "@libp2p/mplex": "^11.0.10", - "@libp2p/plaintext": "^2.0.10", "aegir": "^44.0.1", "delay": "^6.0.0", "it-all": "^3.0.6", "it-drain": "^3.0.7", + "it-length-prefixed": "^9.1.0", "it-map": "^3.1.0", "it-pair": "^2.0.6", "it-stream-types": "^2.0.1", diff --git a/packages/libp2p/test/addresses/addresses.spec.ts b/packages/libp2p/test/addresses/addresses.spec.ts deleted file mode 100644 index a5de3e2e35..0000000000 --- a/packages/libp2p/test/addresses/addresses.spec.ts +++ /dev/null @@ -1,233 +0,0 @@ -/* eslint-env mocha */ - -import { memory } from '@libp2p/memory' -import { multiaddr, protocols } from '@multiformats/multiaddr' -import { expect } from 'aegir/chai' -import { pEvent } from 'p-event' -import { createLibp2p } from '../../src/index.js' -import { getComponent } from '../fixtures/get-component.js' -import type { Libp2p, PeerUpdate } from '@libp2p/interface' -import type { AddressManager, TransportManager } from '@libp2p/interface-internal' -import type { Multiaddr } from '@multiformats/multiaddr' - -const listenAddresses = ['/memory/address-1', '/memory/address-2'] -const announceAddresses = ['/dns4/peer.io/tcp/433/p2p/12D3KooWNvSZnPi3RrhrTwEY4LuuBeB6K6facKUCJcyWG1aoDd2p'] - -describe('libp2p.addressManager', () => { - let libp2p: Libp2p - - afterEach(async () => { - await libp2p?.stop() - }) - - it('should keep listen addresses after start, even if changed', async () => { - libp2p = await createLibp2p({ - start: false, - addresses: { - listen: listenAddresses, - announce: announceAddresses - }, - transports: [ - memory() - ] - }) - - const addressManager = getComponent(libp2p, 'addressManager') - let listenAddrs = addressManager.getListenAddrs().map(ma => ma.toString()) - expect(listenAddrs).to.have.lengthOf(listenAddresses.length) - expect(listenAddrs).to.include(listenAddresses[0]) - expect(listenAddrs).to.include(listenAddresses[1]) - - // Should not replace listen addresses after transport listen - // Only transportManager has visibility of the port used - await libp2p.start() - - listenAddrs = addressManager.getListenAddrs().map(ma => ma.toString()) - expect(listenAddrs).to.have.lengthOf(listenAddresses.length) - expect(listenAddrs).to.include(listenAddresses[0]) - expect(listenAddrs).to.include(listenAddresses[1]) - }) - - it('should announce transport listen addresses if announce addresses are not provided', async () => { - libp2p = await createLibp2p({ - addresses: { - listen: listenAddresses - }, - transports: [ - memory() - ] - }) - - const tmListen = getComponent(libp2p, 'transportManager').getAddrs().map((ma) => ma.toString()) - - // Announce 2 listen (transport) - const advertiseMultiaddrs = getComponent(libp2p, 'addressManager').getAddresses().map((ma) => ma.decapsulateCode(protocols('p2p').code).toString()) - - expect(advertiseMultiaddrs).to.have.lengthOf(listenAddresses.length) - tmListen.forEach((m) => { - expect(advertiseMultiaddrs).to.include(m) - }) - }) - - it('should only announce the given announce addresses when provided', async () => { - libp2p = await createLibp2p({ - addresses: { - listen: listenAddresses, - announce: announceAddresses - }, - transports: [ - memory() - ] - }) - - const tmListen = getComponent(libp2p, 'transportManager').getAddrs().map((ma) => ma.toString()) - - // Announce 1 announce addr - const advertiseMultiaddrs = getComponent(libp2p, 'addressManager').getAddresses().map((ma) => ma.decapsulateCode(protocols('p2p').code).toString()) - expect(advertiseMultiaddrs.length).to.equal(announceAddresses.length) - advertiseMultiaddrs.forEach((m) => { - expect(tmListen).to.not.include(m) - expect(announceAddresses).to.include(m) - }) - }) - - it('should filter listen addresses filtered by the announce filter', async () => { - libp2p = await createLibp2p({ - addresses: { - listen: listenAddresses, - announceFilter: (multiaddrs: Multiaddr[]) => multiaddrs.slice(1) - }, - transports: [ - memory() - ] - }) - - const listenAddrs = getComponent(libp2p, 'addressManager').getListenAddrs().map((ma) => ma.toString()) - expect(listenAddrs).to.have.lengthOf(listenAddresses.length) - expect(listenAddrs).to.deep.equal(listenAddresses) - - await libp2p.start() - - const addresses = getComponent(libp2p, 'addressManager').getAddresses() - expect(addresses).to.have.lengthOf(1) - }) - - it('should filter announce addresses filtered by the announce filter', async () => { - libp2p = await createLibp2p({ - addresses: { - listen: listenAddresses, - announce: announceAddresses, - announceFilter: () => [] - }, - transports: [ - memory() - ] - }) - - const listenAddrs = getComponent(libp2p, 'addressManager').getListenAddrs().map((ma) => ma.toString()) - expect(listenAddrs).to.have.lengthOf(listenAddresses.length) - expect(listenAddrs).to.deep.equal(listenAddresses) - - const addresses = getComponent(libp2p, 'addressManager').getAddresses() - expect(addresses).to.have.lengthOf(0) - }) - - it('should include observed addresses in returned multiaddrs', async () => { - libp2p = await createLibp2p({ - start: false, - addresses: { - listen: listenAddresses - }, - transports: [ - memory() - ] - }) - - const ma = '/ip4/83.32.123.53/tcp/43928' - - await libp2p.start() - - const addressManager = getComponent(libp2p, 'addressManager') - - expect(addressManager.getAddresses()).to.have.lengthOf(listenAddresses.length) - - addressManager.confirmObservedAddr(multiaddr(ma)) - - expect(addressManager.getAddresses()).to.have.lengthOf(listenAddresses.length + 1) - expect(addressManager.getAddresses().map(ma => ma.decapsulateCode(protocols('p2p').code).toString())).to.include(ma) - }) - - it('should populate the AddressManager from the config', async () => { - libp2p = await createLibp2p({ - start: false, - addresses: { - listen: listenAddresses, - announce: announceAddresses - }, - transports: [ - memory() - ] - }) - - expect(libp2p.getMultiaddrs().map(ma => ma.decapsulateCode(protocols('p2p').code).toString())).to.have.members(announceAddresses) - expect(libp2p.getMultiaddrs().map(ma => ma.decapsulateCode(protocols('p2p').code).toString())).to.not.have.members(listenAddresses) - }) - - it('should update our peer record with announce addresses on startup', async () => { - libp2p = await createLibp2p({ - start: false, - addresses: { - listen: listenAddresses, - announce: announceAddresses - }, - transports: [ - memory() - ] - }) - - const eventPromise = pEvent<'self:peer:update', CustomEvent>(libp2p, 'self:peer:update', { - filter: (event) => { - return event.detail.peer.addresses.map(({ multiaddr }) => multiaddr.toString()) - .includes(announceAddresses[0]) - } - }) - - await libp2p.start() - - const event = await eventPromise - - expect(event.detail.peer.addresses.map(({ multiaddr }) => multiaddr.toString())) - .to.include.members(announceAddresses, 'peer info did not include announce addresses') - }) - - it('should only include confirmed observed addresses in peer record', async () => { - libp2p = await createLibp2p({ - start: false, - addresses: { - listen: listenAddresses, - announce: announceAddresses - }, - transports: [ - memory() - ] - }) - - await libp2p.start() - - const eventPromise = pEvent<'self:peer:update', CustomEvent>(libp2p, 'self:peer:update') - - const unconfirmedAddress = multiaddr('/ip4/127.0.0.1/tcp/4010/ws') - getComponent(libp2p, 'addressManager').addObservedAddr(unconfirmedAddress) - - const confirmedAddress = multiaddr('/ip4/127.0.0.1/tcp/4011/ws') - getComponent(libp2p, 'addressManager').confirmObservedAddr(confirmedAddress) - - const event = await eventPromise - - expect(event.detail.peer.addresses.map(({ multiaddr }) => multiaddr.toString())) - .to.not.include(unconfirmedAddress.toString(), 'peer info included unconfirmed observed address') - - expect(event.detail.peer.addresses.map(({ multiaddr }) => multiaddr.toString())) - .to.include(confirmedAddress.toString(), 'peer info did not include confirmed observed address') - }) -}) diff --git a/packages/libp2p/test/connection-manager/connection-gater.spec.ts b/packages/libp2p/test/connection-manager/connection-gater.spec.ts index 91f9b8a67a..56c0a3688a 100644 --- a/packages/libp2p/test/connection-manager/connection-gater.spec.ts +++ b/packages/libp2p/test/connection-manager/connection-gater.spec.ts @@ -1,160 +1,247 @@ /* eslint-env mocha */ -import { stop } from '@libp2p/interface' +import { generateKeyPair } from '@libp2p/crypto/keys' +import { start, stop } from '@libp2p/interface' +import { logger } from '@libp2p/logger' +import { peerIdFromPrivateKey } from '@libp2p/peer-id' +import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' -import sinon from 'sinon' -import { createPeers } from '../fixtures/create-peers.js' -import type { Echo } from '@libp2p/echo' -import type { Libp2p } from '@libp2p/interface' +import Sinon from 'sinon' +import { stubInterface } from 'sinon-ts' +import { DefaultConnectionManager } from '../../src/connection-manager/index.js' +import { DefaultUpgrader } from '../../src/upgrader.js' +import { createDefaultUpgraderComponents } from '../upgrading/utils.js' +import { createDefaultConnectionManagerComponents } from './utils.js' +import type { Transport, MultiaddrConnection, StreamMuxerFactory } from '@libp2p/interface' +import type { TransportManager } from '@libp2p/interface-internal' describe('connection-gater', () => { - let dialer: Libp2p<{ echo: Echo }> - let listener: Libp2p<{ echo: Echo }> + let connectionManager: DefaultConnectionManager afterEach(async () => { - await stop(dialer, listener) + await stop(connectionManager) }) it('intercept peer dial', async () => { - const denyDialPeer = sinon.stub().returns(true) + const denyDialPeer = Sinon.stub().returns(true) + const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + const ma = multiaddr(`/ip4/123.123.123.123/tcp/1234/p2p/${remotePeer}`) - ;({ dialer, listener } = await createPeers({ + connectionManager = new DefaultConnectionManager(await createDefaultConnectionManagerComponents({ connectionGater: { denyDialPeer } })) + await start(connectionManager) - await expect(dialer.dial(listener.getMultiaddrs())) + await expect(connectionManager.openConnection(ma)) .to.eventually.be.rejected().with.property('name', 'DialDeniedError') + + expect(denyDialPeer.called).to.be.true() }) it('intercept addr dial', async () => { - const denyDialMultiaddr = sinon.stub().returns(false) + const denyDialMultiaddr = Sinon.stub().returns(true) + const ma = multiaddr('/ip4/123.123.123.123/tcp/1234') - ;({ dialer, listener } = await createPeers({ + connectionManager = new DefaultConnectionManager(await createDefaultConnectionManagerComponents({ connectionGater: { denyDialMultiaddr - } + }, + transportManager: stubInterface({ + dialTransportForMultiaddr: () => stubInterface() + }) })) + await start(connectionManager) - await dialer.dial(listener.getMultiaddrs()) + await expect(connectionManager.openConnection(ma)) + .to.eventually.be.rejected().with.property('name', 'DialDeniedError') - for (const multiaddr of listener.getMultiaddrs()) { - expect(denyDialMultiaddr.calledWith(multiaddr)).to.be.true() - } + expect(denyDialMultiaddr.called).to.be.true() }) - it('intercept multiaddr store', async () => { - const filterMultiaddrForPeer = sinon.stub().returns(true) + it('intercept accept inbound connection', async () => { + const denyInboundConnection = Sinon.stub().returns(true) - ;({ dialer, listener } = await createPeers({ + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents({ connectionGater: { - filterMultiaddrForPeer + denyInboundConnection } - })) - - const fullMultiaddr = listener.getMultiaddrs()[0] - - await dialer.peerStore.merge(listener.peerId, { - multiaddrs: [fullMultiaddr] + }), { + connectionEncrypters: [], + streamMuxers: [] }) - expect(filterMultiaddrForPeer.callCount).to.equal(1) + const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + const remoteAddr = multiaddr(`/ip4/123.123.123.123/tcp/1234/p2p/${remotePeer}`) - const args = filterMultiaddrForPeer.getCall(0).args - expect(args[0].toString()).to.equal(listener.peerId.toString()) - expect(args[1].toString()).to.equal(fullMultiaddr.toString()) - }) - - it('intercept accept inbound connection', async () => { - const denyInboundConnection = sinon.stub().returns(false) + const maConn = stubInterface({ + remoteAddr + }) - ;({ dialer, listener } = await createPeers({}, { - connectionGater: { - denyInboundConnection - } + await expect(upgrader.upgradeInbound(maConn, { + skipEncryption: true, + skipProtection: true, + muxerFactory: stubInterface() })) - - await dialer.dial(listener.getMultiaddrs()) + .to.eventually.be.rejected().with.property('name', 'ConnectionInterceptedError') expect(denyInboundConnection.called).to.be.true() }) it('intercept accept outbound connection', async () => { - const denyOutboundConnection = sinon.stub().returns(false) + const denyOutboundConnection = Sinon.stub().returns(true) - ;({ dialer, listener } = await createPeers({ + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents({ connectionGater: { denyOutboundConnection } - })) + }), { + connectionEncrypters: [], + streamMuxers: [] + }) + + const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + const remoteAddr = multiaddr(`/ip4/123.123.123.123/tcp/1234/p2p/${remotePeer}`) - await dialer.dial(listener.getMultiaddrs()) + const maConn = stubInterface({ + remoteAddr + }) - expect(denyOutboundConnection.called).to.be.true() + await expect(upgrader.upgradeOutbound(maConn, { + skipEncryption: true, + skipProtection: true, + muxerFactory: stubInterface() + })) + .to.eventually.be.rejected().with.property('name', 'ConnectionInterceptedError') }) it('intercept inbound encrypted', async () => { - const denyInboundEncryptedConnection = sinon.stub().returns(false) + const denyInboundEncryptedConnection = Sinon.stub().returns(true) + const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + const remoteAddr = multiaddr(`/ip4/123.123.123.123/tcp/1234/p2p/${remotePeer}`) - ;({ dialer, listener } = await createPeers({}, { + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents({ connectionGater: { denyInboundEncryptedConnection } - })) + }), { + connectionEncrypters: [], + streamMuxers: [] + }) + upgrader._encryptInbound = async (maConn) => { + return { + conn: maConn, + remotePeer, + protocol: '/test-encrypter' + } + } + + const maConn = stubInterface({ + remoteAddr, + log: logger('test') + }) - await dialer.dial(listener.getMultiaddrs()) + await expect(upgrader.upgradeInbound(maConn, { + skipProtection: true, + muxerFactory: stubInterface() + })) + .to.eventually.be.rejected().with.property('name', 'ConnectionInterceptedError') expect(denyInboundEncryptedConnection.called).to.be.true() - expect(denyInboundEncryptedConnection.getCall(0).args[0].toMultihash().bytes).to.equalBytes(dialer.peerId.toMultihash().bytes) }) it('intercept outbound encrypted', async () => { - const denyOutboundEncryptedConnection = sinon.stub().returns(false) + const denyOutboundEncryptedConnection = Sinon.stub().returns(true) + + const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + const remoteAddr = multiaddr(`/ip4/123.123.123.123/tcp/1234/p2p/${remotePeer}`) - ;({ dialer, listener } = await createPeers({ + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents({ connectionGater: { denyOutboundEncryptedConnection } - })) + }), { + connectionEncrypters: [], + streamMuxers: [] + }) + upgrader._encryptOutbound = async (maConn) => { + return { + conn: maConn, + remotePeer, + protocol: '/test-encrypter' + } + } - await dialer.dial(listener.getMultiaddrs()) + const maConn = stubInterface({ + remoteAddr, + log: logger('test') + }) + + await expect(upgrader.upgradeOutbound(maConn, { + skipProtection: true, + muxerFactory: stubInterface() + })) + .to.eventually.be.rejected().with.property('name', 'ConnectionInterceptedError') expect(denyOutboundEncryptedConnection.called).to.be.true() - expect(denyOutboundEncryptedConnection.getCall(0).args[0].toMultihash().bytes).to.equalBytes(listener.peerId.toMultihash().bytes) }) it('intercept inbound upgraded', async () => { - const denyInboundUpgradedConnection = sinon.stub().returns(false) + const denyInboundUpgradedConnection = Sinon.stub().returns(true) + const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + const remoteAddr = multiaddr(`/ip4/123.123.123.123/tcp/1234/p2p/${remotePeer}`) - ;({ dialer, listener } = await createPeers({}, { + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents({ connectionGater: { denyInboundUpgradedConnection } - })) + }), { + connectionEncrypters: [], + streamMuxers: [] + }) - const input = Uint8Array.from([0]) - const output = await dialer.services.echo.echo(listener.getMultiaddrs(), input) - expect(output).to.equalBytes(input) + const maConn = stubInterface({ + remoteAddr, + log: logger('test') + }) + + await expect(upgrader.upgradeInbound(maConn, { + skipEncryption: true, + skipProtection: true, + muxerFactory: stubInterface() + })) + .to.eventually.be.rejected().with.property('name', 'ConnectionInterceptedError') expect(denyInboundUpgradedConnection.called).to.be.true() - expect(denyInboundUpgradedConnection.getCall(0).args[0].toMultihash().bytes).to.equalBytes(dialer.peerId.toMultihash().bytes) }) it('intercept outbound upgraded', async () => { - const denyOutboundUpgradedConnection = sinon.stub().returns(false) + const denyOutboundUpgradedConnection = Sinon.stub().returns(true) + const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + const remoteAddr = multiaddr(`/ip4/123.123.123.123/tcp/1234/p2p/${remotePeer}`) - ;({ dialer, listener } = await createPeers({ + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents({ connectionGater: { denyOutboundUpgradedConnection } - })) + }), { + connectionEncrypters: [], + streamMuxers: [] + }) - const input = Uint8Array.from([0]) - const output = await dialer.services.echo.echo(listener.getMultiaddrs(), input) - expect(output).to.equalBytes(input) + const maConn = stubInterface({ + remoteAddr, + log: logger('test') + }) + + await expect(upgrader.upgradeOutbound(maConn, { + skipEncryption: true, + skipProtection: true, + muxerFactory: stubInterface() + })) + .to.eventually.be.rejected().with.property('name', 'ConnectionInterceptedError') expect(denyOutboundUpgradedConnection.called).to.be.true() - expect(denyOutboundUpgradedConnection.getCall(0).args[0].toMultihash().bytes).to.equalBytes(listener.peerId.toMultihash().bytes) }) }) diff --git a/packages/libp2p/test/connection-manager/index.spec.ts b/packages/libp2p/test/connection-manager/index.spec.ts index 629c2a9d7a..32556dc624 100644 --- a/packages/libp2p/test/connection-manager/index.spec.ts +++ b/packages/libp2p/test/connection-manager/index.spec.ts @@ -1,49 +1,32 @@ /* eslint-env mocha */ import { generateKeyPair } from '@libp2p/crypto/keys' -import { TypedEventEmitter, KEEP_ALIVE, start, stop } from '@libp2p/interface' -import { defaultLogger } from '@libp2p/logger' +import { KEEP_ALIVE, start, stop } from '@libp2p/interface' import { peerIdFromPrivateKey } from '@libp2p/peer-id' -import { dns } from '@multiformats/dns' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import pWaitFor from 'p-wait-for' import sinon from 'sinon' import { stubInterface } from 'sinon-ts' -import { defaultComponents } from '../../src/components.js' -import { DefaultConnectionManager, type DefaultConnectionManagerComponents } from '../../src/connection-manager/index.js' +import { DefaultConnectionManager } from '../../src/connection-manager/index.js' import { createLibp2p } from '../../src/index.js' -import { createPeers } from '../fixtures/create-peers.js' import { getComponent } from '../fixtures/get-component.js' -import type { Echo } from '@libp2p/echo' -import type { ConnectionGater, PeerId, PeerStore, Libp2p, Connection, PeerRouting, MultiaddrConnection } from '@libp2p/interface' -import type { TransportManager } from '@libp2p/interface-internal' +import { createDefaultConnectionManagerComponents, type StubbedDefaultConnectionManagerComponents } from './utils.js' +import type { Libp2p, Connection, MultiaddrConnection } from '@libp2p/interface' const defaultOptions = { maxConnections: 10, inboundUpgradeTimeout: 10000 } -function createDefaultComponents (peerId: PeerId): DefaultConnectionManagerComponents { - return { - peerId, - peerStore: stubInterface({ - all: async () => [] - }), - peerRouting: stubInterface(), - transportManager: stubInterface(), - connectionGater: stubInterface(), - events: new TypedEventEmitter(), - logger: defaultLogger() - } -} - describe('Connection Manager', () => { let libp2p: Libp2p let connectionManager: DefaultConnectionManager + let components: StubbedDefaultConnectionManagerComponents beforeEach(async () => { libp2p = await createLibp2p() + components = await createDefaultConnectionManagerComponents() }) afterEach(async () => { @@ -91,7 +74,7 @@ describe('Connection Manager', () => { it('should deny connections from denylist multiaddrs', async () => { const remoteAddr = multiaddr('/ip4/83.13.55.32/tcp/59283') - connectionManager = new DefaultConnectionManager(createDefaultComponents(libp2p.peerId), { + connectionManager = new DefaultConnectionManager(components, { ...defaultOptions, deny: [ '/ip4/83.13.55.32' @@ -108,7 +91,7 @@ describe('Connection Manager', () => { }) it('should deny connections when maxConnections is exceeded', async () => { - connectionManager = new DefaultConnectionManager(createDefaultComponents(libp2p.peerId), { + connectionManager = new DefaultConnectionManager(components, { ...defaultOptions, maxConnections: 1 }) @@ -133,7 +116,7 @@ describe('Connection Manager', () => { }) it('should deny connections from peers that connect too frequently', async () => { - connectionManager = new DefaultConnectionManager(createDefaultComponents(libp2p.peerId), { + connectionManager = new DefaultConnectionManager(components, { ...defaultOptions, inboundConnectionThreshold: 1 }) @@ -160,7 +143,7 @@ describe('Connection Manager', () => { it('should allow connections from allowlist multiaddrs', async () => { const remoteAddr = multiaddr('/ip4/83.13.55.32/tcp/59283') - connectionManager = new DefaultConnectionManager(createDefaultComponents(libp2p.peerId), { + connectionManager = new DefaultConnectionManager(components, { ...defaultOptions, maxConnections: 1, allow: [ @@ -188,7 +171,7 @@ describe('Connection Manager', () => { }) it('should limit the number of inbound pending connections', async () => { - connectionManager = new DefaultConnectionManager(createDefaultComponents(libp2p.peerId), { + connectionManager = new DefaultConnectionManager(components, { ...defaultOptions, maxIncomingPendingConnections: 1 }) @@ -225,7 +208,7 @@ describe('Connection Manager', () => { }) it('should allow dialing peers when an existing limited connection exists', async () => { - connectionManager = new DefaultConnectionManager(createDefaultComponents(libp2p.peerId), { + connectionManager = new DefaultConnectionManager(components, { ...defaultOptions, maxIncomingPendingConnections: 1 }) @@ -262,72 +245,45 @@ describe('Connection Manager', () => { expect(conn).to.equal(newConnection) }) -}) - -describe('Connection Manager', () => { - let peerIds: PeerId[] - - before(async () => { - peerIds = await Promise.all([ - peerIdFromPrivateKey(await generateKeyPair('Ed25519')), - peerIdFromPrivateKey(await generateKeyPair('Ed25519')) - ]) - }) it('should filter connections on disconnect, removing the closed one', async () => { - const peerStore = stubInterface() - const components = defaultComponents({ - peerId: peerIds[0], - peerStore, - transportManager: stubInterface(), - connectionGater: stubInterface(), - events: new TypedEventEmitter() - }) - const connectionManager = new DefaultConnectionManager(components, { + connectionManager = new DefaultConnectionManager(components, { maxConnections: 1000, inboundUpgradeTimeout: 1000 }) await start(connectionManager) + const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + const conn1 = stubInterface({ remoteAddr: multiaddr('/ip4/34.4.63.125/tcp/4001'), - remotePeer: peerIds[1], + remotePeer, status: 'open' }) const conn2 = stubInterface({ remoteAddr: multiaddr('/ip4/34.4.63.126/tcp/4001'), - remotePeer: peerIds[1], + remotePeer, status: 'open' }) - expect(connectionManager.getConnections(peerIds[1])).to.have.lengthOf(0) + expect(connectionManager.getConnections(remotePeer)).to.have.lengthOf(0) // Add connection to the connectionManager components.events.safeDispatchEvent('connection:open', { detail: conn1 }) components.events.safeDispatchEvent('connection:open', { detail: conn2 }) - expect(connectionManager.getConnections(peerIds[1])).to.have.lengthOf(2) + expect(connectionManager.getConnections(remotePeer)).to.have.lengthOf(2) conn2.status = 'closed' components.events.safeDispatchEvent('connection:close', { detail: conn2 }) - expect(connectionManager.getConnections(peerIds[1])).to.have.lengthOf(1) + expect(connectionManager.getConnections(remotePeer)).to.have.lengthOf(1) expect(conn1.close.called).to.be.false() - - await connectionManager.stop() }) it('should close connections on stop', async () => { - const peerStore = stubInterface() - const components = defaultComponents({ - peerId: peerIds[0], - peerStore, - transportManager: stubInterface(), - connectionGater: stubInterface(), - events: new TypedEventEmitter() - }) const connectionManager = new DefaultConnectionManager(components, { maxConnections: 1000, inboundUpgradeTimeout: 1000 @@ -335,14 +291,16 @@ describe('Connection Manager', () => { await start(connectionManager) + const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + const conn1 = stubInterface({ remoteAddr: multiaddr('/ip4/34.4.63.125/tcp/4001'), - remotePeer: peerIds[1], + remotePeer, status: 'open' }) const conn2 = stubInterface({ remoteAddr: multiaddr('/ip4/34.4.63.126/tcp/4001'), - remotePeer: peerIds[1], + remotePeer, status: 'open' }) @@ -350,77 +308,13 @@ describe('Connection Manager', () => { components.events.safeDispatchEvent('connection:open', { detail: conn1 }) components.events.safeDispatchEvent('connection:open', { detail: conn2 }) - expect(connectionManager.getConnections(peerIds[1])).to.have.lengthOf(2) + expect(connectionManager.getConnections(remotePeer)).to.have.lengthOf(2) await connectionManager.stop() expect(conn1.close.called).to.be.true() expect(conn2.close.called).to.be.true() - expect(connectionManager.getConnections(peerIds[1])).to.have.lengthOf(0) - }) -}) - -describe('libp2p.connections', () => { - let dialer: Libp2p<{ echo: Echo }> - let listener: Libp2p<{ echo: Echo }> - - afterEach(async () => { - await stop(dialer, listener) - }) - - it('libp2p.getConnections gets the connectionManager conns', async () => { - ({ dialer, listener } = await createPeers()) - - const conn = await dialer.dial(listener.getMultiaddrs()) - - expect(conn).to.be.ok() - expect(dialer.getConnections()).to.have.lengthOf(1) - }) - - it('should be closed status after stopping', async () => { - ({ dialer, listener } = await createPeers()) - - const conn = await dialer.dial(listener.getMultiaddrs()) - - await dialer.stop() - expect(conn.status).to.eql('closed') - }) - - it('should open multiple connections when forced', async () => { - ({ dialer, listener } = await createPeers()) - - // connect once, should have one connection - await dialer.dial(listener.getMultiaddrs()) - expect(dialer.getConnections()).to.have.lengthOf(1) - - // connect twice, should still only have one connection - await dialer.dial(listener.getMultiaddrs()) - expect(dialer.getConnections()).to.have.lengthOf(1) - - // force connection, should have two connections now - await dialer.dial(listener.getMultiaddrs(), { - force: true - }) - expect(dialer.getConnections()).to.have.lengthOf(2) - }) - - it('should use custom DNS resolver', async () => { - const resolver = sinon.stub() - - ;({ dialer, listener } = await createPeers({ - dns: dns({ - resolvers: { - '.': resolver - } - }) - })) - - const ma = multiaddr('/dnsaddr/example.com/tcp/12345') - const err = new Error('Could not resolve') - - resolver.withArgs('_dnsaddr.example.com').rejects(err) - - await expect(dialer.dial(ma)).to.eventually.be.rejectedWith(err) + expect(connectionManager.getConnections(remotePeer)).to.have.lengthOf(0) }) }) diff --git a/packages/libp2p/test/connection-manager/resolver.spec.ts b/packages/libp2p/test/connection-manager/resolver.spec.ts deleted file mode 100644 index a03d2df2ae..0000000000 --- a/packages/libp2p/test/connection-manager/resolver.spec.ts +++ /dev/null @@ -1,92 +0,0 @@ -/* eslint-env mocha */ - -import { yamux } from '@chainsafe/libp2p-yamux' -import { stop } from '@libp2p/interface' -import { memory } from '@libp2p/memory' -import { mplex } from '@libp2p/mplex' -import { plaintext } from '@libp2p/plaintext' -import { multiaddr } from '@multiformats/multiaddr' -import { expect } from 'aegir/chai' -import sinon from 'sinon' -import { createLibp2p } from '../../src/index.js' -import type { Libp2p } from '@libp2p/interface' -import type { Multiaddr } from '@multiformats/multiaddr' - -describe('resolver', () => { - let dialer: Libp2p - let listener: Libp2p - let resolver: sinon.SinonStub<[Multiaddr], Promise> - - beforeEach(async () => { - resolver = sinon.stub<[Multiaddr], Promise>(); - - [dialer, listener] = await Promise.all([ - createLibp2p({ - transports: [ - memory() - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionManager: { - resolvers: { - dnsaddr: resolver - } - }, - connectionEncrypters: [ - plaintext() - ] - }), - createLibp2p({ - addresses: { - listen: ['/memory/location'] - }, - transports: [ - memory() - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionManager: { - resolvers: { - dnsaddr: resolver - } - }, - connectionEncrypters: [ - plaintext() - ] - }) - ]) - }) - - afterEach(async () => { - sinon.restore() - - await stop(dialer, listener) - }) - - it('should use the dnsaddr resolver to resolve a dnsaddr address', async () => { - const dialAddr = multiaddr(`/dnsaddr/remote.libp2p.io/p2p/${listener.peerId}`) - - // resolver stub - resolver.withArgs(dialAddr).resolves(listener.getMultiaddrs().map(ma => ma.toString())) - - // dial with resolved address - const connection = await dialer.dial(dialAddr) - expect(connection).to.exist() - expect(connection.remoteAddr.equals(listener.getMultiaddrs()[0])) - }) - - it('fails to dial if resolve fails and there are no addresses to dial', async () => { - const dialAddr = multiaddr(`/dnsaddr/remote.libp2p.io/p2p/${listener.peerId}`) - const err = new Error() - - // Stub resolver - resolver.rejects(err) - - await expect(dialer.dial(dialAddr)) - .to.eventually.be.rejectedWith(err) - }) -}) diff --git a/packages/libp2p/test/connection-manager/utils.ts b/packages/libp2p/test/connection-manager/utils.ts new file mode 100644 index 0000000000..838b97ee90 --- /dev/null +++ b/packages/libp2p/test/connection-manager/utils.ts @@ -0,0 +1,35 @@ +/* eslint-env mocha */ + +import { generateKeyPair } from '@libp2p/crypto/keys' +import { TypedEventEmitter } from '@libp2p/interface' +import { defaultLogger } from '@libp2p/logger' +import { peerIdFromPrivateKey } from '@libp2p/peer-id' +import { stubInterface, type StubbedInstance } from 'sinon-ts' +import type { DefaultConnectionManagerComponents } from '../../src/connection-manager/index.js' +import type { ConnectionGater, PeerId, PeerStore, PeerRouting, TypedEventTarget, Libp2pEvents, ComponentLogger } from '@libp2p/interface' +import type { TransportManager } from '@libp2p/interface-internal' + +export interface StubbedDefaultConnectionManagerComponents { + peerId: PeerId + peerStore: StubbedInstance + peerRouting: StubbedInstance + transportManager: StubbedInstance + connectionGater: StubbedInstance + events: TypedEventTarget + logger: ComponentLogger +} + +export async function createDefaultConnectionManagerComponents (options?: Partial): Promise { + return { + peerId: peerIdFromPrivateKey(await generateKeyPair('Ed25519')), + peerStore: stubInterface({ + all: async () => [] + }), + peerRouting: stubInterface(), + transportManager: stubInterface(), + connectionGater: stubInterface(), + events: new TypedEventEmitter(), + logger: defaultLogger(), + ...options + } as unknown as any +} diff --git a/packages/libp2p/test/core/core.spec.ts b/packages/libp2p/test/core/core.spec.ts index 1674efcc45..0e8ab1a1ff 100644 --- a/packages/libp2p/test/core/core.spec.ts +++ b/packages/libp2p/test/core/core.spec.ts @@ -1,6 +1,5 @@ /* eslint-env mocha */ -import { memory } from '@libp2p/memory' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import { stubInterface } from 'sinon-ts' @@ -28,18 +27,6 @@ describe('core', () => { await expect(libp2p.isDialable(ma)).to.eventually.be.false() }) - it('should say an address is dialable if a transport is configured', async () => { - libp2p = await createLibp2p({ - transports: [ - memory() - ] - }) - - const ma = multiaddr('/memory/address-1') - - await expect(libp2p.isDialable(ma)).to.eventually.be.true() - }) - it('should test if a protocol can run over a limited connection', async () => { libp2p = await createLibp2p({ transports: [ diff --git a/packages/libp2p/test/core/listening.spec.ts b/packages/libp2p/test/core/listening.spec.ts deleted file mode 100644 index e7976d7499..0000000000 --- a/packages/libp2p/test/core/listening.spec.ts +++ /dev/null @@ -1,44 +0,0 @@ -/* eslint-env mocha */ - -import { stop } from '@libp2p/interface' -import { memory } from '@libp2p/memory' -import { plaintext } from '@libp2p/plaintext' -import { expect } from 'aegir/chai' -import { createLibp2p } from '../../src/index.js' -import type { Libp2p } from '@libp2p/interface' - -describe('Listening', () => { - let libp2p: Libp2p - - after(async () => { - await stop(libp2p) - }) - - it('should replace wildcard host and port with actual host and port on startup', async () => { - const listenAddress = '/memory/address-1' - - libp2p = await createLibp2p({ - addresses: { - listen: [ - listenAddress - ] - }, - transports: [ - memory() - ], - connectionEncrypters: [ - plaintext() - ] - }) - - await libp2p.start() - - // @ts-expect-error components field is private - const addrs = libp2p.components.transportManager.getAddrs() - - // Should get something like: - // /memory/address-1 - expect(addrs).to.have.lengthOf(1) - expect(addrs[0].toString()).to.equal(listenAddress) - }) -}) diff --git a/packages/libp2p/test/fixtures/create-peers.ts b/packages/libp2p/test/fixtures/create-peers.ts deleted file mode 100644 index aecf65dd11..0000000000 --- a/packages/libp2p/test/fixtures/create-peers.ts +++ /dev/null @@ -1,68 +0,0 @@ -/* eslint-env mocha */ - -import { echo } from '@libp2p/echo' -import { memory } from '@libp2p/memory' -import { mplex } from '@libp2p/mplex' -import { plaintext } from '@libp2p/plaintext' -import { stubInterface } from 'sinon-ts' -import { createLibp2p } from '../../src/index.js' -import type { Components } from '../../src/components.js' -import type { Libp2pOptions } from '../../src/index.js' -import type { Echo } from '@libp2p/echo' -import type { Libp2p } from '@libp2p/interface' - -async function createNode (config: Partial> = {}): Promise<{ node: Libp2p<{ echo: Echo }>, components: Components }> { - let components: Components = stubInterface() - - const node = await createLibp2p({ - transports: [ - memory() - ], - connectionEncrypters: [ - plaintext() - ], - streamMuxers: [ - mplex() - ], - ...config, - services: { - echo: echo(), - components: (c) => { - components = c - } - } - }) - - return { - node, - components - } -} - -interface DialerAndListener { - dialer: Libp2p<{ echo: Echo }> - dialerComponents: Components - - listener: Libp2p<{ echo: Echo }> - listenerComponents: Components -} - -export async function createPeers (dialerConfig: Partial> = {}, listenerConfig: Partial> = {}): Promise { - const { node: dialer, components: dialerComponents } = await createNode(dialerConfig) - const { node: listener, components: listenerComponents } = await createNode({ - ...listenerConfig, - addresses: { - listen: [ - '/memory/address-1' - ] - } - }) - - return { - dialer, - dialerComponents, - - listener, - listenerComponents - } -} diff --git a/packages/libp2p/test/fixtures/slow-muxer.ts b/packages/libp2p/test/fixtures/slow-muxer.ts deleted file mode 100644 index 03279ca32e..0000000000 --- a/packages/libp2p/test/fixtures/slow-muxer.ts +++ /dev/null @@ -1,29 +0,0 @@ -/* eslint-env mocha */ - -import { mplex } from '@libp2p/mplex' -import delay from 'delay' -import map from 'it-map' -import type { Components } from '../../src/components.js' -import type { StreamMuxerFactory } from '@libp2p/interface' - -/** - * Creates a muxer with a delay between each sent packet - */ -export function slowMuxer (packetDelay: number): ((components: Components) => StreamMuxerFactory) { - return (components) => { - const muxerFactory = mplex()(components) - const originalCreateStreamMuxer = muxerFactory.createStreamMuxer.bind(muxerFactory) - - muxerFactory.createStreamMuxer = (init) => { - const muxer = originalCreateStreamMuxer(init) - muxer.source = map(muxer.source, async (buf) => { - await delay(packetDelay) - return buf - }) - - return muxer - } - - return muxerFactory - } -} diff --git a/packages/libp2p/test/transports/transport-manager.spec.ts b/packages/libp2p/test/transports/transport-manager.spec.ts index 277502bb35..51884a583a 100644 --- a/packages/libp2p/test/transports/transport-manager.spec.ts +++ b/packages/libp2p/test/transports/transport-manager.spec.ts @@ -3,11 +3,9 @@ import { generateKeyPair } from '@libp2p/crypto/keys' import { TypedEventEmitter, start, stop, FaultTolerance } from '@libp2p/interface' import { defaultLogger } from '@libp2p/logger' -import { memory } from '@libp2p/memory' import { peerIdFromPrivateKey } from '@libp2p/peer-id' import { persistentPeerStore } from '@libp2p/peer-store' -import { plaintext } from '@libp2p/plaintext' -import { multiaddr } from '@multiformats/multiaddr' +import { multiaddr, type Multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import { MemoryDatastore } from 'datastore-core' import { pEvent } from 'p-event' @@ -15,20 +13,21 @@ import pWaitFor from 'p-wait-for' import sinon from 'sinon' import { stubInterface } from 'sinon-ts' import { DefaultAddressManager } from '../../src/address-manager/index.js' -import { createLibp2p } from '../../src/index.js' import { DefaultTransportManager } from '../../src/transport-manager.js' import type { Components } from '../../src/components.js' -import type { Connection, Libp2p, Upgrader } from '@libp2p/interface' +import type { Connection, Transport, Upgrader, Listener } from '@libp2p/interface' const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') const addrs = [ multiaddr('/memory/address-1'), multiaddr('/memory/address-2') ] +const testTransportTag = 'test-transport' describe('Transport Manager', () => { let tm: DefaultTransportManager let components: Components + let transport: Transport beforeEach(async () => { const events = new TypedEventEmitter() @@ -49,6 +48,39 @@ describe('Transport Manager', () => { faultTolerance: FaultTolerance.NO_FATAL }) await start(tm) + + transport = stubInterface({ + dial: async () => stubInterface(), + dialFilter: (addrs) => { + return addrs.filter(ma => ma.toString().startsWith('/memory')) + }, + listenFilter: (addrs) => { + return addrs.filter(ma => ma.toString().startsWith('/memory')) + }, + createListener: () => { + let addr: Multiaddr | undefined + const closeListeners: Array<() => void> = [] + + return stubInterface({ + listen: async (a) => { + addr = a + }, + getAddrs: () => addr != null ? [addr] : [], + close: async () => { + addr = undefined + closeListeners.forEach(fn => { + fn() + }) + }, + addEventListener: (event, handler: any) => { + if (event === 'close') { + closeListeners.push(handler) + } + } + }) + } + }) + transport[Symbol.toStringTag] = testTransportTag }) afterEach(async () => { @@ -58,28 +90,26 @@ describe('Transport Manager', () => { }) it('should be able to add and remove a transport', async () => { - const transport = memory() - expect(tm.getTransports()).to.have.lengthOf(0) - tm.add(transport(components)) + tm.add(transport) expect(tm.getTransports()).to.have.lengthOf(1) - await tm.remove('@libp2p/memory') + await tm.remove(testTransportTag) expect(tm.getTransports()).to.have.lengthOf(0) }) it('should not be able to add a transport twice', async () => { - tm.add(memory()(components)) + tm.add(transport) expect(() => { - tm.add(memory()(components)) + tm.add(transport) }) .to.throw() .and.to.have.property('name', 'InvalidParametersError') }) it('should fail to dial an unsupported address', async () => { - tm.add(memory()(components)) + tm.add(transport) const addr = multiaddr('/ip4/127.0.0.1/tcp/0') await expect(tm.dial(addr)) .to.eventually.be.rejected() @@ -88,7 +118,7 @@ describe('Transport Manager', () => { it('should fail to listen with no valid address', async () => { tm = new DefaultTransportManager(components) - tm.add(memory()(components)) + tm.add(transport) await expect(start(tm)) .to.eventually.be.rejected() @@ -99,15 +129,13 @@ describe('Transport Manager', () => { it('should be able to add and remove a transport', async () => { expect(tm.getTransports()).to.have.lengthOf(0) - tm.add(memory()(components)) + tm.add(transport) expect(tm.getTransports()).to.have.lengthOf(1) - await tm.remove('@libp2p/memory') + await tm.remove(testTransportTag) expect(tm.getTransports()).to.have.lengthOf(0) }) it('should be able to listen', async () => { - const transport = memory()(components) - expect(tm.getTransports()).to.be.empty() tm.add(transport) @@ -117,14 +145,13 @@ describe('Transport Manager', () => { const spyListener = sinon.spy(transport, 'createListener') await tm.listen(addrs) - // Ephemeral ip addresses may result in multiple listeners expect(tm.getAddrs().length).to.equal(addrs.length) await tm.stop() expect(spyListener.called).to.be.true() }) it('should be able to dial', async () => { - tm.add(memory()(components)) + tm.add(transport) await tm.listen(addrs) const addr = tm.getAddrs().shift() @@ -138,7 +165,6 @@ describe('Transport Manager', () => { }) it('should remove listeners when they stop listening', async () => { - const transport = memory()(components) tm.add(transport) expect(tm.getListeners()).to.have.lengthOf(0) @@ -172,65 +198,3 @@ describe('Transport Manager', () => { await tm.stop() }) }) - -describe('libp2p.transportManager (dial only)', () => { - let libp2p: Libp2p - - afterEach(async () => { - await stop(libp2p) - }) - - it('fails to start if multiaddr fails to listen', async () => { - libp2p = await createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, - transports: [memory()], - connectionEncrypters: [plaintext()], - start: false - }) - - await expect(libp2p.start()).to.eventually.be.rejected - .with.property('name', 'NoValidAddressesError') - }) - - it('does not fail to start if provided listen multiaddr are not compatible to configured transports (when supporting dial only mode)', async () => { - libp2p = await createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0'] - }, - transportManager: { - faultTolerance: FaultTolerance.NO_FATAL - }, - transports: [ - memory() - ], - connectionEncrypters: [ - plaintext() - ], - start: false - }) - - await expect(libp2p.start()).to.eventually.be.undefined() - }) - - it('does not fail to start if provided listen multiaddr fail to listen on configured transports (when supporting dial only mode)', async () => { - libp2p = await createLibp2p({ - addresses: { - listen: ['/ip4/127.0.0.1/tcp/12345/p2p/QmWDn2LY8nannvSWJzruUYoLZ4vV83vfCBwd8DipvdgQc3/p2p-circuit'] - }, - transportManager: { - faultTolerance: FaultTolerance.NO_FATAL - }, - transports: [ - memory() - ], - connectionEncrypters: [ - plaintext() - ], - start: false - }) - - await expect(libp2p.start()).to.eventually.be.undefined() - }) -}) diff --git a/packages/libp2p/test/upgrading/upgrader.spec.ts b/packages/libp2p/test/upgrading/upgrader.spec.ts index b2aae00528..47c0002017 100644 --- a/packages/libp2p/test/upgrading/upgrader.spec.ts +++ b/packages/libp2p/test/upgrading/upgrader.spec.ts @@ -1,416 +1,590 @@ /* eslint-env mocha */ -import { stop } from '@libp2p/interface' -import { memory } from '@libp2p/memory' +import { generateKeyPair } from '@libp2p/crypto/keys' +import { logger } from '@libp2p/logger' +import { peerIdFromPrivateKey } from '@libp2p/peer-id' +import { multiaddr, type Multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import delay from 'delay' import drain from 'it-drain' -import pDefer from 'p-defer' +import { encode } from 'it-length-prefixed' +import map from 'it-map' import Sinon from 'sinon' import { stubInterface } from 'sinon-ts' -import { createPeers } from '../fixtures/create-peers.js' -import { slowMuxer } from '../fixtures/slow-muxer.js' -import type { Components } from '../../src/components.js' -import type { Echo } from '@libp2p/echo' -import type { Libp2p, ConnectionProtector, ConnectionEncrypter, SecuredConnection, StreamMuxerFactory } from '@libp2p/interface' +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { DefaultUpgrader, type UpgraderInit } from '../../src/upgrader.js' +import { createDefaultUpgraderComponents } from './utils.js' +import type { ConnectionEncrypter, StreamMuxerFactory, MultiaddrConnection, StreamMuxer, ConnectionProtector, PeerId, SecuredConnection, Stream, StreamMuxerInit } from '@libp2p/interface' +import type { ConnectionManager, Registrar } from '@libp2p/interface-internal' describe('upgrader', () => { - let dialer: Libp2p<{ echo: Echo }> - let listener: Libp2p<{ echo: Echo }> - let dialerComponents: Components - let listenerComponents: Components + let init: UpgraderInit + const encrypterProtocol = '/test-encrypter' + const muxerProtocol = '/test-muxer' + let remotePeer: PeerId + let remoteAddr: Multiaddr + let maConn: MultiaddrConnection + + class BoomCrypto implements ConnectionEncrypter { + static protocol = encrypterProtocol + public protocol = encrypterProtocol + async secureInbound (): Promise { throw new Error('Boom') } + async secureOutbound (): Promise { throw new Error('Boom') } + } + + beforeEach(async () => { + remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + remoteAddr = multiaddr(`/ip4/123.123.123.123/tcp/1234/p2p/${remotePeer}`) + + init = { + connectionEncrypters: [ + stubInterface({ + protocol: encrypterProtocol, + secureOutbound: async (connection) => ({ + conn: connection, + remotePeer + }), + secureInbound: async (connection) => ({ + conn: connection, + remotePeer + }) + }) + ], + streamMuxers: [ + stubInterface({ + protocol: muxerProtocol, + createStreamMuxer: () => stubInterface({ + protocol: muxerProtocol, + sink: async (source) => drain(source), + source: (async function * () {})() + }) + }) + ] + } - afterEach(async () => { - await stop(dialer, listener) + maConn = stubInterface({ + remoteAddr, + log: logger('test'), + sink: async (source) => drain(source), + source: map((async function * () { + yield '/multistream/1.0.0\n' + yield `${encrypterProtocol}\n` + yield `${muxerProtocol}\n` + })(), str => encode.single(uint8ArrayFromString(str))) + }) }) - it('should upgrade with valid muxers and crypto', async () => { - ({ dialer, listener } = await createPeers()) - - const input = Uint8Array.from([0, 1, 2, 3, 4]) - const output = await dialer.services.echo.echo(listener.getMultiaddrs(), input) - expect(output).to.equalBytes(input) + it('should upgrade outbound with valid muxers and crypto', async () => { + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents(), init) + const conn = await upgrader.upgradeOutbound(maConn) + expect(conn.encryption).to.equal(encrypterProtocol) + expect(conn.multiplexer).to.equal(muxerProtocol) }) - it('should upgrade with only crypto', async () => { - ({ dialer, listener } = await createPeers({ streamMuxers: [] }, { streamMuxers: [] })) + it('should upgrade outbound with only crypto', async () => { + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents(), { + ...init, + streamMuxers: [] + }) - const connection = await dialer.dial(listener.getMultiaddrs()) + const connection = await upgrader.upgradeOutbound(maConn) await expect(connection.newStream('/echo/1.0.0')).to.eventually.be.rejected .with.property('name', 'MuxerUnavailableError') }) - it('should use a private connection protector when provided', async () => { - const protector = stubInterface() - protector.protect.callsFake(async (conn) => conn) - const connectionProtector = (): ConnectionProtector => protector + it('should use a private connection protector when provided for inbound connections', async () => { + const connectionProtector = stubInterface() + connectionProtector.protect.callsFake(async (conn) => conn) - ;({ dialer, listener } = await createPeers({ connectionProtector }, { connectionProtector })) + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents({ + connectionProtector + }), init) - const input = Uint8Array.from([0, 1, 2, 3, 4]) - const output = await dialer.services.echo.echo(listener.getMultiaddrs(), input) - expect(output).to.equalBytes(input) + await upgrader.upgradeInbound(maConn) - expect(protector.protect.callCount).to.equal(2) + expect(connectionProtector.protect.callCount).to.equal(1) }) - it('should fail if crypto fails', async () => { - class BoomCrypto implements ConnectionEncrypter { - static protocol = '/unstable' - public protocol = '/unstable' - async secureInbound (): Promise { throw new Error('Boom') } - async secureOutbound (): Promise { throw new Error('Boom') } - } + it('should use a private connection protector when provided for outbound connections', async () => { + const connectionProtector = stubInterface() + connectionProtector.protect.callsFake(async (conn) => conn) + + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents({ + connectionProtector + }), init) + + await upgrader.upgradeOutbound(maConn) - ({ dialer, dialerComponents, listener, listenerComponents } = await createPeers({ + expect(connectionProtector.protect.callCount).to.equal(1) + }) + + it('should fail inbound if crypto fails', async () => { + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents(), { + ...init, connectionEncrypters: [ - () => new BoomCrypto() + new BoomCrypto() ] - }, { + }) + + await expect(upgrader.upgradeInbound(maConn)).to.eventually.be.rejected + .with.property('name', 'EncryptionFailedError') + }) + + it('should fail outbound if crypto fails', async () => { + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents(), { + ...init, connectionEncrypters: [ - () => new BoomCrypto() + new BoomCrypto() ] - })) - - const dialerUpgraderUpgradeOutboundSpy = Sinon.spy(dialerComponents.upgrader, 'upgradeOutbound') - const listenerUpgraderUpgradeInboundSpy = Sinon.spy(listenerComponents.upgrader, 'upgradeInbound') + }) - await expect(dialer.dial(listener.getMultiaddrs())).to.eventually.be.rejected + await expect(upgrader.upgradeOutbound(maConn)).to.eventually.be.rejected .with.property('name', 'EncryptionFailedError') + }) - // Ensure both sides fail - await expect(dialerUpgraderUpgradeOutboundSpy.getCall(0).returnValue).to.eventually.be.rejected - .with.property('name', 'EncryptionFailedError') - await expect(listenerUpgraderUpgradeInboundSpy.getCall(0).returnValue).to.eventually.be.rejected - .with.property('name', 'EncryptionFailedError') + it('should abort if inbound upgrade is slow', async () => { + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents(), { + ...init, + inboundUpgradeTimeout: 100 + }) + + maConn.source = map(maConn.source, async (buf) => { + await delay(2000) + return buf + }) + + await expect(upgrader.upgradeOutbound(maConn)).to.eventually.be.rejected + .with.property('message').that.include('aborted') }) - it('should clear timeout if upgrade is successful', async () => { - ({ dialer, dialerComponents, listener, listenerComponents } = await createPeers({ - connectionManager: { - inboundUpgradeTimeout: 100 - } - }, { - connectionManager: { - inboundUpgradeTimeout: 100 - } - })) + it('should abort if outbound upgrade is slow', async () => { + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents(), { + ...init, + outboundUpgradeTimeout: 100 + }) - await dialer.dial(listener.getMultiaddrs()) + maConn.source = map(maConn.source, async (buf) => { + await delay(2000) + return buf + }) - await delay(1000) + await expect(upgrader.upgradeOutbound(maConn)).to.eventually.be.rejected + .with.property('message').that.include('aborted') + }) - // connections should still be open after timeout - expect(dialer.getConnections(listener.peerId)).to.have.lengthOf(1) - expect(listener.getConnections(dialer.peerId)).to.have.lengthOf(1) + it('should abort by signal if inbound upgrade is slow', async () => { + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents(), { + ...init, + inboundUpgradeTimeout: 10000 + }) + + maConn.source = map(maConn.source, async (buf) => { + await delay(2000) + return buf + }) + + await expect(upgrader.upgradeOutbound(maConn, { + signal: AbortSignal.timeout(100) + })).to.eventually.be.rejected + .with.property('message').that.include('aborted') }) - it('should not abort if upgrade is successful', async () => { - ({ dialer, dialerComponents, listener, listenerComponents } = await createPeers({ - connectionManager: { - inboundUpgradeTimeout: 10000 - } - }, { - connectionManager: { - inboundUpgradeTimeout: 10000 - } - })) + it('should abort by signal if outbound upgrade is slow', async () => { + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents(), { + ...init, + outboundUpgradeTimeout: 10000 + }) + + maConn.source = map(maConn.source, async (buf) => { + await delay(2000) + return buf + }) + + await expect(upgrader.upgradeOutbound(maConn, { + signal: AbortSignal.timeout(100) + })).to.eventually.be.rejected + .with.property('message').that.include('aborted') + }) - await dialer.dial(listener.getMultiaddrs(), { - signal: AbortSignal.timeout(500) + it('should not abort if inbound upgrade is successful', async () => { + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents(), { + ...init, + inboundUpgradeTimeout: 100 }) + const conn = await upgrader.upgradeInbound(maConn) await delay(1000) // connections should still be open after timeout - expect(dialer.getConnections(listener.peerId)).to.have.lengthOf(1) - expect(listener.getConnections(dialer.peerId)).to.have.lengthOf(1) + expect(conn.status).to.equal('open') }) - it('should fail if muxers do not match', async () => { - ({ dialer, dialerComponents, listener, listenerComponents } = await createPeers({ - streamMuxers: [ - () => stubInterface({ - protocol: '/acme-muxer' - }) - ] - }, { - streamMuxers: [ - () => stubInterface({ - protocol: '/example-muxer' - }) - ] - })) - - const dialerUpgraderUpgradeOutboundSpy = Sinon.spy(dialerComponents.upgrader, 'upgradeOutbound') - const listenerUpgraderUpgradeInboundSpy = Sinon.spy(listenerComponents.upgrader, 'upgradeInbound') + it('should not abort if outbound upgrade is successful', async () => { + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents(), { + ...init, + inboundUpgradeTimeout: 100 + }) + const conn = await upgrader.upgradeOutbound(maConn) - await expect(dialer.dial(listener.getMultiaddrs())).to.eventually.be.rejected - .with.property('name', 'MuxerUnavailableError') + await delay(1000) - // Ensure both sides fail - await expect(dialerUpgraderUpgradeOutboundSpy.getCall(0).returnValue).to.eventually.be.rejected - .with.property('name', 'MuxerUnavailableError') - await expect(listenerUpgraderUpgradeInboundSpy.getCall(0).returnValue).to.eventually.be.rejected - .with.property('name', 'MuxerUnavailableError') + // connections should still be open after timeout + expect(conn.status).to.equal('open') }) - it('should emit connection events', async () => { - ({ dialer, dialerComponents, listener, listenerComponents } = await createPeers()) - - const localConnectionEventReceived = pDefer() - const localConnectionEndEventReceived = pDefer() - const localPeerConnectEventReceived = pDefer() - const localPeerDisconnectEventReceived = pDefer() - const remoteConnectionEventReceived = pDefer() - const remoteConnectionEndEventReceived = pDefer() - const remotePeerConnectEventReceived = pDefer() - const remotePeerDisconnectEventReceived = pDefer() - - dialerComponents.events.addEventListener('connection:open', (event) => { - expect(event.detail.remotePeer.equals(listener.peerId)).to.be.true() - localConnectionEventReceived.resolve() - }) - dialerComponents.events.addEventListener('connection:close', (event) => { - expect(event.detail.remotePeer.equals(listener.peerId)).to.be.true() - localConnectionEndEventReceived.resolve() - }) - dialerComponents.events.addEventListener('peer:connect', (event) => { - expect(event.detail.equals(listener.peerId)).to.be.true() - localPeerConnectEventReceived.resolve() - }) - dialerComponents.events.addEventListener('peer:disconnect', (event) => { - expect(event.detail.equals(listener.peerId)).to.be.true() - localPeerDisconnectEventReceived.resolve() - }) - - listenerComponents.events.addEventListener('connection:open', (event) => { - expect(event.detail.remotePeer.equals(dialer.peerId)).to.be.true() - remoteConnectionEventReceived.resolve() - }) - listenerComponents.events.addEventListener('connection:close', (event) => { - expect(event.detail.remotePeer.equals(dialer.peerId)).to.be.true() - remoteConnectionEndEventReceived.resolve() - }) - listenerComponents.events.addEventListener('peer:connect', (event) => { - expect(event.detail.equals(dialer.peerId)).to.be.true() - remotePeerConnectEventReceived.resolve() - }) - listenerComponents.events.addEventListener('peer:disconnect', (event) => { - expect(event.detail.equals(dialer.peerId)).to.be.true() - remotePeerDisconnectEventReceived.resolve() - }) - - await dialer.dial(listener.getMultiaddrs()) - - // Verify onConnection is called with the connection - const connections = await Promise.all([ - ...dialer.getConnections(listener.peerId), - ...listener.getConnections(dialer.peerId) - ]) - expect(connections).to.have.lengthOf(2) - - await Promise.all([ - localConnectionEventReceived.promise, - localPeerConnectEventReceived.promise, - remoteConnectionEventReceived.promise, - remotePeerConnectEventReceived.promise - ]) - - // Verify onConnectionEnd is called with the connection - await Promise.all(connections.map(async conn => { await conn.close() })) - - await Promise.all([ - localConnectionEndEventReceived.promise, - localPeerDisconnectEventReceived.promise, - remoteConnectionEndEventReceived.promise, - remotePeerDisconnectEventReceived.promise - ]) - }) + it('should not abort by signal if inbound upgrade is successful', async () => { + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents(), { + ...init, + inboundUpgradeTimeout: 10000 + }) + const conn = await upgrader.upgradeInbound(maConn, { + signal: AbortSignal.timeout(100) + }) + + await delay(1000) - it('should fail to create a stream for an unsupported protocol', async () => { - ({ dialer, listener } = await createPeers()) + // connections should still be open after timeout + expect(conn.status).to.equal('open') + }) - await dialer.dial(listener.getMultiaddrs()) + it('should not abort by signal if outbound upgrade is successful', async () => { + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents(), { + ...init, + inboundUpgradeTimeout: 10000 + }) + const conn = await upgrader.upgradeOutbound(maConn, { + signal: AbortSignal.timeout(100) + }) - const connections = await Promise.all([ - ...dialer.getConnections(listener.peerId), - ...listener.getConnections(dialer.peerId) - ]) - expect(connections).to.have.lengthOf(2) + await delay(1000) - await expect(connections[0].newStream('/unsupported/1.0.0')).to.eventually.be.rejected - .with.property('name', 'UnsupportedProtocolError') - await expect(connections[1].newStream('/unsupported/1.0.0')).to.eventually.be.rejected - .with.property('name', 'UnsupportedProtocolError') + // connections should still be open after timeout + expect(conn.status).to.equal('open') }) - it('should abort protocol selection for slow stream creation', async () => { - ({ dialer, listener } = await createPeers({ + it('should abort protocol selection for slow outbound stream creation', async () => { + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents(), { + ...init, streamMuxers: [ - slowMuxer(1000) + stubInterface({ + protocol: muxerProtocol, + createStreamMuxer: () => stubInterface({ + protocol: muxerProtocol, + sink: async (source) => drain(source), + source: (async function * () {})(), + newStream: () => stubInterface({ + id: 'stream-id', + log: logger('test-stream'), + sink: async (source) => drain(source), + source: (async function * (): any { + await delay(2000) + yield Uint8Array.from([0, 1, 2, 3, 4]) + })() + }) + }) + }) ] - })) - - const connection = await dialer.dial(listener.getMultiaddrs()) + }) + const conn = await upgrader.upgradeOutbound(maConn) - await expect(connection.newStream('/echo/1.0.0', { + await expect(conn.newStream('/echo/1.0.0', { signal: AbortSignal.timeout(100) })).to.eventually.be.rejected .with.property('name', 'AbortError') }) - it('should close streams when protocol negotiation fails', async () => { - ({ dialer, listener } = await createPeers()) + it('should abort stream when protocol negotiation fails on outbound stream', async () => { + let stream: Stream | undefined - await dialer.dial(listener.getMultiaddrs()) - - const connections = await Promise.all([ - ...dialer.getConnections(listener.peerId), - ...listener.getConnections(dialer.peerId) - ]) - expect(connections).to.have.lengthOf(2) - expect(connections[0].streams).to.have.lengthOf(0) - expect(connections[1].streams).to.have.lengthOf(0) + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents(), { + ...init, + streamMuxers: [ + stubInterface({ + protocol: muxerProtocol, + createStreamMuxer: () => stubInterface({ + protocol: muxerProtocol, + sink: async (source) => drain(source), + source: (async function * () { + await delay(2000) + yield Uint8Array.from([0, 1, 2, 3, 4]) + })(), + newStream: () => { + stream = stubInterface({ + id: 'stream-id', + log: logger('test-stream'), + sink: async (source) => drain(source), + source: map((async function * () { + yield '/multistream/1.0.0\n' + yield '/different/protocol\n' + })(), str => encode.single(uint8ArrayFromString(str))) + }) + + return stream + } + }) + }) + ] + }) + const conn = await upgrader.upgradeOutbound(maConn) - await expect(connections[0].newStream('/foo/1.0.0')) + await expect(conn.newStream('/foo/1.0.0')) .to.eventually.be.rejected.with.property('name', 'UnsupportedProtocolError') // wait for remote to close await delay(100) - expect(connections[0].streams).to.have.lengthOf(0) - expect(connections[1].streams).to.have.lengthOf(0) + expect(stream?.abort).to.have.property('called', true) }) - it('should allow skipping encryption and protection', async () => { - const protector = stubInterface() - const encrypter = stubInterface() + it('should allow skipping outbound encryption and protection', async () => { + const connectionProtector = stubInterface() + const connectionEncrypter = stubInterface({ + protocol: encrypterProtocol + }) - ;({ dialer, listener } = await createPeers({ - transports: [ - memory({ - upgraderOptions: { - skipEncryption: true, - skipProtection: true - } - }) - ], + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents({ + connectionProtector + }), { + ...init, connectionEncrypters: [ - () => encrypter - ], - connectionProtector: () => protector - }, { - transports: [ - memory({ - upgraderOptions: { - skipEncryption: true, - skipProtection: true - } + connectionEncrypter + ] + }) + await upgrader.upgradeOutbound(maConn, { + skipEncryption: true, + skipProtection: true, + muxerFactory: stubInterface({ + createStreamMuxer: () => stubInterface({ + protocol: muxerProtocol, + sink: async (source) => drain(source), + source: (async function * () {})() }) - ], - connectionEncrypters: [ - () => encrypter - ], - connectionProtector: () => protector - })) + }) + }) + expect(connectionProtector.protect).to.have.property('called', false) + expect(connectionEncrypter.secureOutbound).to.have.property('called', false) + }) - const input = Uint8Array.from([0, 1, 2, 3, 4]) - const output = await dialer.services.echo.echo(listener.getMultiaddrs(), input) - expect(output).to.equalBytes(input) + it('should allow skipping inbound encryption and protection', async () => { + const connectionProtector = stubInterface() + const connectionEncrypter = stubInterface({ + protocol: encrypterProtocol + }) - expect(encrypter.secureInbound.called).to.be.false('used connection encrypter') - expect(encrypter.secureOutbound.called).to.be.false('used connection encrypter') - expect(protector.protect.called).to.be.false('used connection protector') + const upgrader = new DefaultUpgrader(await createDefaultUpgraderComponents({ + connectionProtector + }), { + ...init, + connectionEncrypters: [ + connectionEncrypter + ] + }) + await upgrader.upgradeInbound(maConn, { + skipEncryption: true, + skipProtection: true, + muxerFactory: stubInterface({ + createStreamMuxer: () => stubInterface({ + protocol: muxerProtocol, + sink: async (source) => drain(source), + source: (async function * () {})() + }) + }) + }) + expect(connectionProtector.protect).to.have.property('called', false) + expect(connectionEncrypter.secureOutbound).to.have.property('called', false) }) it('should not decrement inbound pending connection count if the connection is denied', async () => { - ({ dialer, dialerComponents, listener, listenerComponents } = await createPeers()) - - listenerComponents.connectionManager.acceptIncomingConnection = async () => false - const afterUpgradeInboundSpy = Sinon.spy(listenerComponents.connectionManager, 'afterUpgradeInbound') - - await expect(dialer.dial(listener.getMultiaddrs())).to.eventually.be.rejected - .with.property('message', 'Connection denied') + const components = await createDefaultUpgraderComponents({ + connectionManager: stubInterface({ + acceptIncomingConnection: async () => false + }) + }) + const upgrader = new DefaultUpgrader(components, init) + await expect(upgrader.upgradeInbound(maConn)).to.eventually.be.rejected + .with.property('name', 'ConnectionDeniedError') - expect(afterUpgradeInboundSpy.called).to.be.false() + expect(components.connectionManager.afterUpgradeInbound).to.have.property('called', false) }) it('should limit the number of incoming streams that can be opened using a protocol', async () => { - ({ dialer, listener } = await createPeers()) - - const protocol = '/a-test-protocol/1.0.0' - - await listener.handle(protocol, () => {}, { - maxInboundStreams: 2, - maxOutboundStreams: 2 + const protocol = '/test/protocol' + const maxInboundStreams = 2 + let streamMuxerInit: StreamMuxerInit | undefined + let streamMuxer: StreamMuxer | undefined + const components = await createDefaultUpgraderComponents({ + registrar: stubInterface({ + getHandler: () => ({ + options: { + maxInboundStreams + }, + handler: Sinon.stub() + }), + getProtocols: () => [protocol] + }) + }) + const upgrader = new DefaultUpgrader(components, { + ...init, + streamMuxers: [ + stubInterface({ + protocol: muxerProtocol, + createStreamMuxer: (init) => { + streamMuxerInit = init + streamMuxer = stubInterface({ + protocol: muxerProtocol, + sink: async (source) => drain(source), + source: (async function * () {})(), + streams: [] + }) + return streamMuxer + } + }) + ] }) - const connection = await dialer.dial(listener.getMultiaddrs()) - expect(connection.streams).to.have.lengthOf(0) - - await connection.newStream(protocol) - await connection.newStream(protocol) - - expect(connection.streams).to.have.lengthOf(2) + const conn = await upgrader.upgradeInbound(maConn) + expect(conn.streams).to.have.lengthOf(0) + + for (let i = 0; i < (maxInboundStreams + 1); i++) { + const incomingStream = stubInterface({ + id: `stream-id-${i}`, + log: logger('test-stream'), + direction: 'inbound', + sink: async (source) => drain(source), + source: map((async function * () { + yield '/multistream/1.0.0\n' + yield `${protocol}\n` + })(), str => encode.single(uint8ArrayFromString(str))) + }) + + streamMuxer?.streams.push(incomingStream) + streamMuxerInit?.onIncomingStream?.(incomingStream) + } - const stream = await connection.newStream(protocol) + await delay(100) - await expect(drain(stream.source)).to.eventually.be.rejected() - .with.property('name', 'StreamResetError') + expect(streamMuxer?.streams).to.have.lengthOf(3) + expect(streamMuxer?.streams[0]).to.have.nested.property('abort.called', false) + expect(streamMuxer?.streams[1]).to.have.nested.property('abort.called', false) + expect(streamMuxer?.streams[2]).to.have.nested.property('abort.called', true) }) it('should limit the number of outgoing streams that can be opened using a protocol', async () => { - ({ dialer, listener } = await createPeers()) - - const protocol = '/a-test-protocol/1.0.0' - - await listener.handle(protocol, () => {}, { - maxInboundStreams: 20, - maxOutboundStreams: 20 + const protocol = '/test/protocol' + const maxOutboundStreams = 2 + let streamMuxer: StreamMuxer | undefined + const components = await createDefaultUpgraderComponents({ + registrar: stubInterface({ + getHandler: () => ({ + options: { + maxOutboundStreams + }, + handler: Sinon.stub() + }), + getProtocols: () => [protocol] + }) }) - - await dialer.handle(protocol, () => {}, { - maxInboundStreams: 2, - maxOutboundStreams: 2 + const upgrader = new DefaultUpgrader(components, { + ...init, + streamMuxers: [ + stubInterface({ + protocol: muxerProtocol, + createStreamMuxer: () => { + streamMuxer = stubInterface({ + protocol: muxerProtocol, + sink: async (source) => drain(source), + source: (async function * () {})(), + streams: [], + newStream: () => { + const outgoingStream = stubInterface({ + id: 'stream-id', + log: logger('test-stream'), + direction: 'outbound', + sink: async (source) => drain(source), + source: map((async function * () { + yield '/multistream/1.0.0\n' + yield `${protocol}\n` + })(), str => encode.single(uint8ArrayFromString(str))) + }) + + streamMuxer?.streams.push(outgoingStream) + return outgoingStream + } + }) + return streamMuxer + } + }) + ] }) - const connection = await dialer.dial(listener.getMultiaddrs()) - expect(connection.streams).to.have.lengthOf(0) - - await connection.newStream(protocol) - await connection.newStream(protocol) + const conn = await upgrader.upgradeInbound(maConn) + expect(conn.streams).to.have.lengthOf(0) - expect(connection.streams).to.have.lengthOf(2) + await conn.newStream(protocol) + await conn.newStream(protocol) - await expect(connection.newStream(protocol)).to.eventually.be.rejected() + await expect(conn.newStream(protocol)).to.eventually.be.rejected .with.property('name', 'TooManyOutboundProtocolStreamsError') }) it('should allow overriding the number of outgoing streams that can be opened using a protocol without a handler', async () => { - ({ dialer, listener } = await createPeers()) - - const protocol = '/a-test-protocol/1.0.0' - - await listener.handle(protocol, () => {}, { - maxInboundStreams: 20, - maxOutboundStreams: 20 + const protocol = '/test/protocol' + let streamMuxer: StreamMuxer | undefined + const components = await createDefaultUpgraderComponents({ + registrar: stubInterface({ + getHandler: () => ({ + options: {}, + handler: Sinon.stub() + }), + getProtocols: () => [protocol] + }) + }) + const upgrader = new DefaultUpgrader(components, { + ...init, + streamMuxers: [ + stubInterface({ + protocol: muxerProtocol, + createStreamMuxer: () => { + streamMuxer = stubInterface({ + protocol: muxerProtocol, + sink: async (source) => drain(source), + source: (async function * () {})(), + streams: [], + newStream: () => { + const outgoingStream = stubInterface({ + id: 'stream-id', + log: logger('test-stream'), + direction: 'outbound', + sink: async (source) => drain(source), + source: map((async function * () { + yield '/multistream/1.0.0\n' + yield `${protocol}\n` + })(), str => encode.single(uint8ArrayFromString(str))) + }) + + streamMuxer?.streams.push(outgoingStream) + return outgoingStream + } + }) + return streamMuxer + } + }) + ] }) - const connection = await dialer.dial(listener.getMultiaddrs()) - expect(connection.streams).to.have.lengthOf(0) + const conn = await upgrader.upgradeInbound(maConn) + expect(conn.streams).to.have.lengthOf(0) const opts = { - maxOutboundStreams: 2 + maxOutboundStreams: 3 } - await connection.newStream(protocol, opts) - await connection.newStream(protocol, opts) - - expect(connection.streams).to.have.lengthOf(2) + await conn.newStream(protocol, opts) + await conn.newStream(protocol, opts) + await conn.newStream(protocol, opts) - await expect(connection.newStream(protocol, opts)).to.eventually.be.rejected() + await expect(conn.newStream(protocol, opts)).to.eventually.be.rejected .with.property('name', 'TooManyOutboundProtocolStreamsError') }) }) diff --git a/packages/libp2p/test/upgrading/utils.ts b/packages/libp2p/test/upgrading/utils.ts new file mode 100644 index 0000000000..88b0c23bf9 --- /dev/null +++ b/packages/libp2p/test/upgrading/utils.ts @@ -0,0 +1,39 @@ +/* eslint-env mocha */ + +import { generateKeyPair } from '@libp2p/crypto/keys' +import { TypedEventEmitter } from '@libp2p/interface' +import { defaultLogger } from '@libp2p/logger' +import { peerIdFromPrivateKey } from '@libp2p/peer-id' +import { stubInterface, type StubbedInstance } from 'sinon-ts' +import type { DefaultUpgraderComponents } from '../../src/upgrader.js' +import type { ConnectionGater, PeerId, PeerStore, TypedEventTarget, Libp2pEvents, ComponentLogger, Metrics, ConnectionProtector } from '@libp2p/interface' +import type { ConnectionManager, Registrar } from '@libp2p/interface-internal' + +export interface StubbedDefaultUpgraderComponents { + peerId: PeerId + metrics?: StubbedInstance + connectionManager: StubbedInstance + connectionGater: StubbedInstance + connectionProtector?: StubbedInstance + registrar: StubbedInstance + peerStore: StubbedInstance + events: TypedEventTarget + logger: ComponentLogger +} + +export async function createDefaultUpgraderComponents (options?: Partial): Promise { + return { + peerId: peerIdFromPrivateKey(await generateKeyPair('Ed25519')), + connectionManager: stubInterface({ + acceptIncomingConnection: async () => true + }), + connectionGater: stubInterface(), + registrar: stubInterface(), + peerStore: stubInterface({ + all: async () => [] + }), + events: new TypedEventEmitter(), + logger: defaultLogger(), + ...options + } as unknown as any +}