Skip to content

Commit

Permalink
feat: Migrate Gossip, Sync classes to protobufs (#450)
Browse files Browse the repository at this point in the history
* feat: Add RPC implementation

* wip

* feat: protobufs signer store (#449)

* add IdRegistryEvent proto and utils

* add signerStore

* add signer store to engine

* add changeset

* wip

* feat: protobufs cast store (#452)

* protobuf cast store

* reuse makeCastIdKey and fix import format

* wip

* fix: Add require shim to esbuild (#453)

* sync engine test
  • Loading branch information
adityapk00 authored Jan 25, 2023
1 parent 2b17452 commit a35f6ee
Show file tree
Hide file tree
Showing 22 changed files with 2,836 additions and 2 deletions.
73 changes: 73 additions & 0 deletions apps/protohub/src/network/p2p/connectionFilter.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { mockMultiaddrConnPair } from '@libp2p/interface-mocks';
import { PeerId } from '@libp2p/interface-peer-id';
import { createEd25519PeerId } from '@libp2p/peer-id-factory';
import { multiaddr } from '@multiformats/multiaddr';
import { ConnectionFilter } from '~/network/p2p/connectionFilter';

let allowedPeerId: PeerId;
let blockedPeerId: PeerId;
let localMultiAddrStr: string;
let allowedMultiAddrStr: string;
let filteredMultiAddrStr: string;

describe('connectionFilter tests', () => {
beforeAll(async () => {
allowedPeerId = await createEd25519PeerId();
blockedPeerId = await createEd25519PeerId();
allowedMultiAddrStr = `/ip4/127.0.0.1/tcp/64454/p2p/${allowedPeerId.toString()}`;
expect(multiaddr(allowedMultiAddrStr)).toBeDefined();
filteredMultiAddrStr = `/ip4/127.0.0.1/tcp/64455/p2p/${blockedPeerId.toString()}`;
expect(multiaddr(filteredMultiAddrStr)).toBeDefined();
localMultiAddrStr = `/ip4/127.0.0.1/tcp/64456/`;
});

test('denies all connections by default', async () => {
const filter = new ConnectionFilter([]);
const { outbound: remoteConnection } = mockMultiaddrConnPair({
addrs: [multiaddr(localMultiAddrStr), multiaddr(allowedMultiAddrStr)],
remotePeer: allowedPeerId,
});
await expect(filter.denyDialPeer(allowedPeerId)).resolves.toBeTruthy();
await expect(filter.denyDialMultiaddr(allowedPeerId, multiaddr(allowedMultiAddrStr))).resolves.toBeTruthy();
// Incepient Inbound Connections are always allowed
await expect(filter.denyInboundConnection(remoteConnection)).resolves.toBeFalsy();
await expect(filter.denyInboundEncryptedConnection(allowedPeerId, remoteConnection)).resolves.toBeTruthy();
await expect(filter.denyInboundUpgradedConnection(allowedPeerId, remoteConnection)).resolves.toBeTruthy();
await expect(filter.denyOutboundConnection(allowedPeerId, remoteConnection)).resolves.toBeTruthy();
await expect(filter.denyOutboundEncryptedConnection(allowedPeerId, remoteConnection)).resolves.toBeTruthy();
await expect(filter.denyOutboundUpgradedConnection(allowedPeerId, remoteConnection)).resolves.toBeTruthy();
await expect(filter.filterMultiaddrForPeer(allowedPeerId)).resolves.toBeFalsy();
});

test('allows selected peers', async () => {
const filter = new ConnectionFilter([allowedPeerId.toString()]);
const { outbound: remoteConnection } = mockMultiaddrConnPair({
addrs: [multiaddr(localMultiAddrStr), multiaddr(allowedMultiAddrStr)],
remotePeer: allowedPeerId,
});
await expect(filter.denyDialPeer(allowedPeerId)).resolves.toBeFalsy();
await expect(filter.denyDialMultiaddr(allowedPeerId, multiaddr(allowedMultiAddrStr))).resolves.toBeFalsy();
// Incepient Inbound Connections are always allowed
await expect(filter.denyInboundConnection(remoteConnection)).resolves.toBeFalsy();
await expect(filter.denyInboundEncryptedConnection(allowedPeerId, remoteConnection)).resolves.toBeFalsy();
await expect(filter.denyInboundUpgradedConnection(allowedPeerId, remoteConnection)).resolves.toBeFalsy();
await expect(filter.denyOutboundConnection(allowedPeerId, remoteConnection)).resolves.toBeFalsy();
await expect(filter.denyOutboundEncryptedConnection(allowedPeerId, remoteConnection)).resolves.toBeFalsy();
await expect(filter.denyOutboundUpgradedConnection(allowedPeerId, remoteConnection)).resolves.toBeFalsy();
await expect(filter.filterMultiaddrForPeer(allowedPeerId)).resolves.toBeTruthy();
});

test('filters unknown peers', async () => {
const filter = new ConnectionFilter([allowedPeerId.toString()]);
const { outbound: remoteConnection } = mockMultiaddrConnPair({
addrs: [multiaddr(localMultiAddrStr), multiaddr(filteredMultiAddrStr)],
remotePeer: blockedPeerId,
});
await expect(filter.denyDialPeer(allowedPeerId)).resolves.toBeFalsy();
await expect(filter.denyDialPeer(blockedPeerId)).resolves.toBeTruthy();
// Incepient Inbound Connections are always allowed
await expect(filter.denyInboundConnection(remoteConnection)).resolves.toBeFalsy();
await expect(filter.denyOutboundConnection(blockedPeerId, remoteConnection)).resolves.toBeTruthy();
await expect(filter.filterMultiaddrForPeer(blockedPeerId)).resolves.toBeFalsy();
});
});
104 changes: 104 additions & 0 deletions apps/protohub/src/network/p2p/connectionFilter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { ConnectionGater, MultiaddrConnection } from '@libp2p/interface-connection';
import { PeerId } from '@libp2p/interface-peer-id';
import { Multiaddr } from '@multiformats/multiaddr';
import { logger } from '~/utils/logger';

/**
* Implementes the libp2p ConnectionGater interface
*
* These APIs are called in a particular sequence for each inbound/outbound connection.
* We just want to intercept at the lowest possible point which is why every API here is implemented.
*
* Note - arrow functions are used here because libp2p's createLibp2p (see `src/network/node.ts`)
* uses a "recursivePartial" on the passed in object and class methods are not enumerated.
* Using arrow functions allows their recursivePartial enumerator to parse the object.
*/
export class ConnectionFilter implements ConnectionGater {
private allowedPeers: string[] | undefined;

constructor(addrs: string[] | undefined) {
this.allowedPeers = addrs;
}

denyDialPeer = async (peerId: PeerId): Promise<boolean> => {
const deny = this.shouldDeny(peerId.toString());
if (deny) {
logger.info(`ConnectionFilter denyDialPeer: denied a connection with ${peerId}`);
}
return deny;
};

denyDialMultiaddr = async (peerId: PeerId, _multiaddr: Multiaddr): Promise<boolean> => {
const deny = this.shouldDeny(peerId.toString());
if (deny) {
logger.info(`ConnectionFilter denyDialMultiaddr: denied a connection with ${peerId}`);
}
return deny;
};

denyInboundConnection = async (_maConn: MultiaddrConnection): Promise<boolean> => {
/**
* A PeerId is not always known on incipient connections.
* Don't filter incipient connections, later filters will catch it.
*/
return false;
};

denyOutboundConnection = async (peerId: PeerId, _maConn: MultiaddrConnection): Promise<boolean> => {
const deny = this.shouldDeny(peerId.toString());
if (deny) {
logger.info(`ConnectionFilter denyOutboundConnection: denied a connection with ${peerId}`);
}
return deny;
};

denyInboundEncryptedConnection = async (peerId: PeerId, _maConn: MultiaddrConnection): Promise<boolean> => {
const deny = this.shouldDeny(peerId.toString());
if (deny) {
logger.info(`ConnectionFilter denyInboundEncryptedConnection: denied a connection with ${peerId}`);
}
return deny;
};

denyOutboundEncryptedConnection = async (peerId: PeerId, _maConn: MultiaddrConnection): Promise<boolean> => {
const deny = this.shouldDeny(peerId.toString());
if (deny) {
logger.info(`ConnectionFilter denyOutboundEncryptedConnection: denied a connection with ${peerId}`);
}
return deny;
};

denyInboundUpgradedConnection = async (peerId: PeerId, _maConn: MultiaddrConnection): Promise<boolean> => {
const deny = this.shouldDeny(peerId.toString());
if (deny) {
logger.info(`ConnectionFilter denyInboundUpgradedConnection: denied a connection with ${peerId}`);
}
return deny;
};

denyOutboundUpgradedConnection = async (peerId: PeerId, _maConn: MultiaddrConnection): Promise<boolean> => {
const deny = this.shouldDeny(peerId.toString());
if (deny) {
logger.info(`ConnectionFilter denyOutboundUpgradedConnection: denied a connection with ${peerId}`);
}
return deny;
};

filterMultiaddrForPeer = async (peer: PeerId): Promise<boolean> => {
return !this.shouldDeny(peer.toString());
};

/* -------------------------------------------------------------------------- */
/* Private Methods */
/* -------------------------------------------------------------------------- */

private shouldDeny(peerId: string) {
if (!peerId) return true;
if (this.allowedPeers === undefined) return false;

const found = this.allowedPeers.find((value) => {
return peerId && value === peerId;
});
return found === undefined;
}
}
182 changes: 182 additions & 0 deletions apps/protohub/src/network/p2p/gossipNode.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import * as protobufs from '@farcaster/protobufs';
import { multiaddr } from '@multiformats/multiaddr/';
import { GossipNode } from '~/network/p2p/gossipNode';
import { NETWORK_TOPIC_PRIMARY } from '~/network/p2p/protocol';
import { sleep } from '~/utils/crypto';
import { NetworkFactories } from '../utils/factories';

const NUM_NODES = 10;
const PROPAGATION_DELAY = 3 * 1000; // between 2 and 3 full heartbeat ticks

const TEST_TIMEOUT_LONG = 60 * 1000;
const TEST_TIMEOUT_SHORT = 10 * 1000;

let nodes: GossipNode[];
// map peerId -> topics -> Messages per topic
let messages: Map<string, Map<string, protobufs.GossipMessage[] | undefined>>;

/** Create a sequence of connections between all the nodes */
const connectAll = async (nodes: GossipNode[]) => {
const connectionResults = await Promise.all(
nodes.slice(1).map((n) => {
return n.connect(nodes[0] as GossipNode);
})
);
connectionResults.forEach((r) => expect(r?.isOk()).toBeTruthy());

// subscribe every node to the test topic
nodes.forEach((n) => n.gossip?.subscribe(NETWORK_TOPIC_PRIMARY));
// sleep 5 heartbeats to let the gossipsub network form
await sleep(PROPAGATION_DELAY);
};

const trackMessages = () => {
nodes.forEach((n) => {
{
n.addListener('message', (topic, message) => {
expect(message.isOk()).toBeTruthy();

const peerId = n.peerId?.toString() ?? '';
let existingTopics = messages.get(peerId);
if (!existingTopics) existingTopics = new Map();
let existingMessages = existingTopics.get(topic);
if (!existingMessages) existingMessages = [];

existingMessages.push(message._unsafeUnwrap());
existingTopics.set(topic, existingMessages);
messages.set(peerId, existingTopics);
});
n.registerDebugListeners();
}
});
};

describe('node unit tests', () => {
test('fails to bootstrap to invalid addresses', async () => {
const node = new GossipNode();
const error = (await node.start([multiaddr()]))._unsafeUnwrapErr();
expect(error.errCode).toEqual('unavailable');
expect(error.message).toContain('could not connect to any bootstrap nodes');
await node.stop();
});

test('fails to connect with a node that has not started', async () => {
const node = new GossipNode();
await node.start([]);

let result = await node.connectAddress(multiaddr());
expect(result.isErr()).toBeTruthy();

const offlineNode = new GossipNode();
result = await node.connect(offlineNode);
expect(result.isErr()).toBeTruthy();

await node.stop();
});

test(
'can only dial allowed nodes',
async () => {
const node1 = new GossipNode();
await node1.start([]);

const node2 = new GossipNode();
await node2.start([]);

// node 3 has node 1 in its allow list, but not node 2
const node3 = new GossipNode();
if (node1.peerId) {
await node3.start([], { allowedPeerIdStrs: [node1.peerId.toString()] });
} else {
throw Error('Node1 not started, no peerId found');
}

try {
let dialResult = await node1.connect(node3);
expect(dialResult.isOk()).toBeTruthy();

dialResult = await node2.connect(node3);
expect(dialResult.isErr()).toBeTruthy();

dialResult = await node3.connect(node2);
expect(dialResult.isErr()).toBeTruthy();
} finally {
await node1.stop();
await node2.stop();
await node3.stop();
}
},
TEST_TIMEOUT_SHORT
);

test('port and transport addrs in the Ip MultiAddr is not allowed', async () => {
const node = new GossipNode();
const options = { ipMultiAddr: '/ip4/127.0.0.1/tcp/8080' };
const error = (await node.start([], options))._unsafeUnwrapErr();

expect(error.errCode).toEqual('unavailable');
expect(error.message).toMatch('unexpected multiaddr transport/port information');
expect(node.isStarted()).toBeFalsy();
await node.stop();
});

test('invalid multiaddr format is not allowed', async () => {
const node = new GossipNode();
// an IPv6 being supplied as an IPv4
const options = { ipMultiAddr: '/ip4/2600:1700:6cf0:990:2052:a166:fb35:830a' };
expect((await node.start([], options))._unsafeUnwrapErr().errCode).toEqual('unavailable');
const error = (await node.start([], options))._unsafeUnwrapErr();

expect(error.errCode).toEqual('unavailable');
expect(error.message).toMatch('invalid multiaddr');
expect(node.isStarted()).toBeFalsy();
await node.stop();
});
});

describe('gossip network', () => {
beforeAll(async () => {
nodes = [...Array(NUM_NODES)].map(() => new GossipNode());
messages = new Map();
});

beforeEach(async () => {
messages.clear();
await Promise.all(nodes.map((node) => node.start([])));
});

afterEach(async () => {
await Promise.all(nodes.map((node) => node.stop()));
}, TEST_TIMEOUT_SHORT);

test(
'sends a message to a gossip network',
async () => {
await connectAll(nodes);
nodes.map((n) => expect(n.gossip?.getPeers().length).toBeGreaterThanOrEqual(1));

trackMessages();

const message = NetworkFactories.GossipMessage.build();
// publish via some random node
const randomNode = nodes[Math.floor(Math.random() * nodes.length)] as GossipNode;
expect(randomNode.publish(message)).resolves.toBeUndefined();
// sleep 5 heartbeat ticks
await sleep(PROPAGATION_DELAY);

// check that every node has the message
nodes.map((n) => {
// the sender won't have this message
if (n.peerId?.toString() === randomNode.peerId?.toString()) return;

const topics = messages.get(n.peerId?.toString() ?? '');
expect(topics).toBeDefined();
expect(topics?.has(NETWORK_TOPIC_PRIMARY)).toBeTruthy();
const topicMessages = topics?.get(NETWORK_TOPIC_PRIMARY) ?? [];
expect(topicMessages.length).toBe(1);
expect(topicMessages[0]).toEqual(message);
});
},
TEST_TIMEOUT_LONG
);
});
Loading

0 comments on commit a35f6ee

Please sign in to comment.