From a35f6ee1d3305191eac5d110e920714c3fb13668 Mon Sep 17 00:00:00 2001 From: adityapk00 <31996805+adityapk00@users.noreply.github.com> Date: Wed, 25 Jan 2023 14:50:49 -0800 Subject: [PATCH] feat: Migrate Gossip, Sync classes to protobufs (#450) * feat: Add RPC implementation * wip * feat: protobufs signer store (#449) * add IdRegistryEvent proto and utils * add signerStore * add signer store to engine * add changeset * wip * feat: protobufs cast store (#452) * protobuf cast store * reuse makeCastIdKey and fix import format * wip * fix: Add require shim to esbuild (#453) * sync engine test --- .../src/network/p2p/connectionFilter.test.ts | 73 ++++ .../src/network/p2p/connectionFilter.ts | 104 ++++++ .../src/network/p2p/gossipNode.test.ts | 182 +++++++++ apps/protohub/src/network/p2p/gossipNode.ts | 349 ++++++++++++++++++ apps/protohub/src/network/p2p/protocol.ts | 12 + .../src/network/sync/merkleTrie.test.ts | 305 +++++++++++++++ apps/protohub/src/network/sync/merkleTrie.ts | 108 ++++++ .../src/network/sync/syncEngine.test.ts | 244 ++++++++++++ apps/protohub/src/network/sync/syncEngine.ts | 338 +++++++++++++++++ apps/protohub/src/network/sync/syncId.ts | 51 +++ .../src/network/sync/trieNode.test.ts | 206 +++++++++++ apps/protohub/src/network/sync/trieNode.ts | 270 ++++++++++++++ .../src/network/utils/factories.test.ts | 60 +++ apps/protohub/src/network/utils/factories.ts | 71 ++++ apps/protohub/src/rpc/server/syncService.ts | 22 ++ apps/protohub/src/storage/engine/index.ts | 48 ++- apps/protohub/src/utils/crypto.ts | 5 + apps/protohub/src/utils/logger.ts | 71 ++++ apps/protohub/src/utils/p2p.test.ts | 142 +++++++ apps/protohub/src/utils/p2p.ts | 165 +++++++++ packages/protobufs/package.json | 1 + packages/protobufs/src/index.ts | 11 + 22 files changed, 2836 insertions(+), 2 deletions(-) create mode 100644 apps/protohub/src/network/p2p/connectionFilter.test.ts create mode 100644 apps/protohub/src/network/p2p/connectionFilter.ts create mode 100644 apps/protohub/src/network/p2p/gossipNode.test.ts create mode 100644 apps/protohub/src/network/p2p/gossipNode.ts create mode 100644 apps/protohub/src/network/p2p/protocol.ts create mode 100644 apps/protohub/src/network/sync/merkleTrie.test.ts create mode 100644 apps/protohub/src/network/sync/merkleTrie.ts create mode 100644 apps/protohub/src/network/sync/syncEngine.test.ts create mode 100644 apps/protohub/src/network/sync/syncEngine.ts create mode 100644 apps/protohub/src/network/sync/syncId.ts create mode 100644 apps/protohub/src/network/sync/trieNode.test.ts create mode 100644 apps/protohub/src/network/sync/trieNode.ts create mode 100644 apps/protohub/src/network/utils/factories.test.ts create mode 100644 apps/protohub/src/network/utils/factories.ts create mode 100644 apps/protohub/src/rpc/server/syncService.ts create mode 100644 apps/protohub/src/utils/crypto.ts create mode 100644 apps/protohub/src/utils/logger.ts create mode 100644 apps/protohub/src/utils/p2p.test.ts create mode 100644 apps/protohub/src/utils/p2p.ts diff --git a/apps/protohub/src/network/p2p/connectionFilter.test.ts b/apps/protohub/src/network/p2p/connectionFilter.test.ts new file mode 100644 index 0000000000..dabeded9a8 --- /dev/null +++ b/apps/protohub/src/network/p2p/connectionFilter.test.ts @@ -0,0 +1,73 @@ +import { mockMultiaddrConnPair } from '@libp2p/interface-mocks'; +import { PeerId } from '@libp2p/interface-peer-id'; +import { createEd25519PeerId } from '@libp2p/peer-id-factory'; +import { multiaddr } from '@multiformats/multiaddr'; +import { ConnectionFilter } from 
'~/network/p2p/connectionFilter';

let allowedPeerId: PeerId;
let blockedPeerId: PeerId;
let localMultiAddrStr: string;
let allowedMultiAddrStr: string;
let filteredMultiAddrStr: string;

describe('connectionFilter tests', () => {
  beforeAll(async () => {
    allowedPeerId = await createEd25519PeerId();
    blockedPeerId = await createEd25519PeerId();
    allowedMultiAddrStr = `/ip4/127.0.0.1/tcp/64454/p2p/${allowedPeerId.toString()}`;
    expect(multiaddr(allowedMultiAddrStr)).toBeDefined();
    filteredMultiAddrStr = `/ip4/127.0.0.1/tcp/64455/p2p/${blockedPeerId.toString()}`;
    expect(multiaddr(filteredMultiAddrStr)).toBeDefined();
    localMultiAddrStr = `/ip4/127.0.0.1/tcp/64456/`;
  });

  test('denies all connections by default', async () => {
    const filter = new ConnectionFilter([]);
    const { outbound: remoteConnection } = mockMultiaddrConnPair({
      addrs: [multiaddr(localMultiAddrStr), multiaddr(allowedMultiAddrStr)],
      remotePeer: allowedPeerId,
    });
    await expect(filter.denyDialPeer(allowedPeerId)).resolves.toBeTruthy();
    await expect(filter.denyDialMultiaddr(allowedPeerId, multiaddr(allowedMultiAddrStr))).resolves.toBeTruthy();
    // Incipient inbound connections are always allowed
    await expect(filter.denyInboundConnection(remoteConnection)).resolves.toBeFalsy();
    await expect(filter.denyInboundEncryptedConnection(allowedPeerId, remoteConnection)).resolves.toBeTruthy();
    await expect(filter.denyInboundUpgradedConnection(allowedPeerId, remoteConnection)).resolves.toBeTruthy();
    await expect(filter.denyOutboundConnection(allowedPeerId, remoteConnection)).resolves.toBeTruthy();
    await expect(filter.denyOutboundEncryptedConnection(allowedPeerId, remoteConnection)).resolves.toBeTruthy();
    await expect(filter.denyOutboundUpgradedConnection(allowedPeerId, remoteConnection)).resolves.toBeTruthy();
    await expect(filter.filterMultiaddrForPeer(allowedPeerId)).resolves.toBeFalsy();
  });

  test('allows selected peers', async () => {
    const filter = new ConnectionFilter([allowedPeerId.toString()]);
    const { outbound: remoteConnection } = mockMultiaddrConnPair({
      addrs: [multiaddr(localMultiAddrStr), multiaddr(allowedMultiAddrStr)],
      remotePeer: allowedPeerId,
    });
    await expect(filter.denyDialPeer(allowedPeerId)).resolves.toBeFalsy();
    await expect(filter.denyDialMultiaddr(allowedPeerId, multiaddr(allowedMultiAddrStr))).resolves.toBeFalsy();
    // Incipient inbound connections are always allowed
    await expect(filter.denyInboundConnection(remoteConnection)).resolves.toBeFalsy();
    await expect(filter.denyInboundEncryptedConnection(allowedPeerId, remoteConnection)).resolves.toBeFalsy();
    await expect(filter.denyInboundUpgradedConnection(allowedPeerId, remoteConnection)).resolves.toBeFalsy();
    await expect(filter.denyOutboundConnection(allowedPeerId, remoteConnection)).resolves.toBeFalsy();
    await expect(filter.denyOutboundEncryptedConnection(allowedPeerId, remoteConnection)).resolves.toBeFalsy();
    await expect(filter.denyOutboundUpgradedConnection(allowedPeerId, remoteConnection)).resolves.toBeFalsy();
    await expect(filter.filterMultiaddrForPeer(allowedPeerId)).resolves.toBeTruthy();
  });

  test('filters unknown peers', async () => {
    const filter = new ConnectionFilter([allowedPeerId.toString()]);
    const { outbound: remoteConnection } = mockMultiaddrConnPair({
      addrs: [multiaddr(localMultiAddrStr), multiaddr(filteredMultiAddrStr)],
      remotePeer: blockedPeerId,
    });
    await expect(filter.denyDialPeer(allowedPeerId)).resolves.toBeFalsy();
    await expect(filter.denyDialPeer(blockedPeerId)).resolves.toBeTruthy();
    // Incipient inbound connections are always allowed
    await expect(filter.denyInboundConnection(remoteConnection)).resolves.toBeFalsy();
    await expect(filter.denyOutboundConnection(blockedPeerId, remoteConnection)).resolves.toBeTruthy();
    await expect(filter.filterMultiaddrForPeer(blockedPeerId)).resolves.toBeFalsy();
  });
});

diff --git a/apps/protohub/src/network/p2p/connectionFilter.ts b/apps/protohub/src/network/p2p/connectionFilter.ts
new file mode 100644
index 0000000000..d1fbc9b329
--- /dev/null
+++ b/apps/protohub/src/network/p2p/connectionFilter.ts
@@ -0,0 +1,104 @@
import { ConnectionGater, MultiaddrConnection } from '@libp2p/interface-connection';
import { PeerId } from '@libp2p/interface-peer-id';
import { Multiaddr } from '@multiformats/multiaddr';
import { logger } from '~/utils/logger';

/**
 * Implements the libp2p ConnectionGater interface
 *
 * These APIs are called in a particular sequence for each inbound/outbound connection.
 * We just want to intercept at the lowest possible point, which is why every API here is implemented.
 *
 * Note - arrow functions are used here because libp2p's createLibp2p (see `src/network/node.ts`)
 * uses a "recursivePartial" on the passed in object and class methods are not enumerated.
 * Using arrow functions allows their recursivePartial enumerator to parse the object.
 */
export class ConnectionFilter implements ConnectionGater {
  private allowedPeers: string[] | undefined;

  constructor(addrs: string[] | undefined) {
    this.allowedPeers = addrs;
  }

  denyDialPeer = async (peerId: PeerId): Promise<boolean> => {
    const deny = this.shouldDeny(peerId.toString());
    if (deny) {
      logger.info(`ConnectionFilter denyDialPeer: denied a connection with ${peerId}`);
    }
    return deny;
  };

  denyDialMultiaddr = async (peerId: PeerId, _multiaddr: Multiaddr): Promise<boolean> => {
    const deny = this.shouldDeny(peerId.toString());
    if (deny) {
      logger.info(`ConnectionFilter denyDialMultiaddr: denied a connection with ${peerId}`);
    }
    return deny;
  };

  denyInboundConnection = async (_maConn: MultiaddrConnection): Promise<boolean> => {
    /**
     * A PeerId is not always known on incipient connections.
     * Don't filter incipient connections, later filters will catch it.
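     * (The remote PeerId is typically only learned once the connection is encrypted, so the
     * allow-list is enforced later by denyInboundEncryptedConnection / denyInboundUpgradedConnection.)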
     */
    return false;
  };

  denyOutboundConnection = async (peerId: PeerId, _maConn: MultiaddrConnection): Promise<boolean> => {
    const deny = this.shouldDeny(peerId.toString());
    if (deny) {
      logger.info(`ConnectionFilter denyOutboundConnection: denied a connection with ${peerId}`);
    }
    return deny;
  };

  denyInboundEncryptedConnection = async (peerId: PeerId, _maConn: MultiaddrConnection): Promise<boolean> => {
    const deny = this.shouldDeny(peerId.toString());
    if (deny) {
      logger.info(`ConnectionFilter denyInboundEncryptedConnection: denied a connection with ${peerId}`);
    }
    return deny;
  };

  denyOutboundEncryptedConnection = async (peerId: PeerId, _maConn: MultiaddrConnection): Promise<boolean> => {
    const deny = this.shouldDeny(peerId.toString());
    if (deny) {
      logger.info(`ConnectionFilter denyOutboundEncryptedConnection: denied a connection with ${peerId}`);
    }
    return deny;
  };

  denyInboundUpgradedConnection = async (peerId: PeerId, _maConn: MultiaddrConnection): Promise<boolean> => {
    const deny = this.shouldDeny(peerId.toString());
    if (deny) {
      logger.info(`ConnectionFilter denyInboundUpgradedConnection: denied a connection with ${peerId}`);
    }
    return deny;
  };

  denyOutboundUpgradedConnection = async (peerId: PeerId, _maConn: MultiaddrConnection): Promise<boolean> => {
    const deny = this.shouldDeny(peerId.toString());
    if (deny) {
      logger.info(`ConnectionFilter denyOutboundUpgradedConnection: denied a connection with ${peerId}`);
    }
    return deny;
  };

  filterMultiaddrForPeer = async (peer: PeerId): Promise<boolean> => {
    return !this.shouldDeny(peer.toString());
  };

  /* -------------------------------------------------------------------------- */
  /*                               Private Methods                               */
  /* -------------------------------------------------------------------------- */

  private shouldDeny(peerId: string) {
    if (!peerId) return true;
    if (this.allowedPeers === undefined) return false;

    const found = this.allowedPeers.find((value) => {
      return peerId && value === peerId;
    });
    return found === undefined;
  }
}

diff --git a/apps/protohub/src/network/p2p/gossipNode.test.ts b/apps/protohub/src/network/p2p/gossipNode.test.ts
new file mode 100644
index 0000000000..1b7698b02f
--- /dev/null
+++ b/apps/protohub/src/network/p2p/gossipNode.test.ts
@@ -0,0 +1,182 @@
import * as protobufs from '@farcaster/protobufs';
import { multiaddr } from '@multiformats/multiaddr/';
import { GossipNode } from '~/network/p2p/gossipNode';
import { NETWORK_TOPIC_PRIMARY } from '~/network/p2p/protocol';
import { sleep } from '~/utils/crypto';
import { NetworkFactories } from '../utils/factories';

const NUM_NODES = 10;
const PROPAGATION_DELAY = 3 * 1000; // between 2 and 3 full heartbeat ticks

const TEST_TIMEOUT_LONG = 60 * 1000;
const TEST_TIMEOUT_SHORT = 10 * 1000;

let nodes: GossipNode[];
// map peerId -> topics -> Messages per topic
let messages: Map<string, Map<string, protobufs.GossipMessage[]>>;

/** Create a sequence of connections between all the nodes */
const connectAll = async (nodes: GossipNode[]) => {
  const connectionResults = await Promise.all(
    nodes.slice(1).map((n) => {
      return n.connect(nodes[0] as GossipNode);
    })
  );
  connectionResults.forEach((r) => expect(r?.isOk()).toBeTruthy());

  // subscribe every node to the test topic
  nodes.forEach((n) => n.gossip?.subscribe(NETWORK_TOPIC_PRIMARY));
  // sleep 5 heartbeats to let the gossipsub network form
  await sleep(PROPAGATION_DELAY);
};

const trackMessages = () => {
  nodes.forEach((n) => {
    {
      n.addListener('message', (topic, message) => {
        expect(message.isOk()).toBeTruthy();

        const peerId = n.peerId?.toString() ?? '';
        let existingTopics = messages.get(peerId);
        if (!existingTopics) existingTopics = new Map();
        let existingMessages = existingTopics.get(topic);
        if (!existingMessages) existingMessages = [];

        existingMessages.push(message._unsafeUnwrap());
        existingTopics.set(topic, existingMessages);
        messages.set(peerId, existingTopics);
      });
      n.registerDebugListeners();
    }
  });
};

describe('node unit tests', () => {
  test('fails to bootstrap to invalid addresses', async () => {
    const node = new GossipNode();
    const error = (await node.start([multiaddr()]))._unsafeUnwrapErr();
    expect(error.errCode).toEqual('unavailable');
    expect(error.message).toContain('could not connect to any bootstrap nodes');
    await node.stop();
  });

  test('fails to connect with a node that has not started', async () => {
    const node = new GossipNode();
    await node.start([]);

    let result = await node.connectAddress(multiaddr());
    expect(result.isErr()).toBeTruthy();

    const offlineNode = new GossipNode();
    result = await node.connect(offlineNode);
    expect(result.isErr()).toBeTruthy();

    await node.stop();
  });

  test(
    'can only dial allowed nodes',
    async () => {
      const node1 = new GossipNode();
      await node1.start([]);

      const node2 = new GossipNode();
      await node2.start([]);

      // node 3 has node 1 in its allow list, but not node 2
      const node3 = new GossipNode();
      if (node1.peerId) {
        await node3.start([], { allowedPeerIdStrs: [node1.peerId.toString()] });
      } else {
        throw Error('Node1 not started, no peerId found');
      }

      try {
        let dialResult = await node1.connect(node3);
        expect(dialResult.isOk()).toBeTruthy();

        dialResult = await node2.connect(node3);
        expect(dialResult.isErr()).toBeTruthy();

        dialResult = await node3.connect(node2);
        expect(dialResult.isErr()).toBeTruthy();
      } finally {
        await node1.stop();
        await node2.stop();
        await node3.stop();
      }
    },
    TEST_TIMEOUT_SHORT
  );

  test('port and transport addrs in the IP MultiAddr are not allowed', async () => {
    const node = new GossipNode();
    const options = { ipMultiAddr: '/ip4/127.0.0.1/tcp/8080' };
    const error = (await node.start([], options))._unsafeUnwrapErr();

    expect(error.errCode).toEqual('unavailable');
    expect(error.message).toMatch('unexpected multiaddr transport/port information');
    expect(node.isStarted()).toBeFalsy();
    await node.stop();
  });

  test('invalid multiaddr format is not allowed', async () => {
    const node = new GossipNode();
    // an IPv6 address being supplied as an IPv4 address
    const options = { ipMultiAddr: '/ip4/2600:1700:6cf0:990:2052:a166:fb35:830a' };
    const error = (await node.start([], options))._unsafeUnwrapErr();

    expect(error.errCode).toEqual('unavailable');
    expect(error.message).toMatch('invalid multiaddr');
    expect(node.isStarted()).toBeFalsy();
    await node.stop();
  });
});

describe('gossip network', () => {
  beforeAll(async () => {
    nodes = [...Array(NUM_NODES)].map(() => new GossipNode());
    messages = new Map();
  });

  beforeEach(async () => {
    messages.clear();
    await Promise.all(nodes.map((node) => node.start([])));
  });

  afterEach(async () => {
    await Promise.all(nodes.map((node) => node.stop()));
  }, TEST_TIMEOUT_SHORT);

  test(
    'sends a message to a gossip network',
    async () => {
      await connectAll(nodes);
      nodes.map((n) =>
expect(n.gossip?.getPeers().length).toBeGreaterThanOrEqual(1));

      trackMessages();

      const message = NetworkFactories.GossipMessage.build();
      // publish via some random node
      const randomNode = nodes[Math.floor(Math.random() * nodes.length)] as GossipNode;
      await expect(randomNode.publish(message)).resolves.toBeUndefined();
      // sleep 5 heartbeat ticks
      await sleep(PROPAGATION_DELAY);

      // check that every node has the message
      nodes.map((n) => {
        // the sender won't have this message
        if (n.peerId?.toString() === randomNode.peerId?.toString()) return;

        const topics = messages.get(n.peerId?.toString() ?? '');
        expect(topics).toBeDefined();
        expect(topics?.has(NETWORK_TOPIC_PRIMARY)).toBeTruthy();
        const topicMessages = topics?.get(NETWORK_TOPIC_PRIMARY) ?? [];
        expect(topicMessages.length).toBe(1);
        expect(topicMessages[0]).toEqual(message);
      });
    },
    TEST_TIMEOUT_LONG
  );
});

diff --git a/apps/protohub/src/network/p2p/gossipNode.ts b/apps/protohub/src/network/p2p/gossipNode.ts
new file mode 100644
index 0000000000..ae76f2e509
--- /dev/null
+++ b/apps/protohub/src/network/p2p/gossipNode.ts
@@ -0,0 +1,349 @@
import { GossipSub } from '@chainsafe/libp2p-gossipsub';
import { Noise } from '@chainsafe/libp2p-noise';
import * as protobufs from '@farcaster/protobufs';
import { HubError, HubResult } from '@farcaster/protoutils';
import { Connection } from '@libp2p/interface-connection';
import { PeerId } from '@libp2p/interface-peer-id';
import { Mplex } from '@libp2p/mplex';
import { PubSubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
import { TCP } from '@libp2p/tcp';
import { Multiaddr } from '@multiformats/multiaddr';
import { createLibp2p, Libp2p } from 'libp2p';
import { err, ok, Result, ResultAsync } from 'neverthrow';
import { TypedEmitter } from 'tiny-typed-emitter';
import { ConnectionFilter } from '~/network/p2p/connectionFilter';
import { GOSSIP_TOPICS, NETWORK_TOPIC_PRIMARY } from '~/network/p2p/protocol';
import { logger } from '~/utils/logger';
import { addressInfoFromParts, checkNodeAddrs, ipMultiAddrStrFromAddressInfo } from '~/utils/p2p';

const MultiaddrLocalHost = '/ip4/127.0.0.1';

const log = logger.child({ component: 'Node' });

interface NodeEvents {
  /**
   * Triggered when a new message is received. Provides the topic the message was received on
   * as well as the result of decoding the message
   */
  message: (topic: string, message: HubResult<protobufs.GossipMessage>) => void;
  /** Triggered when a peer is connected. Provides the Libp2p Connection object. */
  peerConnect: (connection: Connection) => void;
  /** Triggered when a peer is disconnected. Provides the Libp2p Connection object. */
  peerDisconnect: (connection: Connection) => void;
}

/** Node create options */
interface NodeOptions {
  /** PeerId to use as the Node's Identity. Generates a new ephemeral PeerId if not specified */
  peerId?: PeerId | undefined;
  /** IP address in MultiAddr format to bind to */
  ipMultiAddr?: string | undefined;
  /** External IP address to announce */
  announceIp?: string | undefined;
  /** Port to listen for gossip. Picks a port at random if not specified. This is combined with the IPMultiAddr */
  gossipPort?: number | undefined;
  /** A list of addresses to peer with. PeerIds outside of this list will not be able to connect to this node */
  allowedPeerIdStrs?: string[] | undefined;
}

/**
 * A representation of a libp2p network node.
 *
 * Nodes participate in the p2p GossipSub network we create using libp2p.
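 *
 * A minimal usage sketch (`someMessage` here is a hypothetical protobufs.Message, not defined in this file):
 *
 *   const node = new GossipNode();
 *   await node.start([]);                  // no bootstrap peers
 *   await node.gossipMessage(someMessage); // published on NETWORK_TOPIC_PRIMARY
 *   await node.stop();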
 */
export class GossipNode extends TypedEmitter<NodeEvents> {
  private _node?: Libp2p;

  /**
   * Returns the PublicKey of the node
   */
  get peerId() {
    return this._node?.peerId;
  }

  /**
   * Returns all known addresses of the node
   *
   * This contains local addresses as well and will need to be
   * checked for reachability prior to establishing connections
   */
  get multiaddrs() {
    return this._node?.getMultiaddrs();
  }

  get addressBook() {
    return this._node?.peerStore.addressBook;
  }

  async getPeerInfo(peerId: PeerId) {
    const existingConnections = this._node?.connectionManager.getConnections(peerId);
    for (const conn of existingConnections ?? []) {
      const knownAddrs = await this._node?.peerStore.addressBook.get(peerId);
      if (knownAddrs && !knownAddrs.find((addr) => addr.multiaddr.equals(conn.remoteAddr))) {
        // updates the peer store
        await this._node?.peerStore.addressBook.add(peerId, [conn.remoteAddr]);
      }
    }
    return await this._node?.peerStore.get(peerId);
  }

  /**
   * Returns the GossipSub implementation used by the Node
   */
  get gossip() {
    const pubsub = this._node?.pubsub;
    return pubsub ? (pubsub as GossipSub) : undefined;
  }

  /**
   * Creates and starts the underlying libp2p node. Nodes must be started prior to any network configuration or communication.
   *
   * @param bootstrapAddrs A list of addresses to bootstrap from. Attempts to connect with each peer in the list
   * @param startOptions Options to configure the node's behavior
   */
  async start(bootstrapAddrs: Multiaddr[], startOptions?: NodeOptions): Promise<HubResult<void>> {
    const createResult = await this.createNode(startOptions ?? {});
    if (createResult.isErr()) return err(createResult.error);

    this._node = createResult.value;
    this.registerListeners();

    this.registerDebugListeners();

    await this._node.start();
    log.info(
      { identity: this.identity, addresses: this._node.getMultiaddrs().map((a) => a.toString()) },
      'Starting libp2p'
    );

    return this.bootstrap(bootstrapAddrs);
  }

  isStarted() {
    return this._node?.isStarted();
  }

  /**
   * Stops the node's participation on the libp2p network and tears down pubsub
   */
  async stop() {
    await this._node?.stop();
    log.info({ identity: this.identity }, 'Stopped libp2p...');
  }

  get identity() {
    return this.peerId?.toString();
  }

  /**
   * Publishes a message to the GossipSub network
   * @message The GossipMessage to publish to the network
   */
  async publish(message: protobufs.GossipMessage) {
    const topics = message.topics;
    const encodedMessage = GossipNode.encodeMessage(message);

    log.debug({ identity: this.identity }, `Publishing message to topics: ${topics}`);
    encodedMessage.match(
      async (msg) => {
        const results = await Promise.all(topics.map((topic) => this.gossip?.publish(topic, msg)));
        log.debug({ identity: this.identity, results }, 'Published to gossip peers');
      },
      async (err) => {
        log.error(err, 'Failed to publish message.');
      }
    );
  }

  /**
   * Gossip out a Message to the network
   */
  async gossipMessage(message: protobufs.Message) {
    const gossipMessage = protobufs.GossipMessage.create({
      message,
      topics: [NETWORK_TOPIC_PRIMARY],
      peerId: this.peerId?.toBytes() ??
new Uint8Array(), + }); + await this.publish(gossipMessage); + } + + /** Gossip out our contact info to the network */ + async gossipContactInfo(contactInfo: protobufs.ContactInfoContent) { + const gossipMessage = protobufs.GossipMessage.create({ + contactInfoContent: contactInfo, + topics: [NETWORK_TOPIC_PRIMARY], + peerId: this.peerId?.toBytes() ?? new Uint8Array(), + }); + await this.publish(gossipMessage); + } + + /** + * Connect with a peer Node + * + * @param node The peer Node to attempt a connection with + */ + async connect(node: GossipNode): Promise> { + const multiaddrs = node.multiaddrs; + if (multiaddrs) { + // how to select an addr here? + for (const addr of multiaddrs) { + try { + const result = await this.connectAddress(addr); + // bail after the first successful connection + if (result.isOk()) return ok(undefined); + } catch (error: any) { + log.error(error, 'Failed to connect to addr'); + continue; + } + } + } else { + return err(new HubError('unavailable', { message: 'no peer id' })); + } + return err(new HubError('unavailable', { message: 'cannot connect to any peer' })); + } + + /** + * Connect with a peer's NodeAddress + * + * @param address The NodeAddress to attempt a connection with + */ + async connectAddress(address: Multiaddr): Promise> { + log.debug({ identity: this.identity, address }, `Attempting to connect to address ${address}`); + try { + const result = await this._node?.dial(address); + if (result) { + log.info({ identity: this.identity, address }, `Connected to peer at address: ${address}`); + return ok(undefined); + } + } catch (error: any) { + log.error(error, `Failed to connect to peer at address: ${address}`); + return err(new HubError('unavailable', error)); + } + return err(new HubError('unavailable', { message: `cannot connect to peer: ${address}` })); + } + + registerListeners() { + this._node?.connectionManager.addEventListener('peer:connect', (event) => { + this.emit('peerConnect', event.detail); + }); + this._node?.connectionManager.addEventListener('peer:disconnect', (event) => { + this.emit('peerDisconnect', event.detail); + }); + this.gossip?.addEventListener('message', (event) => { + // ignore messages that aren't in our list of topics (ignores gossipsub peer discovery messages) + if (GOSSIP_TOPICS.includes(event.detail.topic)) { + this.emit('message', event.detail.topic, GossipNode.decodeMessage(event.detail.data)); + } + }); + } + + registerDebugListeners() { + // Debug + this._node?.addEventListener('peer:discovery', (event) => { + log.info({ identity: this.identity }, `Found peer: ${event.detail.multiaddrs} }`); + }); + this._node?.connectionManager.addEventListener('peer:connect', (event) => { + log.info({ identity: this.identity }, `Connection established to: ${event.detail.remotePeer.toString()}`); + }); + this._node?.connectionManager.addEventListener('peer:disconnect', (event) => { + log.info({ identity: this.identity }, `Disconnected from: ${event.detail.remotePeer.toString()} `); + }); + this.gossip?.addEventListener('message', (event) => { + log.info({ identity: this.identity }, `Received message for topic: ${event.detail.topic}`); + }); + this.gossip?.addEventListener('subscription-change', (event) => { + log.info( + { identity: this.identity }, + `Subscription change: ${event.detail.subscriptions.map((value) => { + value.topic; + })}` + ); + }); + } + + /* -------------------------------------------------------------------------- */ + /* Private Methods */ + /* 
-------------------------------------------------------------------------- */ + + //TODO: Needs better typesafety + static encodeMessage(message: protobufs.GossipMessage): HubResult { + // Serialize the message + return ok(protobufs.GossipMessage.encode(message).finish()); + } + + //TODO: Needs better typesafety + static decodeMessage(message: Uint8Array): HubResult { + // Deserialize the message + return ok(protobufs.GossipMessage.decode(message)); + } + + /* Attempts to dial all the addresses in the bootstrap list */ + private async bootstrap(bootstrapAddrs: Multiaddr[]): Promise> { + if (bootstrapAddrs.length == 0) return ok(undefined); + const results = await Promise.all(bootstrapAddrs.map((addr) => this.connectAddress(addr))); + + const finalResults = Result.combineWithAllErrors(results) as Result; + if (finalResults.isErr() && finalResults.error.length == bootstrapAddrs.length) { + // only fail if all connections failed + return err(new HubError('unavailable', 'could not connect to any bootstrap nodes')); + } + + return ok(undefined); + } + + /** + * Creates a Libp2p node with GossipSub + */ + private async createNode(options: NodeOptions): Promise> { + const listenIPMultiAddr = options.ipMultiAddr ?? MultiaddrLocalHost; + const listenPort = options.gossipPort ?? 0; + const listenMultiAddrStr = `${listenIPMultiAddr}/tcp/${listenPort}`; + + let announceMultiAddrStrList: string[] = []; + if (options.announceIp && options.gossipPort) { + const announceMultiAddr = addressInfoFromParts(options.announceIp, options.gossipPort).map((addressInfo) => + ipMultiAddrStrFromAddressInfo(addressInfo) + ); + if (announceMultiAddr.isOk() && announceMultiAddr.value.isOk()) { + // If we have a valid announce IP, use it + const announceIpMultiaddr = announceMultiAddr.value.value; + announceMultiAddrStrList = [`${announceIpMultiaddr}/tcp/${options.gossipPort}`]; + } + } + + const checkResult = checkNodeAddrs(listenIPMultiAddr, listenMultiAddrStr); + if (checkResult.isErr()) return err(new HubError('unavailable', checkResult.error)); + + const gossip = new GossipSub({ + emitSelf: false, + allowPublishToZeroPeers: true, + globalSignaturePolicy: 'StrictSign', + }); + + if (options.allowedPeerIdStrs) { + log.info( + { identity: this.identity, function: 'createNode', allowedPeerIds: options.allowedPeerIdStrs }, + `!!! 
PEER-ID RESTRICTIONS ENABLED !!!` + ); + } + const connectionGater = new ConnectionFilter(options.allowedPeerIdStrs); + + return ResultAsync.fromPromise( + createLibp2p({ + // setting these optional fields to `undefined` throws an error, only set them if they're defined + ...(options.peerId && { peerId: options.peerId }), + connectionGater, + addresses: { + listen: [listenMultiAddrStr], + announce: announceMultiAddrStrList, + }, + transports: [new TCP()], + streamMuxers: [new Mplex()], + connectionEncryption: [new Noise()], + pubsub: gossip, + peerDiscovery: [new PubSubPeerDiscovery()], + }), + (e) => new HubError('unavailable', { message: 'failed to create libp2p node', cause: e as Error }) + ); + } +} diff --git a/apps/protohub/src/network/p2p/protocol.ts b/apps/protohub/src/network/p2p/protocol.ts new file mode 100644 index 0000000000..ee2f70a0a5 --- /dev/null +++ b/apps/protohub/src/network/p2p/protocol.ts @@ -0,0 +1,12 @@ +import { GossipVersion } from '@farcaster/protobufs'; + +// Network topic for all FC protocol messages +export const NETWORK_TOPIC_PRIMARY = 'f_network_topic_primary'; +// Network topic for node contact info messages +export const NETWORK_TOPIC_CONTACT = 'f_network_topic_contact'; +// The rate at which nodes republish their contact info +export const GOSSIP_CONTACT_INTERVAL = 10_000; +// A list of all gossip topics in use by our protocol +export const GOSSIP_TOPICS = [NETWORK_TOPIC_CONTACT, NETWORK_TOPIC_PRIMARY]; +// The current gossip protocol version +export const GOSSIP_PROTOCOL_VERSION = GossipVersion.GOSSIP_VERSION_1; diff --git a/apps/protohub/src/network/sync/merkleTrie.test.ts b/apps/protohub/src/network/sync/merkleTrie.test.ts new file mode 100644 index 0000000000..aa20d522a9 --- /dev/null +++ b/apps/protohub/src/network/sync/merkleTrie.test.ts @@ -0,0 +1,305 @@ +import { blake3 } from '@noble/hashes/blake3'; +import { MerkleTrie } from '~/network/sync/merkleTrie'; +import { NetworkFactories } from '~/network/utils/factories'; +import { EMPTY_HASH } from './trieNode'; + +describe('MerkleTrie', () => { + const trieWithIds = async (timestamps: number[]) => { + const syncIds = await Promise.all( + timestamps.map(async (t) => { + return await NetworkFactories.SyncId.create(undefined, { transient: { date: new Date(t * 1000) } }); + }) + ); + const trie = new MerkleTrie(); + syncIds.forEach((id) => trie.insert(id)); + return trie; + }; + + describe('insert', () => { + test('succeeds inserting a single item', async () => { + const trie = new MerkleTrie(); + const syncId = await NetworkFactories.SyncId.create(); + + expect(trie.items).toEqual(0); + expect(trie.rootHash).toEqual(''); + + trie.insert(syncId); + + expect(trie.items).toEqual(1); + expect(trie.rootHash).toBeTruthy(); + }); + + test('inserts are idempotent', async () => { + const syncId1 = await NetworkFactories.SyncId.create(); + const syncId2 = await NetworkFactories.SyncId.create(); + + const firstTrie = new MerkleTrie(); + firstTrie.insert(syncId1); + firstTrie.insert(syncId2); + + const secondTrie = new MerkleTrie(); + secondTrie.insert(syncId2); + secondTrie.insert(syncId1); + + // Order does not matter + expect(firstTrie.rootHash).toEqual(secondTrie.rootHash); + expect(firstTrie.items).toEqual(secondTrie.items); + expect(firstTrie.rootHash).toBeTruthy(); + + firstTrie.insert(syncId2); + secondTrie.insert(syncId1); + + // Re-adding same item does not change the hash + expect(firstTrie.rootHash).toEqual(secondTrie.rootHash); + expect(firstTrie.items).toEqual(secondTrie.items); + 
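      // Each trie still holds exactly the two distinct SyncIds inserted above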
expect(firstTrie.items).toEqual(2); + }); + + test('insert multiple items out of order results in the same root hash', async () => { + const syncIds = await NetworkFactories.SyncId.createList(25); + + const firstTrie = new MerkleTrie(); + const secondTrie = new MerkleTrie(); + + syncIds.forEach((syncId) => firstTrie.insert(syncId)); + const shuffledIds = syncIds.sort(() => 0.5 - Math.random()); + shuffledIds.forEach((syncId) => secondTrie.insert(syncId)); + + expect(firstTrie.rootHash).toEqual(secondTrie.rootHash); + expect(firstTrie.rootHash).toBeTruthy(); + expect(firstTrie.items).toEqual(secondTrie.items); + expect(firstTrie.items).toEqual(25); + }); + }); + + describe('delete', () => { + test('deletes an item', async () => { + const syncId = await NetworkFactories.SyncId.create(); + + const trie = new MerkleTrie(); + trie.insert(syncId); + expect(trie.items).toEqual(1); + expect(trie.rootHash).toBeTruthy(); + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(trie.exists(syncId)).toBeTruthy(); + + trie.delete(syncId); + expect(trie.items).toEqual(0); + expect(trie.rootHash).toEqual(EMPTY_HASH); + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(trie.exists(syncId)).toBeFalsy(); + }); + + test('deleting an item that does not exist does not change the trie', async () => { + const syncId = await NetworkFactories.SyncId.create(); + const trie = new MerkleTrie(); + trie.insert(syncId); + + const rootHashBeforeDelete = trie.rootHash; + const syncId2 = await NetworkFactories.SyncId.create(); + trie.delete(syncId2); + + const rootHashAfterDelete = trie.rootHash; + expect(rootHashAfterDelete).toEqual(rootHashBeforeDelete); + expect(trie.items).toEqual(1); + }); + + test('delete is an exact inverse of insert', async () => { + const syncId1 = await NetworkFactories.SyncId.create(); + const syncId2 = await NetworkFactories.SyncId.create(); + + const trie = new MerkleTrie(); + trie.insert(syncId1); + const rootHashBeforeDelete = trie.rootHash; + trie.insert(syncId2); + + trie.delete(syncId2); + expect(trie.rootHash).toEqual(rootHashBeforeDelete); + }); + + test('trie with a deleted item is the same as a trie with the item never added', async () => { + const syncId1 = await NetworkFactories.SyncId.create(); + const syncId2 = await NetworkFactories.SyncId.create(); + + const firstTrie = new MerkleTrie(); + firstTrie.insert(syncId1); + firstTrie.insert(syncId2); + + firstTrie.delete(syncId1); + + const secondTrie = new MerkleTrie(); + secondTrie.insert(syncId2); + + expect(firstTrie.rootHash).toEqual(secondTrie.rootHash); + expect(firstTrie.rootHash).toBeTruthy(); + expect(firstTrie.items).toEqual(secondTrie.items); + expect(firstTrie.items).toEqual(1); + }); + }); + + test('succeeds with single item', async () => { + const trie = new MerkleTrie(); + const syncId = await NetworkFactories.SyncId.create(); + + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(trie.exists(syncId)).toBeFalsy(); + + trie.insert(syncId); + + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(trie.exists(syncId)).toBeTruthy(); + + const nonExistingSyncId = await NetworkFactories.SyncId.create(); + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(trie.exists(nonExistingSyncId)).toBeFalsy(); + }); + + test('value is always undefined for non-leaf nodes', async () => { + const trie = new MerkleTrie(); + const syncId = await NetworkFactories.SyncId.create(); + + trie.insert(syncId); + + 
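    // The root is an internal node: values live only at the leaves, internal nodes carry hashes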
expect(trie.root.value).toBeFalsy(); + }); + + describe('getNodeMetadata', () => { + test('returns undefined if prefix is not present', async () => { + const syncId = await NetworkFactories.SyncId.create(undefined, { transient: { date: new Date(1665182332000) } }); + const trie = new MerkleTrie(); + trie.insert(syncId); + + expect(trie.getTrieNodeMetadata('166518234')).toBeUndefined(); + }); + + test('returns the root metadata if the prefix is empty', async () => { + const syncId = await NetworkFactories.SyncId.create(undefined, { transient: { date: new Date(1665182332000) } }); + const trie = new MerkleTrie(); + trie.insert(syncId); + + const nodeMetadata = trie.getTrieNodeMetadata(''); + expect(nodeMetadata).toBeDefined(); + expect(nodeMetadata?.numMessages).toEqual(1); + expect(nodeMetadata?.prefix).toEqual(''); + expect(nodeMetadata?.children?.size).toEqual(1); + expect(nodeMetadata?.children?.get('1')).toBeDefined(); + }); + + test('returns the correct metadata if prefix is present', async () => { + const trie = await trieWithIds([1665182332, 1665182343]); + const nodeMetadata = trie.getTrieNodeMetadata('16651823'); + + expect(nodeMetadata).toBeDefined(); + expect(nodeMetadata?.numMessages).toEqual(2); + expect(nodeMetadata?.prefix).toEqual('16651823'); + expect(nodeMetadata?.children?.size).toEqual(2); + expect(nodeMetadata?.children?.get('3')).toBeDefined(); + expect(nodeMetadata?.children?.get('4')).toBeDefined(); + }); + }); + + describe('getSnapshot', () => { + test('returns basic information', async () => { + const trie = await trieWithIds([1665182332, 1665182343]); + + const snapshot = trie.getSnapshot('1665182343'); + expect(snapshot.prefix).toEqual('1665182343'); + expect(snapshot.numMessages).toEqual(1); + expect(snapshot.excludedHashes.length).toEqual('1665182343'.length); + }); + + test('returns early when prefix is only partially present', async () => { + const trie = await trieWithIds([1665182332, 1665182343]); + + const snapshot = trie.getSnapshot('1677123'); + expect(snapshot.prefix).toEqual('167'); + expect(snapshot.numMessages).toEqual(2); + expect(snapshot.excludedHashes.length).toEqual('167'.length); + }); + + test('excluded hashes excludes the prefix char at every level', async () => { + const trie = await trieWithIds([1665182332, 1665182343, 1665182345, 1665182351]); + let snapshot = trie.getSnapshot('1665182351'); + let node = trie.getTrieNodeMetadata('16651823'); + // We expect the excluded hash to be the hash of the 3 and 4 child nodes, and excludes the 5 child node + const expectedHash = Buffer.from( + blake3 + .create({ dkLen: 20 }) + .update(node?.children?.get('3')?.hash || '') + .update(node?.children?.get('4')?.hash || '') + .digest() + ).toString('hex'); + expect(snapshot.excludedHashes).toEqual([ + EMPTY_HASH, // 1, these are empty because there are no other children at this level + EMPTY_HASH, // 6 + EMPTY_HASH, // 6 + EMPTY_HASH, // 5 + EMPTY_HASH, // 1 + EMPTY_HASH, // 8 + EMPTY_HASH, // 2 + EMPTY_HASH, // 3 + expectedHash, // 5 (hash of the 3 and 4 child node hashes) + EMPTY_HASH, // 1 + ]); + + snapshot = trie.getSnapshot('1665182343'); + node = trie.getTrieNodeMetadata('166518234'); + const expectedLastHash = Buffer.from(blake3(node?.children?.get('5')?.hash || '', { dkLen: 20 })).toString('hex'); + node = trie.getTrieNodeMetadata('16651823'); + const expectedPenultimateHash = Buffer.from( + blake3 + .create({ dkLen: 20 }) + .update(node?.children?.get('3')?.hash || '') + .update(node?.children?.get('5')?.hash || '') + .digest() + 
).toString('hex'); + expect(snapshot.excludedHashes).toEqual([ + EMPTY_HASH, // 1 + EMPTY_HASH, // 6 + EMPTY_HASH, // 6 + EMPTY_HASH, // 5 + EMPTY_HASH, // 1 + EMPTY_HASH, // 8 + EMPTY_HASH, // 2 + EMPTY_HASH, // 3 + expectedPenultimateHash, // 4 (hash of the 3 and 5 child node hashes) + expectedLastHash, // 3 (hash of the 5 child node hash) + ]); + }); + }); + + test('getAllValues returns all values for child nodes', async () => { + const trie = await trieWithIds([1665182332, 1665182343, 1665182345]); + + let values = trie.root.getNode('16651823')?.getAllValues(); + expect(values?.length).toEqual(3); + values = trie.root.getNode('166518233')?.getAllValues(); + expect(values?.length).toEqual(1); + }); + + describe('getDivergencePrefix', () => { + test('returns the prefix with the most common excluded hashes', async () => { + const trie = await trieWithIds([1665182332, 1665182343, 1665182345]); + const prefixToTest = '1665182343'; + const oldSnapshot = trie.getSnapshot(prefixToTest); + trie.insert(await NetworkFactories.SyncId.create(undefined, { transient: { date: new Date(1665182353000) } })); + + // Since message above was added at 1665182353, the two tries diverged at 16651823 for our prefix + let divergencePrefix = trie.getDivergencePrefix(prefixToTest, oldSnapshot.excludedHashes); + expect(divergencePrefix).toEqual('16651823'); + + // divergence prefix should be the full prefix, if snapshots are the same + const currentSnapshot = trie.getSnapshot(prefixToTest); + divergencePrefix = trie.getDivergencePrefix(prefixToTest, currentSnapshot.excludedHashes); + expect(divergencePrefix).toEqual(prefixToTest); + + // divergence prefix should empty if excluded hashes are empty + divergencePrefix = trie.getDivergencePrefix(prefixToTest, []); + expect(divergencePrefix).toEqual(''); + + // divergence prefix should be our prefix if provided hashes are longer + divergencePrefix = trie.getDivergencePrefix(prefixToTest + '5', [...currentSnapshot.excludedHashes, 'different']); + expect(divergencePrefix).toEqual(prefixToTest); + }); + }); +}); diff --git a/apps/protohub/src/network/sync/merkleTrie.ts b/apps/protohub/src/network/sync/merkleTrie.ts new file mode 100644 index 0000000000..3a7a644e46 --- /dev/null +++ b/apps/protohub/src/network/sync/merkleTrie.ts @@ -0,0 +1,108 @@ +import { SyncId } from '~/network/sync/syncId'; +import { TrieNode, TrieSnapshot } from '~/network/sync/trieNode'; + +/** + * Represents a node in the trie, and it's immediate children + * + * @prefix - The prefix of the node, uniquely describes its position in the trie + * @numMessages - The number of messages under this node + * @hash - The merkle hash of the node + * @children - The immediate children of this node + */ +export type NodeMetadata = { + prefix: string; + numMessages: number; + hash: string; + children?: Map; +}; + +/** + * Represents a MerkleTrie. It's conceptually very similar to a Merkle Patricia Tree (see + * https://ethereum.org/en/developers/docs/data-structures-and-encoding/patricia-merkle-trie/). + * We don't have extension nodes currently, so this is essentially a Merkle Radix Trie as + * defined in the link above. + * + * The first 10 levels of the trie are used to represent the timestamp of the messages (see SyncID). + * The "prefix" refers to a subset of the timestamp string. This is used to determine the state of the trie + * (and therefore the hub's messages) at a particular point in time. 
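 *
 * For example, a SyncId built at Farcaster timestamp 1665182343 hangs under the digit path
 * 1 -> 6 -> 6 -> 5 -> 1 -> 8 -> 2 -> 3 -> 4 -> 3, so the prefix '16651823' summarizes every
 * message in that ~100 second window (the tests above rely on exactly this layout).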
+ * + * Comparing the state of two tries (represented by the snapshot) for the same prefix allows us to determine + * whether two hubs are in sync, and the earliest point of divergence if not. + * + * Note about concurrency: This class and TrieNode are not thread-safe. This is fine because there are no async + * methods, which means the operations won't be interrupted. DO NOT add async methods without considering + * impact on concurrency-safety. + */ +class MerkleTrie { + private readonly _root: TrieNode; + + constructor() { + this._root = new TrieNode(); + } + + public insert(id: SyncId): boolean { + // TODO(aditya): We should insert Uint8Array instead of string + return this._root.insert(id.idString()); + } + + public delete(id: SyncId): boolean { + return this._root.delete(id.idString()); + } + + public exists(id: SyncId): boolean { + // NOTE: eslint falsely identifies as `fs.exists`. + // eslint-disable-next-line security/detect-non-literal-fs-filename + return this._root.exists(id.idString()); + } + + // A snapshot captures the state of the trie excluding the nodes + // specified in the prefix. See TrieSnapshot for more + public getSnapshot(prefix: string): TrieSnapshot { + return this._root.getSnapshot(prefix); + } + + // Compares excluded hashes of another trie with this trie to determine at which prefix the + // states differ. Returns the subset of prefix that's common to both tries. + public getDivergencePrefix(prefix: string, excludedHashes: string[]): string { + const ourExcludedHashes = this.getSnapshot(prefix).excludedHashes; + for (let i = 0; i < prefix.length; i++) { + // NOTE: `i` is controlled by for loop and hence not at risk of object injection. + // eslint-disable-next-line security/detect-object-injection + if (ourExcludedHashes[i] !== excludedHashes[i]) { + return prefix.slice(0, i); + } + } + return prefix; + } + + public getTrieNodeMetadata(prefix: string): NodeMetadata | undefined { + const node = this._root.getNode(prefix); + if (node === undefined) { + return undefined; + } + const children = node?.children || new Map(); + const result = new Map(); + for (const [char, child] of children) { + result.set(char, { + numMessages: child.items, + prefix: prefix + char, + hash: child.hash, + }); + } + return { prefix, children: result, numMessages: node.items, hash: node.hash }; + } + + public get root(): TrieNode { + return this._root; + } + + public get items(): number { + return this._root.items; + } + + public get rootHash(): string { + return this._root.hash; + } +} + +export { MerkleTrie }; diff --git a/apps/protohub/src/network/sync/syncEngine.test.ts b/apps/protohub/src/network/sync/syncEngine.test.ts new file mode 100644 index 0000000000..b3b9a14c61 --- /dev/null +++ b/apps/protohub/src/network/sync/syncEngine.test.ts @@ -0,0 +1,244 @@ +import * as protobufs from '@farcaster/protobufs'; +import { FarcasterNetwork } from '@farcaster/protobufs'; +import { Factories, getFarcasterTime } from '@farcaster/protoutils'; +import { anything, instance, mock, when } from 'ts-mockito'; +import SyncEngine from '~/network/sync/syncEngine'; +import { SyncId } from '~/network/sync/syncId'; +import { jestRocksDB } from '~/storage/db/jestUtils'; +import Engine from '~/storage/engine'; + +const testDb = jestRocksDB(`engine.syncEngine.test`); +const testDb2 = jestRocksDB(`engine2.syncEngine.test`); + +const network = protobufs.FarcasterNetwork.FARCASTER_NETWORK_TESTNET; + +const fid = Factories.Fid.build(); +const custodySigner = Factories.Eip712Signer.build(); +const signer = 
Factories.Ed25519Signer.build(); + +let custodyEvent: protobufs.IdRegistryEvent; +let signerAdd: protobufs.Message; +let castAdd: protobufs.Message; + +beforeAll(async () => { + custodyEvent = Factories.IdRegistryEvent.build({ fid, to: custodySigner.signerKey }); + + signerAdd = await Factories.SignerAddMessage.create( + { data: { fid, network, signerBody: { signer: signer.signerKey } } }, + { transient: { signer: custodySigner } } + ); + + castAdd = await Factories.CastAddMessage.create({ data: { fid, network } }, { transient: { signer } }); +}); + +describe('SyncEngine', () => { + let syncEngine: SyncEngine; + let engine: Engine; + + beforeEach(async () => { + await testDb.clear(); + engine = new Engine(testDb, FarcasterNetwork.FARCASTER_NETWORK_TESTNET); + syncEngine = new SyncEngine(engine); + }); + + const addMessagesWithTimestamps = async (timestamps: number[]) => { + return await Promise.all( + timestamps.map(async (t) => { + const cast = await Factories.CastAddMessage.create( + { data: { fid, network, timestamp: t } }, + { transient: { signer } } + ); + + const result = await engine.mergeMessage(cast); + expect(result.isOk()).toBeTruthy(); + return Promise.resolve(cast); + }) + ); + }; + + test('trie is updated on successful merge', async () => { + const existingItems = syncEngine.trie.items; + + const rcustody = await engine.mergeIdRegistryEvent(custodyEvent); + expect(rcustody.isOk()).toBeTruthy(); + + const rsigneradd = await engine.mergeMessage(signerAdd); + expect(rsigneradd.isOk()).toBeTruthy(); + + const result = await engine.mergeMessage(castAdd); + expect(result.isOk()).toBeTruthy(); + + // Two messages (signerAdd + castAdd) was added to the trie + expect(syncEngine.trie.items - existingItems).toEqual(2); + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(syncEngine.trie.exists(new SyncId(castAdd))).toBeTruthy(); + }); + + test('trie is not updated on merge failure', async () => { + expect(syncEngine.trie.items).toEqual(0); + + // Merging a message without the custody event should fail + const result = await engine.mergeMessage(castAdd); + + expect(result.isErr()).toBeTruthy(); + expect(syncEngine.trie.items).toEqual(0); + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(syncEngine.trie.exists(new SyncId(castAdd))).toBeFalsy(); + }); + + test( + 'trie is updated when a message is removed', + async () => { + await engine.mergeIdRegistryEvent(custodyEvent); + await engine.mergeMessage(signerAdd); + let result = await engine.mergeMessage(castAdd); + expect(result.isOk()).toBeTruthy(); + + // Remove this cast. 
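      // (Merging a CastRemove whose targetHash matches castAdd.hash should evict the CastAdd
      // from the store and from the trie, leaving only the remove message behind.)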
+ const castRemove = await Factories.CastRemoveMessage.create( + { data: { fid, network, castRemoveBody: { targetHash: castAdd.hash } } }, + { transient: { signer } } + ); + + // Merging the cast remove deletes the cast add in the db, and it should be reflected in the trie + result = await engine.mergeMessage(castRemove); + expect(result.isOk()).toBeTruthy(); + + const id = new SyncId(castRemove); + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(syncEngine.trie.exists(id)).toBeTruthy(); + + // const allMessages = await engine.getAllMessagesBySyncIds([id.idString()]); + // expect(allMessages.isOk()).toBeTruthy(); + // expect(allMessages._unsafeUnwrap()[0]?.type()).toEqual(MessageType.MESSAGE_TYPE_CAST_REMOVE); + + // The trie should contain the message remove + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(syncEngine.trie.exists(id)).toBeTruthy(); + + // The trie should not contain the castAdd anymore + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(syncEngine.trie.exists(new SyncId(castAdd))).toBeFalsy(); + }, + 100 * 60 * 1000 + ); + + test( + 'trie is updated when message with higher order is merged', + async () => { + const rcustody = await engine.mergeIdRegistryEvent(custodyEvent); + expect(rcustody.isOk()).toBeTruthy(); + + const rsigneradd = await engine.mergeMessage(signerAdd); + expect(rsigneradd.isOk()).toBeTruthy(); + + // Reaction + const reactionBody = { targetCastId: { fid, hash: castAdd.hash } }; + const reaction1 = await Factories.ReactionAddMessage.create( + { data: { fid, network, timestamp: 30662167, reactionBody } }, + { transient: { signer } } + ); + + // Same reaction, but with different timestamp + const reaction2 = await Factories.ReactionAddMessage.create( + { data: { fid, network, timestamp: 30662168, reactionBody } }, + { transient: { signer } } + ); + + // Merging the first reaction should succeed + let result = await engine.mergeMessage(reaction1); + expect(result.isOk()).toBeTruthy(); + expect(syncEngine.trie.items).toEqual(2); // signerAdd + reaction1 + + // Then merging the second reaction should also succeed and remove reaction1 + result = await engine.mergeMessage(reaction2); + expect(result.isOk()).toBeTruthy(); + expect(syncEngine.trie.items).toEqual(2); // signerAdd + reaction2 (reaction1 is removed) + + // Create a new engine and sync engine + testDb2.clear(); + const engine2 = new Engine(testDb2, FarcasterNetwork.FARCASTER_NETWORK_TESTNET); + const syncEngine2 = new SyncEngine(engine2); + await engine2.mergeIdRegistryEvent(custodyEvent); + await engine2.mergeMessage(signerAdd); + + // Only merge reaction2 + result = await engine2.mergeMessage(reaction2); + expect(result.isOk()).toBeTruthy(); + expect(syncEngine2.trie.items).toEqual(2); // signerAdd + reaction2 + + // Roothashes must match + expect(syncEngine2.trie.rootHash).toEqual(syncEngine.trie.rootHash); + }, + 100 * 60 * 1000 + ); + + test('snapshotTimestampPrefix trims the seconds', async () => { + const nowInSeconds = getFarcasterTime()._unsafeUnwrap(); + const snapshotTimestamp = syncEngine.snapshotTimestamp._unsafeUnwrap(); + expect(snapshotTimestamp).toBeLessThanOrEqual(nowInSeconds); + expect(snapshotTimestamp).toEqual(Math.floor(nowInSeconds / 10) * 10); + }); + + test('shouldSync returns false when already syncing', async () => { + const mockRPCClient = mock(protobufs.SyncServiceClient); + const rpcClient = instance(mockRPCClient); + let called = false; + 
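    // The mocked RPC call below fires while performSync is still in flight, so shouldSync
    // must short-circuit and report false at that point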
when(mockRPCClient.getSyncMetadataByPrefix(anything(), anything())).thenCall((_a, callback) => { + expect(syncEngine.shouldSync([])._unsafeUnwrap()).toBeFalsy(); + called = true; + + // Return an empty child map so sync will finish with a noop + const emptyMetadata = protobufs.TrieNodeMetadataResponse.create({ + prefix: new Uint8Array(), + numMessages: 1000, + hash: new Uint8Array(), + children: [], + }); + callback(null, emptyMetadata); + }); + await syncEngine.performSync(['some-divergence'], rpcClient); + expect(called).toBeTruthy(); + }); + + test('shouldSync returns false when excludedHashes match', async () => { + await engine.mergeIdRegistryEvent(custodyEvent); + await engine.mergeMessage(signerAdd); + + await addMessagesWithTimestamps([30662167, 30662169, 30662172]); + expect(syncEngine.shouldSync(syncEngine.snapshot._unsafeUnwrap().excludedHashes)._unsafeUnwrap()).toBeFalsy(); + }); + + test('shouldSync returns true when hashes dont match', async () => { + await engine.mergeIdRegistryEvent(custodyEvent); + await engine.mergeMessage(signerAdd); + + await addMessagesWithTimestamps([30662167, 30662169, 30662172]); + const oldSnapshot = syncEngine.snapshot._unsafeUnwrap(); + await addMessagesWithTimestamps([30662372]); + expect(oldSnapshot.excludedHashes).not.toEqual(syncEngine.snapshot._unsafeUnwrap().excludedHashes); + expect(syncEngine.shouldSync(oldSnapshot.excludedHashes)._unsafeUnwrap()).toBeTruthy(); + }); + + test('initialize populates the trie with all existing messages', async () => { + await engine.mergeIdRegistryEvent(custodyEvent); + await engine.mergeMessage(signerAdd); + + const messages = await addMessagesWithTimestamps([30662167, 30662169, 30662172]); + + const syncEngine = new SyncEngine(engine); + expect(syncEngine.trie.items).toEqual(0); + + await syncEngine.initialize(); + + // There might be more messages related to user creation, but it's sufficient to check for casts + expect(syncEngine.trie.items).toBeGreaterThanOrEqual(3); + expect(syncEngine.trie.rootHash).toBeTruthy(); + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(syncEngine.trie.exists(new SyncId(messages[0] as protobufs.Message))).toBeTruthy(); + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(syncEngine.trie.exists(new SyncId(messages[1] as protobufs.Message))).toBeTruthy(); + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(syncEngine.trie.exists(new SyncId(messages[2] as protobufs.Message))).toBeTruthy(); + }); +}); diff --git a/apps/protohub/src/network/sync/syncEngine.ts b/apps/protohub/src/network/sync/syncEngine.ts new file mode 100644 index 0000000000..1d7fa9a7ef --- /dev/null +++ b/apps/protohub/src/network/sync/syncEngine.ts @@ -0,0 +1,338 @@ +import * as protobufs from '@farcaster/protobufs'; +import { bytesToUtf8String, getFarcasterTime, hexStringToBytes, HubError, HubResult } from '@farcaster/utils'; +import { err, ok } from 'neverthrow'; +import { MerkleTrie, NodeMetadata } from '~/network/sync/merkleTrie'; +import { SyncId, timestampToPaddedTimestampPrefix } from '~/network/sync/syncId'; +import { TrieSnapshot } from '~/network/sync/trieNode'; +import Engine from '~/storage/engine'; +import { logger } from '~/utils/logger'; + +// Number of seconds to wait for the network to "settle" before syncing. We will only +// attempt to sync messages that are older than this time. 
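// e.g. at Farcaster time 1665182347 the snapshot rounds down to 1665182340, so messages from
// the last few seconds are never compared during sync (see the snapshotTimestamp test above).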
+const SYNC_THRESHOLD_IN_SECONDS = 10; +const HASHES_PER_FETCH = 50; + +const log = logger.child({ + component: 'SyncEngine', +}); + +class SyncEngine { + private readonly _trie: MerkleTrie; + private readonly engine: Engine; + private _isSyncing = false; + + constructor(engine: Engine) { + this._trie = new MerkleTrie(); + this.engine = engine; + + this.engine.eventHandler.on( + 'mergeMessage', + async (message: protobufs.Message, deletedMessages?: protobufs.Message[]) => { + this.addMessage(message); + + for (const deletedMessage of deletedMessages ?? []) { + this.removeMessage(deletedMessage); + } + } + ); + + // Note: There's no guarantee that the message is actually deleted, because the transaction could fail. + // This is fine, because we'll just end up syncing the message again. It's much worse to miss a removal and cause + // the trie to diverge in a way that's not recoverable without reconstructing it from the db. + // Order of events does not matter. The trie will always converge to the same state. + this.engine.eventHandler.on('pruneMessage', async (message) => { + this.removeMessage(message); + }); + this.engine.eventHandler.on('revokeMessage', async (message) => { + this.removeMessage(message); + }); + } + + public async initialize() { + // TODO: cache the trie to disk, and use this only when the cache doesn't exist + let processedMessages = 0; + await this.engine.forEachMessage((message) => { + this.addMessage(message); + processedMessages += 1; + if (processedMessages % 10_000 === 0) { + log.info({ processedMessages }, 'Initializing sync engine'); + } + }); + log.info({ processedMessages }, 'Sync engine initialized'); + } + + public isSyncing(): boolean { + return this._isSyncing; + } + + /** ---------------------------------------------------------------------------------- */ + /** Sync Methods */ + /** ---------------------------------------------------------------------------------- */ + + public shouldSync(excludedHashes: string[]): HubResult { + if (this._isSyncing) { + log.debug('shouldSync: already syncing'); + return ok(false); + } + + return this.snapshot.map((ourSnapshot) => { + const excludedHashesMatch = + ourSnapshot.excludedHashes.length === excludedHashes.length && + // NOTE: `index` is controlled by `every` and so not at risk of object injection. 
+ // eslint-disable-next-line security/detect-object-injection + ourSnapshot.excludedHashes.every((value, index) => value === excludedHashes[index]); + + log.debug(`shouldSync: excluded hashes check: ${excludedHashes}`); + return !excludedHashesMatch; + }); + } + + async performSync(excludedHashes: string[], rpcClient: protobufs.SyncServiceClient) { + try { + this._isSyncing = true; + await this.snapshot + .asyncMap(async (ourSnapshot) => { + const divergencePrefix = this._trie.getDivergencePrefix(ourSnapshot.prefix, excludedHashes); + log.info({ divergencePrefix, prefix: ourSnapshot.prefix }, 'Divergence prefix'); + const missingIds = await this.fetchMissingHashesByPrefix(divergencePrefix, rpcClient); + log.info({ missingCount: missingIds.length }, 'Fetched missing hashes'); + + // TODO: fetch messages in batches + await this.fetchAndMergeMessages(missingIds, rpcClient); + log.info(`Sync complete`); + }) + .mapErr((error) => { + log.warn(error, `Error performing sync`); + }); + } catch (e) { + log.warn(e, `Error performing sync`); + } finally { + this._isSyncing = false; + } + } + + public async fetchAndMergeMessages(syncIds: string[], rpcClient: protobufs.SyncServiceClient): Promise<boolean> { + let result = true; + if (syncIds.length === 0) { + return false; + } + + return new Promise<boolean>((resolve) => { + const messagesStream = rpcClient.getAllMessagesBySyncIds(protobufs.SyncIds.create({ syncIds })); + messagesStream.on('data', async (msg: protobufs.Message) => { + await this.mergeMessages([msg], rpcClient); + }); + messagesStream.on('error', (err) => { + log.warn(err, `Error fetching messages for sync`); + result = false; + }); + messagesStream.on('end', () => { + resolve(result); + }); + }); + } + + public async mergeMessages( + messages: protobufs.Message[], + rpcClient: protobufs.SyncServiceClient + ): Promise<HubResult<void>[]> { + const mergeResults: HubResult<void>[] = []; + // First, sort the messages by timestamp to reduce thrashing and refetching + messages.sort((a, b) => (a.data?.timestamp || 0) - (b.data?.timestamp || 0)); + + // Merge messages sequentially, so we can handle missing users. + // TODO: Optimize by collecting all failures and retrying them in a batch + for (const msg of messages) { + const result = await this.engine.mergeMessage(msg); + // Unknown user error + if ( + result.isErr() && + result.error.errCode === 'bad_request.validation_failure' && + (result.error.message === 'invalid signer' || result.error.message.startsWith('unknown fid')) + ) { + log.warn({ fid: msg.data?.fid }, 'Unknown user, fetching custody event'); + const result = await this.syncUserAndRetryMessage(msg, rpcClient); + mergeResults.push(result); + } else { + mergeResults.push(result); + } + } + + log.info( + { messages: mergeResults.length, success: mergeResults.filter((r) => r.isOk()).length }, + 'Merged messages' + ); + + return mergeResults; + } + + async fetchMissingHashesByNode( + theirNode: NodeMetadata, + ourNode: NodeMetadata | undefined, + rpcClient: protobufs.SyncServiceClient + ): Promise<string[]> { + return new Promise<string[]>((resolve) => { + const missingHashes: string[] = []; + // If the node has fewer than HASHES_PER_FETCH messages, just fetch them all in one go; otherwise, + // iterate through the node's children and fetch them in batches. + if (theirNode.numMessages <= HASHES_PER_FETCH) { + rpcClient.getAllSyncIdsByPrefix( + protobufs.TrieNodePrefix.create({ prefix: hexStringToBytes(theirNode.prefix)._unsafeUnwrap() }), + (err, syncIds) => { + if (err) { + log.warn(err, `Error fetching ids for prefix ${theirNode.prefix}`); + resolve(missingHashes); + } else { + resolve(syncIds.syncIds); + } + } + ); + } else + (async () => { + if (theirNode.children) { + for (const [theirChildChar, theirChild] of theirNode.children.entries()) { + // recursively fetch hashes for every node where the hashes don't match + if (ourNode?.children?.get(theirChildChar)?.hash !== theirChild.hash) { + missingHashes.push(...(await this.fetchMissingHashesByPrefix(theirChild.prefix, rpcClient))); + } + } + } + resolve(missingHashes); + })(); + }); + } + + async fetchMissingHashesByPrefix(prefix: string, rpcClient: protobufs.SyncServiceClient): Promise<string[]> { + const ourNode = this._trie.getTrieNodeMetadata(prefix); + + return new Promise<string[]>((resolve) => { + rpcClient.getSyncMetadataByPrefix( + protobufs.TrieNodePrefix.create({ prefix: hexStringToBytes(prefix)._unsafeUnwrap() }), + async (err, theirNodeMetadata) => { + const missingHashes: string[] = []; + if (err) { + log.warn(err, `Error fetching metadata for prefix ${prefix}`); + } else { + missingHashes.push( + ...(await this.fetchMissingHashesByNode(fromNodeMetadataResponse(theirNodeMetadata), ourNode, rpcClient)) + ); + } + resolve(missingHashes); + } + ); + }); + } + + /** ---------------------------------------------------------------------------------- */ + /** Trie Methods */ + /** ---------------------------------------------------------------------------------- */ + public addMessage(message: protobufs.Message): void { + this._trie.insert(new SyncId(message)); + } + + public removeMessage(message: protobufs.Message): void { + this._trie.delete(new SyncId(message)); + } + + public getTrieNodeMetadata(prefix: string): NodeMetadata | undefined { + return this._trie.getTrieNodeMetadata(prefix); + } + + public getIdsByPrefix(prefix: string): string[] { + return this._trie.root.getNode(prefix)?.getAllValues() ?? []; + } + + public get trie(): MerkleTrie { + return this._trie; + } + + public getSnapshotByPrefix(prefix?: string): HubResult<TrieSnapshot> { + if (!prefix || prefix === '') { + return this.snapshot; + } else { + return ok(this._trie.getSnapshot(prefix)); + } + } + + public get snapshot(): HubResult<TrieSnapshot> { + return this.snapshotTimestamp.map((snapshotTimestamp) => { + // Ignore the least significant digit when fetching the snapshot timestamp, because + // one-second resolution is too fine-grained and falls outside the sync threshold anyway + return this._trie.getSnapshot(timestampToPaddedTimestampPrefix(snapshotTimestamp / 10).toString()); + }); + } + + // Returns the most recent timestamp in seconds that's within the sync threshold + // (i.e. the highest timestamp below the current time that is a multiple of SYNC_THRESHOLD_IN_SECONDS) + public get snapshotTimestamp(): HubResult<number> { + return getFarcasterTime().map((farcasterTime) => { + const currentTimeInSeconds = Math.floor(farcasterTime); + return Math.floor(currentTimeInSeconds / SYNC_THRESHOLD_IN_SECONDS) * SYNC_THRESHOLD_IN_SECONDS; + }); + } + + private async syncUserAndRetryMessage( + message: protobufs.Message, + _rpcClient: protobufs.SyncServiceClient + ): Promise<HubResult<void>> { + const fid = message.data?.fid; + if (!fid) { + return err(new HubError('bad_request.invalid_param', 'Invalid fid')); + } + + // const custodyEventResult = await rpcClient.getIdRegistryEvent(fid); + // if (custodyEventResult.isErr()) { + // return err(new HubError('unavailable.network_failure', 'Failed to fetch custody event')); + // } + // const custodyModel = custodyEventResult.value as protobufs.IdRegistryEvent; + // const custodyResult = await this.engine.mergeIdRegistryEvent(custodyModel); + // if (custodyResult.isErr()) { + // return err(new HubError('unavailable.storage_failure', 'Failed to merge custody event')); + // } + + // // Probably not required to fetch the signer messages, but doing it here means + // // sync will complete in one round (prevents messages failing to merge due to missed or out of order signer message) + // const signerMessagesResult = await rpcClient.getAllSignerMessagesByFid(fid); + // if (signerMessagesResult.isErr()) { + // return err(new HubError('unavailable.network_failure', 'Failed to fetch signer messages')); + // } + // const messageModels = signerMessagesResult.value.map((message) => new MessageModel(message)); + // const results = await this.engine.mergeMessages(messageModels); + // if (results.every((r) => r.isErr())) { + // return err(new HubError('unavailable.storage_failure', 'Failed to merge signer messages')); + // } else { + // // if at least one signer message was merged, retry the original message + // return (await this.engine.mergeMessage(message)).mapErr((e) => { + // log.warn(e, `Failed to merge message type ${message.type()}`); + // return new HubError('unavailable.storage_failure', e); + // }); + // } + return ok(undefined); + } +} + +const fromNodeMetadataResponse = (response: protobufs.TrieNodeMetadataResponse): NodeMetadata => { + const children = new Map<string, NodeMetadata>(); + for (let i = 0; i < response.children.length; i++) { + const child = response.children[i]; + + const prefix = bytesToUtf8String(child?.prefix ?? new Uint8Array())._unsafeUnwrap(); + // Char is the last char of prefix + const char = prefix[prefix.length - 1] ?? ''; + + children.set(char, { + numMessages: Number(child?.numMessages), + prefix, + hash: bytesToUtf8String(child?.hash ?? new Uint8Array())._unsafeUnwrap(), + }); + } + + return { + prefix: bytesToUtf8String(response.prefix ?? new Uint8Array())._unsafeUnwrap(), + numMessages: Number(response.numMessages), + hash: bytesToUtf8String(response.hash ??
new Uint8Array())._unsafeUnwrap(), + children, + }; +}; + +export default SyncEngine; diff --git a/apps/protohub/src/network/sync/syncId.ts b/apps/protohub/src/network/sync/syncId.ts new file mode 100644 index 0000000000..c8ac212617 --- /dev/null +++ b/apps/protohub/src/network/sync/syncId.ts @@ -0,0 +1,51 @@ +import * as protobufs from '@farcaster/protobufs'; +import { makeUserKey, typeToSetPostfix } from '~/storage/db/message'; + +const TIMESTAMP_LENGTH = 10; // 10 characters for the timestamp, in decimal +const HASH_LENGTH = 160; // Length of the hex-encoded hashes generated by the test factories + +/** + * SyncId allows for a stable, time ordered lexicographic sorting of messages across hubs. + * It is a combination of the message's timestamp and its primary key (fid, message type, and + * hash). This id string is used as the key in the MerkleTrie. + */ +class SyncId { + private readonly _fid: number; + private readonly _tsHash: Uint8Array; + private readonly _timestamp: number; + private readonly _type: number; + + constructor(message: protobufs.Message) { + this._fid = message.data?.fid || 0; + this._tsHash = message.hash; + this._timestamp = message.data?.timestamp || 0; + this._type = message.data?.type || 0; + } + + public idString(): string { + // For our MerkleTrie, seconds is a good enough resolution + // We also want to normalize the length to 10 characters, so that the MerkleTrie + // will always have the same depth for any timestamp (even 0). + const timestampString = timestampToPaddedTimestampPrefix(this._timestamp); + + const buf = makeMessagePrimaryKey(this._fid, this._type, this._tsHash); + return timestampString + buf.toString('hex'); + } + + static pkFromIdString(idString: string): Buffer { + // The first TIMESTAMP_LENGTH characters are the timestamp, so we skip them + const pk = idString.slice(TIMESTAMP_LENGTH); + + return Buffer.from(pk, 'hex'); + } +} + +const timestampToPaddedTimestampPrefix = (timestamp: number): string => { + return Math.floor(timestamp).toString().padStart(TIMESTAMP_LENGTH, '0'); +}; + +const makeMessagePrimaryKey = (fid: number, type: number, hash: Uint8Array): Buffer => { + return Buffer.concat([makeUserKey(fid), Buffer.from([typeToSetPostfix(type)]), Buffer.from(hash)]); +}; + +export { SyncId, timestampToPaddedTimestampPrefix, TIMESTAMP_LENGTH, HASH_LENGTH }; diff --git a/apps/protohub/src/network/sync/trieNode.test.ts b/apps/protohub/src/network/sync/trieNode.test.ts new file mode 100644 index 0000000000..9083d185c2 --- /dev/null +++ b/apps/protohub/src/network/sync/trieNode.test.ts @@ -0,0 +1,206 @@ +import { Factories } from '@farcaster/protoutils'; +import { TIMESTAMP_LENGTH } from '~/network/sync/syncId'; +import { EMPTY_HASH, TrieNode } from '~/network/sync/trieNode'; +import { NetworkFactories } from '~/network/utils/factories'; + +const fid = Factories.Fid.build(); +const sharedDate = new Date(1665182332000); +const sharedPrefixHashA = + '0x09bc3dad4e7f2a77bbb2cccbecb06febfc3f0cbe7ea6a774d2dc043fd45c2c9912f130bf502c88fdedf7bbc4cd20b47aab2079e2d5cbd0a35afd2deec86a4321'; +const sharedPrefixHashB = + '0x09bc3dad4e7f2a77bbb2cccbecb06febfc3f0cbe7ea6a774d2dc043fd45c2c9912f130bf502c88fdedf7bbc4cd20b47aab2079e2d5cbd0a35afd2deec86b1234'; + +describe('TrieNode', () => { + // Traverse the node until we find a leaf or the path splits into multiple choices + const traverse = (node: TrieNode): TrieNode => { + if (node.isLeaf) { + return node; + } + const children = Array.from(node.children); + if (children.length > 1) { + return node; + } + return traverse((children[0] as [string, TrieNode])[1]); + }; + + describe('insert', () => { + test('succeeds
inserting a single item', async () => { + const root = new TrieNode(); + const id = await NetworkFactories.SyncId.create(); + + expect(root.items).toEqual(0); + expect(root.hash).toEqual(''); + + root.insert(id.idString()); + + expect(root.items).toEqual(1); + expect(root.hash).toBeTruthy(); + }); + + test('inserting the same item twice is idempotent', async () => { + const root = new TrieNode(); + const id = await NetworkFactories.SyncId.create(); + + root.insert(id.idString()); + expect(root.items).toEqual(1); + const previousHash = root.hash; + root.insert(id.idString()); + + expect(root.hash).toEqual(previousHash); + expect(root.items).toEqual(1); + }); + + test('insert compacts hashstring component of syncid to single node for efficiency', async () => { + const root = new TrieNode(); + const id = await NetworkFactories.SyncId.create(); + + root.insert(id.idString()); + let node = root; + // Timestamp portion of the key is not collapsed, but the hash portion is + for (let i = 0; i < TIMESTAMP_LENGTH; i++) { + const children = Array.from(node.children); + const firstChild = children[0] as [string, TrieNode]; + expect(children.length).toEqual(1); + node = firstChild[1]; + } + + expect(node.isLeaf).toEqual(true); + expect(node.value).toEqual(id.idString()); + }); + + test('inserting another key with a common prefix splits the node', async () => { + // Generate two ids with the same timestamp and the same hash prefix. The trie should split the node + // where they diverge + const id1 = await NetworkFactories.SyncId.create(undefined, { + transient: { date: sharedDate, hash: sharedPrefixHashA, fid }, + }); + const hash1 = id1.idString(); + + const id2 = await NetworkFactories.SyncId.create(undefined, { + transient: { date: sharedDate, hash: sharedPrefixHashB, fid }, + }); + const hash2 = id2.idString(); + + // The node at which the trie splits should be the first character that differs between the two hashes + // eslint-disable-next-line security/detect-object-injection + const firstDiffPos = hash1.split('').findIndex((c, i) => c !== hash2[i]); + + const root = new TrieNode(); + root.insert(id1.idString()); + root.insert(id2.idString()); + + const splitNode = traverse(root); + expect(splitNode.items).toEqual(2); + const children = Array.from(splitNode.children); + const firstChild = children[0] as [string, TrieNode]; + const secondChild = children[1] as [string, TrieNode]; + expect(children.length).toEqual(2); + // hash1 node + // eslint-disable-next-line security/detect-object-injection + expect(firstChild[0]).toEqual(hash1[firstDiffPos]); + expect(firstChild[1].isLeaf).toBeTruthy(); + expect(firstChild[1].value).toEqual(id1.idString()); + // hash2 node + // eslint-disable-next-line security/detect-object-injection + expect(secondChild[0]).toEqual(hash2[firstDiffPos]); + expect(secondChild[1].isLeaf).toBeTruthy(); + expect(secondChild[1].value).toEqual(id2.idString()); + }); + }); + + describe('delete', () => { + test('deleting a single item removes the node', async () => { + const root = new TrieNode(); + const id = await NetworkFactories.SyncId.create(); + + root.insert(id.idString()); + expect(root.items).toEqual(1); + + root.delete(id.idString()); + expect(root.items).toEqual(0); + expect(root.hash).toEqual(EMPTY_HASH); + }); + + test('deleting a single item from a node with multiple items removes the item', async () => { + const root = new TrieNode(); + const id1 = await NetworkFactories.SyncId.create(undefined, { transient: { date: sharedDate } }); + const id2 = await 
NetworkFactories.SyncId.create(undefined, { transient: { date: sharedDate } }); + + root.insert(id1.idString()); + const previousHash = root.hash; + root.insert(id2.idString()); + expect(root.items).toEqual(2); + + root.delete(id2.idString()); + expect(root.items).toEqual(1); + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(root.exists(id2.idString())).toBeFalsy(); + expect(root.hash).toEqual(previousHash); + }); + + test('deleting a single item from a split node should preserve previous hash', async () => { + const id1 = await NetworkFactories.SyncId.create(undefined, { + transient: { date: sharedDate, hash: sharedPrefixHashA }, + }); + const id2 = await NetworkFactories.SyncId.create(undefined, { + transient: { date: sharedDate, hash: sharedPrefixHashB }, + }); + + const root = new TrieNode(); + root.insert(id1.idString()); + const previousRootHash = root.hash; + const leafNode = traverse(root); + root.insert(id2.idString()); + + expect(root.hash).not.toEqual(previousRootHash); + + root.delete(id2.idString()); + + const newLeafNode = traverse(root); + expect(newLeafNode).toEqual(leafNode); + expect(root.hash).toEqual(previousRootHash); + }); + }); + + describe('get', () => { + test('getting a single item returns the value', async () => { + const root = new TrieNode(); + const id = await NetworkFactories.SyncId.create(); + + root.insert(id.idString()); + expect(root.items).toEqual(1); + + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(root.exists(id.idString())).toBeTruthy(); + }); + + test('getting an item after deleting it returns undefined', async () => { + const root = new TrieNode(); + const id = await NetworkFactories.SyncId.create(); + + root.insert(id.idString()); + expect(root.items).toEqual(1); + + root.delete(id.idString()); + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(root.exists(id.idString())).toBeFalsy(); + expect(root.items).toEqual(0); + }); + + test('getting a non-existent item that shares the same prefix with an existing item returns undefined', async () => { + const id1 = await NetworkFactories.SyncId.create(undefined, { + transient: { date: sharedDate, hash: sharedPrefixHashA }, + }); + const id2 = await NetworkFactories.SyncId.create(undefined, { + transient: { date: sharedDate, hash: sharedPrefixHashB }, + }); + + const root = new TrieNode(); + root.insert(id1.idString()); + + // id2 shares the same prefix, but doesn't exist, so it should return undefined + // eslint-disable-next-line security/detect-non-literal-fs-filename + expect(root.exists(id2.idString())).toBeFalsy(); + }); + }); +}); diff --git a/apps/protohub/src/network/sync/trieNode.ts b/apps/protohub/src/network/sync/trieNode.ts new file mode 100644 index 0000000000..e8c2718a5c --- /dev/null +++ b/apps/protohub/src/network/sync/trieNode.ts @@ -0,0 +1,270 @@ +import { HubError } from '@farcaster/utils'; +import { blake3 } from '@noble/hashes/blake3'; +import { TIMESTAMP_LENGTH } from '~/network/sync/syncId'; + +export const EMPTY_HASH = Buffer.from(blake3('', { dkLen: 20 })).toString('hex'); + +/** + * A snapshot of the trie at a particular timestamp which can be used to determine if two + * hubs are in sync + * + * @prefix - The prefix (timestamp string) used to generate the snapshot + * @excludedHashes - The hash of all the nodes excluding the prefix character at every index of the prefix + * @numMessages - The total number of messages captured in the snapshot (excludes the prefix nodes) + */ +export type TrieSnapshot = { + prefix: string; + excludedHashes: string[]; + numMessages: number; +}; + +/** + * Represents a node in a MerkleTrie. Automatically updates the hashes when items are added, + * and keeps track of the number of items in the subtree. + */ +class TrieNode { + private _hash: string; + private _items: number; + private _children: Map<string, TrieNode>; + private _key: string | undefined; + + constructor() { + this._hash = ''; + this._items = 0; + this._children = new Map(); + this._key = undefined; + } + + /** + * Inserts a value into the trie. Returns true if the value was inserted, false if it already existed + * @param key - The key to insert + * @param current_index - The index of the current character in the key (only used internally) + * @returns true if the value was inserted, false if it already existed + * + * Recursively traverses the trie by prefix and inserts the value at the end. Updates the hashes for + * every node that was traversed. + */ + public insert(key: string, current_index = 0): boolean { + const char = key.charAt(current_index); + + // Do not compact the timestamp portion of the trie, since it's used to compare snapshots + if (current_index >= TIMESTAMP_LENGTH && this.isLeaf && !this._key) { + // Reached a leaf node with no value, insert it + this._setKeyValue(key); + this._items += 1; + return true; + } + + if (current_index >= TIMESTAMP_LENGTH && this.isLeaf) { + if (this._key == key) { + // If the same key exists, do nothing + return false; + } + // If the key is different, and a value exists, then split the node + this._splitLeafNode(current_index); + } + + if (!this._children.has(char)) { + this._addChild(char); + } + + // Recurse into a non-leaf node and instruct it to insert the value + const success = this._children.get(char)?.insert(key, current_index + 1); + if (success) { + this._items += 1; + this._updateHash(); + return true; + } + + return false; + } + + /** + * Deletes a value from the trie by key. Returns true if the value was deleted, false if it didn't exist + * @param key - The key to delete + * @param current_index - The index of the current character in the key (only used internally) + * + * Ensures that there are no empty nodes after deletion. This is important to make sure the hashes + * will match exactly with another trie that never had the value (e.g. in another hub). + */ + public delete(key: string, current_index = 0): boolean { + if (this.isLeaf) { + if (this._key === key) { + this._items -= 1; + this._setKeyValue(undefined); + return true; + } else { + return false; + } + } + + const char = key.charAt(current_index); + if (!this._children.has(char)) { + return false; + } + + const success = this._children.get(char)?.delete(key, current_index + 1); + if (success) { + this._items -= 1; + // Delete the child if it's empty. This is required to make sure the hash will be the same + // as another trie that doesn't have this node in the first place. + if (this._children.get(char)?.items === 0) { + this._children.delete(char); + } + + if (this._children.size === 1 && current_index >= TIMESTAMP_LENGTH) { + // Compact the node if it has only one child + const [char, child] = this._children.entries().next().value; + this._setKeyValue(child._key); + this._children.delete(char); + } + + this._updateHash(); + return true; + } + + return false; + } + + /** + * Check if a key exists in the trie.
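+ * For example (illustrative keys): after insert('0012345678abcd'), exists('0012345678abcd') is true,
+ * while exists('0012345678abce') is false even though the two keys share a long common prefix.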
+ * @param key - The key to look for + * @param current_index - The index of the current character in the key (only used internally) + */ + public exists(key: string, current_index = 0): boolean { + if (this.isLeaf && this._key === key) { + return true; + } + + const char = key.charAt(current_index); + if (!this._children.has(char)) { + return false; + } + + // NOTE: eslint falsely identifies as `fs.exists`. + // eslint-disable-next-line security/detect-non-literal-fs-filename + return this._children.get(char)?.exists(key, current_index + 1) || false; + } + + // Generates a snapshot for the current node and below. current_index is the index of the prefix the method is operating on + public getSnapshot(prefix: string, current_index = 0): TrieSnapshot { + const char = prefix.charAt(current_index); + if (current_index === prefix.length - 1) { + const excludedHash = this._excludedHash(char); + return { + prefix: prefix, + excludedHashes: [excludedHash.hash], + numMessages: excludedHash.items, + }; + } + + const innerSnapshot = this._children.get(char)?.getSnapshot(prefix, current_index + 1); + const excludedHash = this._excludedHash(char); + return { + prefix: innerSnapshot?.prefix || prefix.slice(0, current_index + 1), + excludedHashes: [excludedHash.hash, ...(innerSnapshot?.excludedHashes || [])], + numMessages: excludedHash.items + (innerSnapshot?.numMessages || 0), + }; + } + + public get items(): number { + return this._items; + } + + public get hash(): string { + return this._hash; + } + + public get isLeaf(): boolean { + return this._children.size === 0; + } + + // Only available on leaf nodes + public get value(): string | undefined { + if (this.isLeaf) { + return this._key; + } + return undefined; + } + + public getNode(prefix: string): TrieNode | undefined { + if (prefix.length === 0) { + return this; + } + const char = prefix.charAt(0); + if (!this._children.has(char)) { + return undefined; + } + return this._children.get(char)?.getNode(prefix.slice(1)); + } + + public get children(): IterableIterator<[string, TrieNode]> { + return this._children.entries(); + } + + public getAllValues(): string[] { + if (this.isLeaf) { + return this._key ? 
[this._key] : []; + } + const values: string[] = []; + this._children.forEach((child) => { + values.push(...child.getAllValues()); + }); + return values; + } + + /* Private methods */ + + private _excludedHash(char: string): { items: number; hash: string } { + // TODO: Cache this for performance + const hash = blake3.create({ dkLen: 20 }); + let excludedItems = 0; + this._children.forEach((child, key) => { + if (key !== char) { + hash.update(child.hash); + excludedItems += child.items; + } + }); + return { hash: Buffer.from(hash.digest()).toString('hex'), items: excludedItems }; + } + + private _addChild(char: string) { + this._children.set(char, new TrieNode()); + // The hash requires the children to be sorted, and sorting on insert/update is cheaper than + // sorting each time we need to update the hash + this._children = new Map([...this._children.entries()].sort()); + } + + private _setKeyValue(key: string | undefined) { + this._key = key; + this._updateHash(); + } + + // Splits a leaf node into a non-leaf node by clearing its key/value and adding a child for + // the next char in its key + private _splitLeafNode(current_index: number) { + if (!this._key) { + // This should never happen, check is here for type safety + throw new HubError('bad_request', 'Cannot split a leaf node without a key and value'); + } + const newChildChar = this._key.charAt(current_index); + this._addChild(newChildChar); + this._children.get(newChildChar)?.insert(this._key, current_index + 1); + this._setKeyValue(undefined); + } + + private _updateHash() { + if (this.isLeaf) { + this._hash = Buffer.from(blake3(this.value || '', { dkLen: 20 })).toString('hex'); + } else { + const hash = blake3.create({ dkLen: 20 }); + this._children.forEach((child) => { + hash.update(child.hash); + }); + this._hash = Buffer.from(hash.digest()).toString('hex'); + } + } +} + +export { TrieNode }; diff --git a/apps/protohub/src/network/utils/factories.test.ts b/apps/protohub/src/network/utils/factories.test.ts new file mode 100644 index 0000000000..5fe31a21fb --- /dev/null +++ b/apps/protohub/src/network/utils/factories.test.ts @@ -0,0 +1,60 @@ +import * as protobufs from '@farcaster/protobufs'; +import { Factories } from '@farcaster/protoutils'; +import { isPeerId } from '@libp2p/interface-peer-id'; +import { peerIdFromBytes } from '@libp2p/peer-id'; +import { GOSSIP_PROTOCOL_VERSION } from '~/network/p2p/protocol'; +import { NetworkFactories } from '~/network/utils/factories'; + +describe('GossipMessageFactory', () => { + let message: protobufs.Message; + let gossipMessage: protobufs.GossipMessage; + + beforeAll(async () => { + message = await Factories.Message.create(); + gossipMessage = await NetworkFactories.GossipMessage.create({ message }); + }); + + test('creates with arguments', () => { + expect(gossipMessage.message?.hash).toEqual(Array.from(message.hash)); + }); + + test('defaults to the right version', async () => { + expect(gossipMessage.version).toEqual(GOSSIP_PROTOCOL_VERSION); + }); + + test('generates a valid peerId', async () => { + const gossipMsg = await NetworkFactories.GossipMessage.create(); + const peerId = peerIdFromBytes(gossipMsg.peerId || new Uint8Array()); + expect(peerId).toBeDefined(); + expect(isPeerId(peerId)).toBeTruthy(); + }); +}); + +describe('AddressInfoFactory', () => { + test('creates with arguments', async () => { + const gossipAddress = NetworkFactories.GossipAddressInfo.build({ + address: '127.0.0.1', + port: 1234, + family: 4, + }); + expect(gossipAddress.address).toEqual('127.0.0.1'); + 
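+ // Note: GossipAddressInfo.family is the numeric protobuf field (4 = IPv4, 6 = IPv6),
+ // unlike net.AddressInfo, which uses the strings 'IPv4' / 'IPv6'.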
expect(gossipAddress.port).toEqual(1234); + expect(gossipAddress.family).toEqual(4); + }); +}); + +describe('ContactInfoFactory', () => { + let gossipAddress: protobufs.GossipAddressInfo; + let rpcAddress: protobufs.GossipAddressInfo; + + beforeAll(() => { + gossipAddress = NetworkFactories.GossipAddressInfo.build(); + rpcAddress = NetworkFactories.GossipAddressInfo.build(); + }); + + test('creates with arguments', async () => { + const contactInfo = NetworkFactories.GossipContactInfoContent.build({ gossipAddress, rpcAddress }); + expect(contactInfo.rpcAddress).toEqual(rpcAddress); + expect(contactInfo.gossipAddress).toEqual(gossipAddress); + }); +}); diff --git a/apps/protohub/src/network/utils/factories.ts b/apps/protohub/src/network/utils/factories.ts new file mode 100644 index 0000000000..f46919e5d7 --- /dev/null +++ b/apps/protohub/src/network/utils/factories.ts @@ -0,0 +1,71 @@ +import { faker } from '@faker-js/faker'; +import * as protobufs from '@farcaster/protobufs'; +import { Factories, hexStringToBytes } from '@farcaster/protoutils'; +import { PeerId } from '@libp2p/interface-peer-id'; +import { createEd25519PeerId } from '@libp2p/peer-id-factory'; +import { Factory } from 'fishery'; +import { NETWORK_TOPIC_PRIMARY } from '~/network/p2p/protocol'; +import { HASH_LENGTH, SyncId } from '~/network/sync/syncId'; + +const GossipAddressInfoFactory = Factory.define(() => { + return protobufs.GossipAddressInfo.create({ + address: '0.0.0.0', + port: faker.datatype.number({ min: 1, max: 65535 }), + family: 4, + }); +}); + +const ContactInfoContentFactory = Factory.define(() => { + return protobufs.ContactInfoContent.create({ + gossipAddress: GossipAddressInfoFactory.build(), + rpcAddress: GossipAddressInfoFactory.build(), + excludedHashes: [], + count: 0, + }); +}); + +const GossipMessageFactory = Factory.define( + ({ onCreate, transientParams }) => { + onCreate(async (gossipMessage) => { + if (gossipMessage.peerId.length === 0) { + gossipMessage.peerId = (await createEd25519PeerId()).toBytes(); + } + return gossipMessage; + }); + + return protobufs.GossipMessage.create({ + peerId: transientParams.peerId ? 
transientParams.peerId.toBytes() : new Uint8Array(), + message: Factories.Message.build(), + topics: [NETWORK_TOPIC_PRIMARY], + version: protobufs.GossipVersion.GOSSIP_VERSION_1, + }); + } +); + +const SyncIdFactory = Factory.define( + ({ onCreate, transientParams }) => { + onCreate(async () => { + const { date, hash, fid } = transientParams; + const hashBytes = hexStringToBytes(hash || faker.datatype.hexadecimal({ length: HASH_LENGTH }))._unsafeUnwrap(); + const ethSigner = Factories.Eip712Signer.build(); + const signerMessage = await Factories.SignerAddMessage.create( + { + hash: hashBytes, + data: { fid: fid || Factories.Fid.build(), timestamp: (date || faker.date.recent()).getTime() / 1000 }, + }, + { transient: { signer: ethSigner } } + ); + + return new SyncId(signerMessage); + }); + + return undefined; + } +); + +export const NetworkFactories = { + GossipMessage: GossipMessageFactory, + GossipContactInfoContent: ContactInfoContentFactory, + GossipAddressInfo: GossipAddressInfoFactory, + SyncId: SyncIdFactory, +}; diff --git a/apps/protohub/src/rpc/server/syncService.ts b/apps/protohub/src/rpc/server/syncService.ts new file mode 100644 index 0000000000..8f9172b69f --- /dev/null +++ b/apps/protohub/src/rpc/server/syncService.ts @@ -0,0 +1,22 @@ +import { getServer, SyncServiceServer, SyncServiceService } from '@farcaster/protobufs'; + +export const SyncServiceGrpc: SyncServiceServer = { + getInfo: (_call, _callback) => { + // Not implemented + }, + getAllSyncIdsByPrefix: (_call, _callback) => { + // Not implemented + }, + getAllMessagesBySyncIds: (_call) => { + // Not implemented + }, + getSyncMetadataByPrefix: (_call, _callback) => { + // Not implemented + }, + getSyncSnapshotByPrefix: (_call, _callback) => { + // Not implemented + }, +}; + +const server = getServer(); +server.addService(SyncServiceService, SyncServiceGrpc); diff --git a/apps/protohub/src/storage/engine/index.ts b/apps/protohub/src/storage/engine/index.ts index 7b5d0f264b..071207c1e7 100644 --- a/apps/protohub/src/storage/engine/index.ts +++ b/apps/protohub/src/storage/engine/index.ts @@ -1,9 +1,10 @@ import * as protobufs from '@farcaster/protobufs'; import { bytesCompare, HubAsyncResult, HubError, HubResult, validations } from '@farcaster/protoutils'; import { err, ok, ResultAsync } from 'neverthrow'; -import { typeToSetPostfix } from '~/storage/db/message'; +import { SyncId } from '~/network/sync/syncId'; +import { getManyMessages, typeToSetPostfix } from '~/storage/db/message'; import RocksDB from '~/storage/db/rocksdb'; -import { UserPostfix } from '~/storage/db/types'; +import { FID_BYTES, RootPrefix, UserPostfix } from '~/storage/db/types'; import CastStore from '~/storage/stores/castStore'; import ReactionStore from '~/storage/stores/reactionStore'; import SignerStore from '~/storage/stores/signerStore'; @@ -99,6 +100,49 @@ class Engine { return ok(undefined); } + /* -------------------------------------------------------------------------- */ + /* All Messages */ + /* -------------------------------------------------------------------------- */ + async forEachMessage(callback: (message: protobufs.Message) => void): Promise { + const allUserPrefix = Buffer.from([RootPrefix.User]); + + for await (const [key, value] of this._db.iteratorByPrefix(allUserPrefix, { keys: true, valueAsBuffer: true })) { + if (key.length < 2 + FID_BYTES) { + // Not a message key, so we can skip it. 
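+ // Message keys are laid out as [RootPrefix.User (1 byte)][fid (FID_BYTES)][UserPostfix (1 byte)][tsHash]
+ // (see makeMessagePrimaryKey in syncId.ts), so any key shorter than 2 + FID_BYTES cannot be a message key.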
+ continue; + } + + // Get the UserMessagePostfix from the key, which is the byte at offset 1 + FID_BYTES from the start + const postfix = key.slice(1 + FID_BYTES, 1 + FID_BYTES + 1)[0]; + if ( + postfix !== UserPostfix.CastMessage && + postfix !== UserPostfix.AmpMessage && + postfix !== UserPostfix.ReactionMessage && + postfix !== UserPostfix.VerificationMessage && + postfix !== UserPostfix.SignerMessage && + postfix !== UserPostfix.UserDataMessage + ) { + // Not a message key, so we can skip it. + continue; + } + + if (!value || value.length <= 20) { + // This is a hash and not a message, we need to skip it. + continue; + } + + const message = protobufs.Message.decode(new Uint8Array(value)); + + callback(message); + } + } + async getAllMessagesBySyncIds(syncIds: string[]): HubAsyncResult<protobufs.Message[]> { + const hashesBuf = syncIds.map((syncIdHash) => SyncId.pkFromIdString(syncIdHash)); + const messages = await ResultAsync.fromPromise(getManyMessages(this._db, hashesBuf), (e) => e as HubError); + + return messages; + } + /* -------------------------------------------------------------------------- */ /* Cast Store Methods */ /* -------------------------------------------------------------------------- */ diff --git a/apps/protohub/src/utils/crypto.ts b/apps/protohub/src/utils/crypto.ts new file mode 100644 index 0000000000..56f0583dec --- /dev/null +++ b/apps/protohub/src/utils/crypto.ts @@ -0,0 +1,5 @@ +export const sleep = (ms: number) => { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +}; diff --git a/apps/protohub/src/utils/logger.ts b/apps/protohub/src/utils/logger.ts new file mode 100644 index 0000000000..54c532c516 --- /dev/null +++ b/apps/protohub/src/utils/logger.ts @@ -0,0 +1,71 @@ +import * as protobufs from '@farcaster/protobufs'; +import { bytesToHexString, fromFarcasterTime } from '@farcaster/utils'; +import { default as Pino } from 'pino'; + +/** + * Logging Guidelines + * + * 1. Use a child instance per module + * + * const log = logger.child({ component: 'P2PEngine' }) + * + * 2. Structure logs so that they can be queried easily + * + * Good: log.info({ function: 'PerformSync', peerId: peerId.toString() }, 'connected to peers'); + * Bad: log.info('Hub connected to peers over P2P with id', peerId.toString()); + * + * 3. Use the appropriate level with log. (e.g. logger.info): + * + * logger.fatal() - when the application crashes + * logger.error() - when logging an Error object + * logger.warn() - when logging something unexpected + * logger.info() - when logging information (most common use case) + * logger.debug() - when logging something that is temporarily added in for debugging + * logger.trace() - not currently used + * + * 4.
When logging an error, include the error object as the first argument to preserve stack traces + * + * Good: log.error(error, "additional context") + * Bad: log.error("additional context", error.message) + * + * More info on best practices: + * https://betterstack.com/community/guides/logging/how-to-install-setup-and-use-pino-to-log-node-js-applications/ + */ +const defaultOptions: Pino.LoggerOptions = {}; + +// Disable logging in tests and CI to reduce noise +if (process.env['NODE_ENV'] === 'test' || process.env['CI']) { + // defaultOptions.level = 'debug'; + defaultOptions.level = 'silent'; +} + +export const logger = Pino(defaultOptions); + +export const messageToLog = (message: protobufs.Message) => { + return { + timestamp: fromFarcasterTime(message.data?.timestamp || 0), + hash: bytesToHexString(message.hash), + fid: message.data?.fid, + type: message.data?.type, + }; +}; + +// export const idRegistryEventToLog = (event: IdRegistryEventModel) => { +// return { +// blockNumber: event.blockNumber(), +// transactionHash: bytesToHexString(event.transactionHash())._unsafeUnwrap(), +// fid: bytesToNumber(event.fid())._unsafeUnwrap(), +// to: bytesToHexString(event.to())._unsafeUnwrap(), +// type: event.typeName(), +// }; +// }; + +// export const nameRegistryEventToLog = (event: NameRegistryEventModel) => { +// return { +// blockNumber: event.blockNumber(), +// transactionHash: bytesToHexString(event.transactionHash())._unsafeUnwrap(), +// fname: Buffer.from(event.fname()).toString('utf-8').replace(/\0/g, ''), +// to: bytesToHexString(event.to())._unsafeUnwrap(), +// type: event.typeName(), +// }; +// }; diff --git a/apps/protohub/src/utils/p2p.test.ts b/apps/protohub/src/utils/p2p.test.ts new file mode 100644 index 0000000000..28ba086fde --- /dev/null +++ b/apps/protohub/src/utils/p2p.test.ts @@ -0,0 +1,142 @@ +import { createEd25519PeerId } from '@libp2p/peer-id-factory'; +import { multiaddr, NodeAddress } from '@multiformats/multiaddr'; +import { + addressInfoFromNodeAddress, + addressInfoFromParts, + checkNodeAddrs, + ipMultiAddrStrFromAddressInfo, + p2pMultiAddrStr, + parseAddress, +} from '~/utils/p2p'; + +describe('p2p utils tests', () => { + test('parse a valid multiaddr', async () => { + const result = parseAddress('/ip4/127.0.0.1/tcp/8080'); + expect(result.isOk()).toBeTruthy(); + }); + + test('fail to parse an invalid multiaddr', async () => { + const error = parseAddress('/ip6/127.0.0.1/8080')._unsafeUnwrapErr(); + expect(error.errCode).toEqual('bad_request.parse_failure'); + expect(error.message).toEqual('invalid multiaddr'); + }); + + test('fail to parse an empty string', async () => { + const error = parseAddress('')._unsafeUnwrapErr(); + expect(error.errCode).toEqual('bad_request'); + expect(error.message).toEqual('multiaddr must not be empty'); + }); + + test('check valid node addresses', async () => { + const result = checkNodeAddrs('/ip4/127.0.0.1/', '/ip4/127.0.0.1/tcp/8080'); + expect(result.isOk()).toBeTruthy(); + }); + + test('check invalid node addresses', async () => { + // invalid IP multiaddr but valid combined multiaddr + let error = checkNodeAddrs( + '/ip4/2600:1700:6cf0:990:2052:a166:fb35:830a', + '/ip4/127.0.0.1/tcp/8080' + )._unsafeUnwrapErr(); + expect(error.errCode).toEqual('bad_request.parse_failure'); + expect(error.message).toEqual('invalid multiaddr'); + + // valid IP multiaddr but invalid combined multiaddr + error = checkNodeAddrs( + '/ip4/127.0.0.1/', + '/ip4/2600:1700:6cf0:990:2052:a166:fb35:830a/tcp/8080' + )._unsafeUnwrapErr(); + 
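+ // (the combined multiaddr embeds an IPv6 address under /ip4/, so parsing fails)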
expect(error.errCode).toEqual('bad_request.parse_failure'); + expect(error.message).toEqual('invalid multiaddr'); + + // both invalid IP and combined multiaddrs + error = checkNodeAddrs( + '/ip4/2600:1700:6cf0:990:2052:a166:fb35:830a', + '/ip4/2600:1700:6cf0:990:2052:a166:fb35:830a/tcp/8080' + )._unsafeUnwrapErr(); + expect(error.errCode).toEqual('bad_request.parse_failure'); + expect(error.message).toEqual('invalid multiaddr'); + }); + + test('p2p multiaddr formatted string', async () => { + const peerId = await createEd25519PeerId(); + const ip4Addr = { address: '127.0.0.1', family: 'IPv4', port: 1234 }; + const ip6Addr = { address: '2600:1700:6cf0:990:2052:a166:fb35:830a', family: 'IPv6', port: 1234 }; + + let multiAddrStr = p2pMultiAddrStr(ip4Addr, peerId.toString())._unsafeUnwrap(); + const multiAddr = multiaddr(multiAddrStr); + expect(multiAddrStr).toBeDefined(); + expect(multiAddr).toBeDefined(); + + multiAddrStr = p2pMultiAddrStr(ip6Addr, peerId.toString())._unsafeUnwrap(); + expect(multiAddrStr).toBeDefined(); + expect(multiaddr(multiAddrStr)).toBeDefined(); + }); + + test('addressInfo from valid IPv4 inputs', async () => { + const result = addressInfoFromParts('127.0.0.1', 0); + expect(result.isOk()).toBeTruthy(); + const info = result._unsafeUnwrap(); + expect(info.address).toEqual('127.0.0.1'); + expect(info.family).toEqual('IPv4'); + }); + + test('addressInfo from valid IPv6 inputs', async () => { + const result = addressInfoFromParts('2600:1700:6cf0:990:2052:a166:fb35:830a', 12345); + expect(result.isOk()).toBeTruthy(); + const info = result._unsafeUnwrap(); + expect(info.address).toEqual('2600:1700:6cf0:990:2052:a166:fb35:830a'); + expect(info.family).toEqual('IPv6'); + }); + + test('addressInfo fails on invalid inputs', async () => { + const result = addressInfoFromParts('clearlyNotAnIP', 12345); + expect(result.isErr()).toBeTruthy(); + }); + + test('valid multiaddr from addressInfo', async () => { + const addressInfo = addressInfoFromParts('127.0.0.1', 0); + expect(addressInfo.isOk()).toBeTruthy(); + + const multiAddrStr = ipMultiAddrStrFromAddressInfo(addressInfo._unsafeUnwrap())._unsafeUnwrap(); + expect(multiAddrStr).toEqual('/ip4/127.0.0.1'); + + const multiAddr = multiaddr(multiAddrStr); + expect(multiAddr).toBeDefined(); + }); + + test('throws when making multiaddr from invalid addressInfo', () => { + const addressInfo = addressInfoFromParts('127.0.0.1', 0); + expect(addressInfo.isOk()).toBeTruthy(); + + addressInfo._unsafeUnwrap().family = 'ip12'; + const error = ipMultiAddrStrFromAddressInfo(addressInfo._unsafeUnwrap())._unsafeUnwrapErr(); + expect(error.errCode).toEqual('bad_request'); + expect(error.message).toMatch('invalid AddressInfo family'); + }); + + test('converts a valid nodeAddress to an addressInfo', () => { + const nodeAddr: NodeAddress = { + family: 4, + address: '127.0.0.1', + port: 0, + }; + + const addressInfo = addressInfoFromNodeAddress(nodeAddr)._unsafeUnwrap(); + expect(addressInfo.address).toEqual(nodeAddr.address); + expect(addressInfo.port).toEqual(nodeAddr.port); + expect(addressInfo.family).toEqual('IPv4'); + }); + + test('throws when converting an invalid nodeAddress to an addressInfo', () => { + const nodeAddr: NodeAddress = { + family: 21 as 4, + address: '127.0.0.1', + port: 0, + }; + + const error = addressInfoFromNodeAddress(nodeAddr)._unsafeUnwrapErr(); + expect(error.errCode).toEqual('bad_request'); + expect(error.message).toMatch('invalid nodeAddress family'); + }); +}); diff --git a/apps/protohub/src/utils/p2p.ts 
b/apps/protohub/src/utils/p2p.ts new file mode 100644 index 0000000000..5b878f8aaa --- /dev/null +++ b/apps/protohub/src/utils/p2p.ts @@ -0,0 +1,165 @@ +import { GossipAddressInfo } from '@farcaster/protobufs'; +import { HubAsyncResult, HubError, HubResult } from '@farcaster/protoutils'; +import { Multiaddr, multiaddr, NodeAddress } from '@multiformats/multiaddr'; +import { get } from 'http'; +import { AddressInfo, isIP } from 'net'; +import { err, ok, Result } from 'neverthrow'; +import { logger } from '~/utils/logger'; + +/** Parses an address to verify it is actually a valid MultiAddr */ +export const parseAddress = (multiaddrStr: string): HubResult<Multiaddr> => { + if (multiaddrStr === '') return err(new HubError('bad_request', 'multiaddr must not be empty')); + + return Result.fromThrowable( + () => multiaddr(multiaddrStr), + (err) => new HubError('bad_request.parse_failure', { cause: err as Error, message: 'invalid multiaddr' }) + )(); +}; + +/** Checks that the IP address to bind to is valid and that the combined IP, transport, and port multiaddr is valid */ +export const checkNodeAddrs = (listenIPAddr: string, listenCombinedAddr: string): HubResult<void> => { + return Result.combine([checkIpAddr(listenIPAddr), checkCombinedAddr(listenCombinedAddr)]).map(() => undefined); +}; + +/** Builds an AddressInfo from a NodeAddress */ +export const addressInfoFromNodeAddress = (nodeAddress: NodeAddress): HubResult<AddressInfo> => { + if (nodeAddress.family != 4 && nodeAddress.family != 6) + return err(new HubError('bad_request', `invalid nodeAddress family: ${nodeAddress.family}`)); + + return ok({ + address: nodeAddress.address, + port: nodeAddress.port, + family: ipFamilyToString(nodeAddress.family), + }); +}; + +/** Builds an AddressInfo for a given IP address and port */ +export const addressInfoFromParts = (address: string, port: number): HubResult<AddressInfo> => { + const family = isIP(address); + if (!family) return err(new HubError('bad_request.parse_failure', 'invalid ip address')); + + const addrInfo: AddressInfo = { + address, + port, + family: ipFamilyToString(family), + }; + return ok(addrInfo); +}; + +/** + * Creates an IP-only multiaddr formatted string from an AddressInfo + * + * Does not preserve port or transport information + */ +export const ipMultiAddrStrFromAddressInfo = (addressInfo: AddressInfo): HubResult<string> => { + if (addressInfo.family != 'IPv6' && addressInfo.family != 'IPv4') + return err(new HubError('bad_request', `invalid AddressInfo family: ${addressInfo.family}`)); + + const family = addressInfo.family === 'IPv6' ? 'ip6' : 'ip4'; + const multiaddrStr = `/${family}/${addressInfo.address}`; + return ok(multiaddrStr); +}; + +/** + * Creates a full p2p multiaddr formatted string from an AddressInfo and a peer id, including + * the tcp transport and port.
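+ * For example (illustrative values): p2pMultiAddrStr({ address: '127.0.0.1', family: 'IPv4', port: 8080 }, peerId)
+ * yields '/ip4/127.0.0.1/tcp/8080/p2p/' followed by the peer id.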
+ */ +export const p2pMultiAddrStr = (addressInfo: AddressInfo, peerID: string): HubResult => { + return ipMultiAddrStrFromAddressInfo(addressInfo).map( + (ipMultiAddrStr) => `${ipMultiAddrStr}/tcp/${addressInfo.port}/p2p/${peerID}` + ); +}; + +/* Converts GossipAddressInfo to net.AddressInfo */ +export const addressInfoFromGossip = (addressInfo: GossipAddressInfo): HubResult => { + const address = addressInfo.address; + const port = addressInfo.port; + const family = addressInfo.family; + if (!address || family === 0) return err(new HubError('bad_request.parse_failure', 'Invalid address')); + const addrInfo: AddressInfo = { + address, + port, + family: ipFamilyToString(family), + }; + return ok(addrInfo); +}; + +/* Converts ipFamily number to string */ +export const ipFamilyToString = (family: number): string => { + return family == 4 ? 'IPv4' : 'IPv6'; +}; + +/* Converts AddressInfo to address string */ +export const addressInfoToString = (addressInfo: AddressInfo): string => { + if (addressInfo.family === 'IPv4') { + return `${addressInfo.address}:${addressInfo.port}`; + } else { + return `[${addressInfo.address}]:${addressInfo.port}`; + } +}; + +/** + * Returns publicly visible IPv4 or IPv6 address of the running process + */ +let lastIpFetch = { timestamp: new Date().getTime(), ip: '' }; + +export const getPublicIp = async (): HubAsyncResult => { + return new Promise((resolve, reject) => { + const now = new Date().getTime(); + const since = now - lastIpFetch.timestamp; + if (since <= 10 * 60 * 1000 && lastIpFetch.ip != '') { + logger.debug({ component: 'utils/p2p', ip: lastIpFetch.ip }, `Cached public IP`); + resolve(ok(lastIpFetch.ip)); + return; + } + try { + get({ host: 'api64.ipify.org', port: 80, path: '/' }, (resp) => { + resp.on('data', (ip: Buffer) => { + logger.info({ component: 'utils/p2p', ip: ip.toString() }, `Fetched public IP`); + lastIpFetch = { timestamp: now, ip: ip.toString() }; + resolve(ok(ip.toString())); + }); + }); + } catch (err: any) { + reject(new HubError('unavailable.network_failure', err)); + } + }); +}; + +/* -------------------------------------------------------------------------- */ +/* Private Methods */ +/* -------------------------------------------------------------------------- */ + +const checkIpAddr = (ipAddr: string): HubResult => { + const parseListenIpAddrResult = parseAddress(ipAddr); + if (parseListenIpAddrResult.isErr()) return err(parseListenIpAddrResult.error); + + const optionsResult = Result.fromThrowable( + () => parseListenIpAddrResult.value.toOptions(), + (error) => err(error) + )(); + + // An IP address should not have options and should throw if well-formed + if (optionsResult.isErr()) return ok(undefined); + + const options = optionsResult.value; + if (options.port !== undefined || options.transport !== undefined) { + return err(new HubError('bad_request', 'unexpected multiaddr transport/port information')); + } + return ok(undefined); +}; + +const checkCombinedAddr = (combinedAddr: string): HubResult => { + const parseListenIpAddrResult = parseAddress(combinedAddr); + if (parseListenIpAddrResult.isErr()) return err(parseListenIpAddrResult.error); + + const optionsResult = Result.fromThrowable( + () => parseListenIpAddrResult.value.toOptions(), + (error) => new HubError('bad_request.parse_failure', error as Error) + )(); + + return optionsResult.andThen((options) => { + if (options.transport != 'tcp') return err(new HubError('bad_request', 'multiaddr transport must be tcp')); + return ok(undefined); + }); +}; diff --git 
a/packages/protobufs/package.json b/packages/protobufs/package.json index 234b876c53..ab0654d6e3 100644 --- a/packages/protobufs/package.json +++ b/packages/protobufs/package.json @@ -24,6 +24,7 @@ }, "devDependencies": { "eslint-config-custom": "*", + "@grpc/grpc-js": "1.7.3", "ts-proto": "^1.138.0" }, "dependencies": {} diff --git a/packages/protobufs/src/index.ts b/packages/protobufs/src/index.ts index 35721c30dc..d6e0c42f07 100644 --- a/packages/protobufs/src/index.ts +++ b/packages/protobufs/src/index.ts @@ -1,4 +1,15 @@ +import * as grpc from '@grpc/grpc-js'; + +export * from './generated/gossip'; +export * from './generated/hub_state'; export * from './generated/id_registry_event'; export * from './generated/message'; +export * from './generated/rpc'; export * from './typeguards'; export * from './types'; + +export const getServer = (): grpc.Server => { + const server = new grpc.Server(); + + return server; +};
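
For reviewers: a minimal sketch of how the new `getServer` export is expected to be wired up with the `SyncServiceGrpc` stub from `syncService.ts` once the handlers are implemented. The bind address, port, and insecure credentials below are illustrative assumptions, not part of this patch:

import * as grpc from '@grpc/grpc-js';
import { getServer, SyncServiceService } from '@farcaster/protobufs';
import { SyncServiceGrpc } from '~/rpc/server/syncService';

// Create the bare grpc server and register the (currently unimplemented) sync service
const server = getServer();
server.addService(SyncServiceService, SyncServiceGrpc);

// Illustrative bind address/port; a hub would supply its own configuration
server.bindAsync('0.0.0.0:2283', grpc.ServerCredentials.createInsecure(), (e, _port) => {
  if (e) throw e;
  server.start();
});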