diff --git a/.gitignore b/.gitignore
index 7e631e18..6945dfff 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 coverage
 nimcache
+tests/data
 tests/testAll
diff --git a/libp2pdht.nimble b/libp2pdht.nimble
index 54cd6715..5d21ff8c 100644
--- a/libp2pdht.nimble
+++ b/libp2pdht.nimble
@@ -19,7 +19,9 @@ requires "nim >= 1.2.0",
   "secp256k1 >= 0.5.2 & < 0.6.0",
   "stew#head",
   "stint",
-  "asynctest >= 0.3.1 & < 0.4.0"
+  "asynctest >= 0.3.1 & < 0.4.0",
+  "questionable >= 0.10.5 & < 0.11.0",
+  "https://github.com/status-im/nim-datastore#methods_close_query"
 
 task coverage, "generates code coverage report":
   var (output, exitCode) = gorgeEx("which lcov")
@@ -57,4 +59,3 @@ task coverage, "generates code coverage report":
   exec("genhtml coverage/coverage.f.info --output-directory coverage/report")
   echo "Opening HTML coverage report in browser..."
   exec("open coverage/report/index.html")
-
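The two dependencies added above carry most of this change: questionable provides the `?!T` result type and the `without`/`=?` binding idioms used throughout the new provider code, and nim-datastore provides the SQLite-backed `Datastore` that replaces the in-memory providers table. A minimal sketch of the questionable idioms, assuming only that package's documented API (`parseEven` and `demo` are hypothetical names):

```nim
import questionable
import questionable/results

proc parseEven(x: int): ?!int =
  # ?!int is a Result holding either an int or a ref CatchableError
  if x mod 2 == 0:
    success x
  else:
    failure "odd input"

proc demo() =
  # `without` binds the success value; the block runs on failure and
  # must leave the scope (here via return)
  without n =? parseEven(3), error:
    echo "rejected: ", error.msg
    return
  echo "accepted: ", n

demo()
```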
diff --git a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim
index fbd0ef3f..b9bf7e2b 100644
--- a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim
+++ b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim
@@ -74,12 +74,15 @@
 {.push raises: [Defect].}
 
 import
-  std/[tables, sets, options, math, sequtils, algorithm],
+  std/[tables, sets, options, math, sequtils, algorithm, hashes],
   stew/shims/net as stewNet, json_serialization/std/net,
-  stew/[base64, endians2, results], chronicles, chronicles/chronos_tools, chronos, chronos/timer, stint, bearssl,
+  stew/[base64, endians2], chronicles, chronicles/chronos_tools, chronos, chronos/timer, stint, bearssl,
+  stew/results as stewResults,
   metrics,
-  libp2p/[crypto/crypto, routing_record],
-  "."/[transport, messages, messages_encoding, node, routing_table, spr, random2, ip_vote, nodes_verification]
+  libp2p/[crypto/crypto, routing_record, signed_envelope],
+  "."/[transport, messages, messages_encoding, node, routing_table, spr, random2, ip_vote, nodes_verification],
+  questionable, questionable/results,
+  datastore/sqlite_datastore
 
 import nimcrypto except toHex
 
@@ -135,7 +138,7 @@ type
     talkProtocols*: Table[seq[byte], TalkProtocol] # TODO: Table is a bit of
                                                    # overkill here, use sequence
     rng*: ref BrHmacDrbgContext
-    providers: Table[NodeId, seq[SignedPeerRecord]]
+    providers: Datastore
 
   TalkProtocolHandler* = proc(p: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte]
     {.gcsafe, raises: [Defect].}
@@ -315,19 +318,52 @@ proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address,
     kind = MessageKind.talkresp
   d.sendResponse(fromId, fromAddr, talkresp, reqId)
 
-proc addProviderLocal(p: Protocol, cId: NodeId, prov: SignedPeerRecord) =
+proc fromBytes(T: type SignedPeerRecord, bytes: seq[byte]): ?!T =
+  let decodeRes = T.decode(bytes)
+  if decodeRes.isErr:
+    failure $decodeRes.error
+  else:
+    success decodeRes.get
+
+proc toBytes(prov: SignedPeerRecord): ?!seq[byte] =
+  let encodeRes = prov.encode
+  if encodeRes.isErr:
+    failure $encodeRes.error
+  else:
+    success encodeRes.get
+
+proc addProviderLocal(p: Protocol, cId: NodeId, prov: SignedPeerRecord) {.async.} =
   trace "adding provider to local db", n=p.localNode, cId, prov
-  p.providers.mgetOrPut(cId, @[]).add(prov)
+  without provBytes =? prov.toBytes, error:
+    trace "error when encoding SPR", cId, error = error.msg
+    return
+
+  without key =? Key.init($cId & "/" & $provBytes.hash), error:
+    trace "error when encoding cId as a key", cId, error = error.msg
+    return
+
+  let putRes = await p.providers.put(key, provBytes)
+  if putRes.isErr:
+    trace "error when storing encoded SPR in database", cId, error = putRes.error.msg
 
 proc handleAddProvider(d: Protocol, fromId: NodeId, fromAddr: Address,
-    addProvider: AddProviderMessage, reqId: RequestId) =
-  d.addProviderLocal(addProvider.cId, addProvider.prov)
+    addProvider: AddProviderMessage, reqId: RequestId) {.async.} =
+  await d.addProviderLocal(addProvider.cId, addProvider.prov)
 
 proc handleGetProviders(d: Protocol, fromId: NodeId, fromAddr: Address,
-    getProviders: GetProvidersMessage, reqId: RequestId) =
+    getProviders: GetProvidersMessage, reqId: RequestId) {.async.} =
   #TODO: add checks, add signed version
-  let provs = d.providers.getOrDefault(getProviders.cId)
+  var provs: seq[SignedPeerRecord]
+  let cId = getProviders.cId
+  without queryKey =? Key.init($cId & "/*"), error:
+    trace "error when encoding cId as a query key", cId, error = error.msg
+    return
+
+  let q = d.providers.query
+  for kv in q(d.providers, Query.init(queryKey)):
+    let (_, provBytes) = await kv
+    without prov =? SignedPeerRecord.fromBytes(provBytes), error:
+      trace "error when decoding SPR retrieved from database", cId, error = error.msg
+      continue
+
+    provs.add prov
+
   trace "providers:", provs
 
   ##TODO: handle multiple messages
@@ -335,7 +371,7 @@ proc handleGetProviders(d: Protocol, fromId: NodeId, fromAddr: Address,
   d.sendResponse(fromId, fromAddr, response, reqId)
 
 proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
-    message: Message) =
+    message: Message) {.async.} =
   case message.kind
   of ping:
     discovery_message_requests_incoming.inc()
@@ -352,10 +388,10 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
   of addProvider:
     discovery_message_requests_incoming.inc()
     discovery_message_requests_incoming.inc(labelValues = ["no_response"])
-    d.handleAddProvider(srcId, fromAddr, message.addProvider, message.reqId)
+    await d.handleAddProvider(srcId, fromAddr, message.addProvider, message.reqId)
   of getProviders:
     discovery_message_requests_incoming.inc()
-    d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId)
+    await d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId)
   of regTopic, topicQuery:
     discovery_message_requests_incoming.inc()
     discovery_message_requests_incoming.inc(labelValues = ["no_response"])
@@ -643,7 +679,7 @@ proc addProvider*(
     if toNode != d.localNode:
       discard d.sendRequest(toNode, AddProviderMessage(cId: cId, prov: pr))
     else:
-      d.addProviderLocal(cId, pr)
+      await d.addProviderLocal(cId, pr)
 
   return res
 
@@ -677,11 +713,20 @@ proc getProvidersLocal*(
     d: Protocol,
     cId: NodeId,
     maxitems: int = 5,
-  ): seq[SignedPeerRecord] {.raises: [KeyError,Defect].} =
+  ): Future[seq[SignedPeerRecord]] {.async.} =
+
+  var provs: seq[SignedPeerRecord]
+  without queryKey =? Key.init($cId & "/*"), error:
+    trace "error when encoding cId as a query key", cId, error = error.msg
+    return
 
-  return
-    if (cId in d.providers): d.providers[cId]
-    else: @[]
+  let q = d.providers.query
+  for kv in q(d.providers, Query.init(queryKey)):
+    let (_, provBytes) = await kv
+    without prov =? SignedPeerRecord.fromBytes(provBytes), error:
+      trace "error when decoding SPR retrieved from database", cId, error = error.msg
+      continue
+
+    provs.add prov
+
+  return provs
 
 proc getProviders*(
     d: Protocol,
@@ -691,7 +736,7 @@ proc getProviders*(
   ): Future[DiscResult[seq[SignedPeerRecord]]] {.async.} =
 
   # What providers do we know about?
-  var res = d.getProvidersLocal(cId, maxitems)
+  var res = await d.getProvidersLocal(cId, maxitems)
   trace "local providers:", res
 
   let nodesNearby = await d.lookup(cId)
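The hunks above settle the storage layout: each provider record is written under a two-level key, `<cId>/<hash of the encoded SPR>`, so every record for one content id can be fetched with a single `<cId>/*` wildcard query. A rough sketch of that round trip, assuming only the nim-datastore calls this patch itself exercises (`Key.init`, `Query.init`, `put`, `query`); `storeAndList` is a hypothetical helper, not part of the change:

```nim
import std/hashes
import chronos
import datastore/sqlite_datastore
import questionable, questionable/results

proc storeAndList(ds: Datastore, cId: string, sprBytes: seq[byte]) {.async.} =
  # Write under "<cId>/<hash>" so two records for the same cId never collide.
  without key =? Key.init(cId & "/" & $sprBytes.hash), error:
    echo "key error: ", error.msg
    return
  discard await ds.put(key, sprBytes)

  # A "<cId>/*" query matches every key stored under this content id.
  without queryKey =? Key.init(cId & "/*"), error:
    return
  let q = ds.query
  for kv in q(ds, Query.init(queryKey)):
    let (_, bytes) = await kv
    echo "retrieved ", bytes.len, " bytes"
```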
@@ -959,7 +1004,9 @@ proc newProtocol*(
     bindIp = IPv4_any(),
     enrAutoUpdate = false,
     config = defaultDiscoveryConfig,
-    rng = newRng()):
+    rng = newRng(),
+    dataDir: string,
+    dbName: string):
     Protocol =
   # TODO: Tried adding bindPort = udpPort as parameter but that gave
   # "Error: internal error: environment misses: udpPort" in nim-beacon-chain.
@@ -996,6 +1043,9 @@ proc newProtocol*(
   # TODO Consider whether this should be a Defect
   doAssert rng != nil, "RNG initialization failed"
 
+  without providers =? SQLiteDatastore.new(dataDir, dbName), error:
+    raise (ref Defect)(msg: error.msg)
+
   result = Protocol(
     privateKey: privKey,
     localNode: node,
@@ -1004,7 +1054,8 @@
     enrAutoUpdate: enrAutoUpdate,
     routingTable: RoutingTable.init(
       node, config.bitsPerHop, config.tableIpLimits, rng),
-    rng: rng)
+    rng: rng,
+    providers: providers)
 
   result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng)
 
@@ -1021,7 +1072,7 @@ proc start*(d: Protocol) =
     d.revalidateLoop = revalidateLoop(d)
   d.ipMajorityLoop = ipMajorityLoop(d)
 
-proc close*(d: Protocol) =
+proc close*(d: Protocol) {.async.} =
   doAssert(not d.transport.closed)
 
   debug "Closing discovery node", node = d.localNode
@@ -1033,6 +1084,7 @@ proc close*(d: Protocol) =
     d.ipMajorityLoop.cancel()
 
   d.transport.close()
+  await d.providers.close()
 
 proc closeWait*(d: Protocol) {.async.} =
   doAssert(not d.transport.closed)
@@ -1046,3 +1098,4 @@ proc closeWait*(d: Protocol) {.async.} =
   await d.ipMajorityLoop.cancelAndWait()
 
   await d.transport.closeWait()
+  await d.providers.close()
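newProtocol now requires dataDir and dbName, raises a Defect if the SQLite datastore cannot be opened, and close/closeWait must be awaited so the datastore is closed too. A sketch of the resulting lifecycle; the import path, key scheme, and file names here are assumptions, not part of the patch:

```nim
import std/options
import chronos
import stew/shims/net
import libp2p/crypto/crypto              # newRng, PrivateKey
import libp2pdht/discv5/protocol         # assumed import path for newProtocol

proc lifecycle() {.async.} =
  let
    rng = newRng()
    privKey = PrivateKey.random(Secp256k1, rng[]).expect("random key")
    ip = some(ValidIpAddress.init("127.0.0.1"))
    port = Port(20400)
    # dataDir must already exist; "providers.db" is a placeholder file name
    node = newProtocol(privKey, ip, some(port), some(port), bindPort = port,
      rng = rng, dataDir = "data", dbName = "providers.db")
  node.open()
  node.start()
  # ... use the node ...
  await node.closeWait()                 # async now: also closes the datastore

waitFor lifecycle()
```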
diff --git a/libp2pdht/private/eth/p2p/discoveryv5/transport.nim b/libp2pdht/private/eth/p2p/discoveryv5/transport.nim
index 4faf0bb5..d7e960d8 100644
--- a/libp2pdht/private/eth/p2p/discoveryv5/transport.nim
+++ b/libp2pdht/private/eth/p2p/discoveryv5/transport.nim
@@ -101,7 +101,7 @@ proc sendWhoareyou(t: Transport, toId: NodeId, a: Address,
   else:
     debug "Node with this id already has ongoing handshake, ignoring packet"
 
-proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
+proc receive*(t: Transport, a: Address, packet: seq[byte]) {.async.} =
   let decoded = t.codec.decodePacket(a, packet)
   if decoded.isOk:
     let packet = decoded[]
@@ -111,7 +111,7 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
       let message = packet.messageOpt.get()
       trace "Received message packet", srcId = packet.srcId,
         address = a, kind = message.kind, packet
-      t.client.handleMessage(packet.srcId, a, message)
+      await t.client.handleMessage(packet.srcId, a, message)
     else:
       trace "Not decryptable message packet received", srcId = packet.srcId,
         address = a
@@ -143,7 +143,7 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
     of HandshakeMessage:
       trace "Received handshake message packet", srcId = packet.srcIdHs,
         address = a, kind = packet.message.kind
-      t.client.handleMessage(packet.srcIdHs, a, packet.message)
+      await t.client.handleMessage(packet.srcIdHs, a, packet.message)
       # For a handshake message it is possible that we received an newer SPR.
       # In that case we can add/update it to the routing table.
       if packet.node.isSome():
@@ -178,7 +178,7 @@ proc processClient[T](transp: DatagramTransport, raddr: TransportAddress):
       return
 
   let a = Address(ip: ValidIpAddress.init(ip), port: raddr.port)
-  t.receive(a, buf)
+  await t.receive(a, buf)
 
 proc open*[T](t: Transport[T]) {.raises: [Defect, CatchableError].} =
   info "Starting transport", bindAddress = t.bindAddress
diff --git a/tests/dht/test_helper.nim b/tests/dht/test_helper.nim
index f6d49572..465b25eb 100644
--- a/tests/dht/test_helper.nim
+++ b/tests/dht/test_helper.nim
@@ -1,6 +1,10 @@
+import
+  std/[oids, os]
+
 import
   bearssl,
   chronos,
+  datastore/sqlite_datastore,
   libp2p/crypto/[crypto, secp],
   libp2p/multiaddress,
   libp2pdht/discv5/[node, routing_table, spr],
@@ -22,6 +26,10 @@ proc example*(T: type NodeId, rng: ref HmacDrbgContext): NodeId =
     pubKey = privKey.getPublicKey.expect("Valid private key for public key")
   pubKey.toNodeId().expect("Public key valid for node id")
 
+const testDataDir* = "tests/data"
+
+proc randDbName*(): string = "dht_" & $genOid() & dbExt
+
 proc initDiscoveryNode*(
     rng: ref BrHmacDrbgContext,
     privKey: PrivateKey,
@@ -42,7 +50,9 @@ proc initDiscoveryNode*(
     localEnrFields = localEnrFields,
     previousRecord = previousRecord,
     config = config,
-    rng = rng)
+    rng = rng,
+    dataDir = testDataDir,
+    dbName = randDbName())
 
   protocol.open()
diff --git a/tests/dht/test_providers.nim b/tests/dht/test_providers.nim
index 09d3f036..e75f3ae3 100644
--- a/tests/dht/test_providers.nim
+++ b/tests/dht/test_providers.nim
@@ -10,7 +10,7 @@
 {.used.}
 
 import
-  std/[options, sequtils],
+  std/[options, os, sequtils],
   asynctest,
   bearssl,
   chronicles,
@@ -78,6 +78,9 @@ suite "Providers Tests: node alone":
     peerRec0: PeerRecord
 
   setupAll:
+    removeDir(testDataDir)
+    require(not dirExists(testDataDir))
+    createDir(testDataDir)
     rng = newRng()
     nodes = await bootstrapNetwork(nodecount=1)
     targetId = NodeId.example(rng)
@@ -89,6 +92,8 @@ suite "Providers Tests: node alone":
     for (n, _) in nodes:
       await n.closeWait()
     await sleepAsync(chronos.seconds(3))
+    removeDir(testDataDir)
+    require(not dirExists(testDataDir))
 
   test "Node in isolation should store":
 
@@ -99,7 +104,7 @@ suite "Providers Tests: node alone":
     debug "---- STARTING CHECKS ---"
     check (addedTo.len == 1)
     check (addedTo[0].id == node0.localNode.id)
-    check (node0.getProvidersLocal(targetId)[0].data.peerId == peerRec0.peerId)
+    check ((await node0.getProvidersLocal(targetId))[0].data.peerId == peerRec0.peerId)
 
   test "Node in isolation should retrieve":
 
@@ -138,8 +143,11 @@ suite "Providers Tests: two nodes":
     peerRec0: PeerRecord
 
   setupAll:
+    removeDir(testDataDir)
+    require(not dirExists(testDataDir))
+    createDir(testDataDir)
     rng = newRng()
-    nodes = await bootstrapNetwork(nodecount=3)
+    nodes = await bootstrapNetwork(nodecount=2)
     targetId = NodeId.example(rng)
     (node0, privKey0) = nodes[0]
     signedPeerRec0 = privKey0.toSignedPeerRecord
@@ -149,6 +157,8 @@ suite "Providers Tests: two nodes":
     for (n, _) in nodes:
       await n.closeWait()
     await sleepAsync(chronos.seconds(3))
+    removeDir(testDataDir)
+    require(not dirExists(testDataDir))
 
   test "2 nodes, store and retrieve from same":
 
@@ -187,6 +197,9 @@ suite "Providers Tests: 20 nodes":
     peerRec0: PeerRecord
 
   setupAll:
+    removeDir(testDataDir)
+    require(not dirExists(testDataDir))
+    createDir(testDataDir)
     rng = newRng()
     nodes = await bootstrapNetwork(nodecount=20)
     targetId = NodeId.example(rng)
@@ -199,6 +212,8 @@ suite "Providers Tests: 20 nodes":
 
   teardownAll:
     for (n, _) in nodes: # if last test is enabled, we need nodes[1..^1] here
       await n.closeWait()
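One detail in the test updates above is easy to miss: getProvidersLocal now returns a Future, and `await node0.getProvidersLocal(targetId)[0]` would try to index the Future itself rather than its result, so the extra parentheses are required. A minimal illustration with a hypothetical `futureSeq`:

```nim
import chronos

proc futureSeq(): Future[seq[int]] {.async.} =
  return @[10, 20, 30]

proc demo() {.async.} =
  # Parenthesize first, then index: await yields the seq, [] indexes it.
  let first = (await futureSeq())[0]
  echo first   # 10

waitFor demo()
```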
+    removeDir(testDataDir)
+    require(not dirExists(testDataDir))
 
   test "20 nodes, store and retrieve from same":
diff --git a/tests/discv5/test_discoveryv5.nim b/tests/discv5/test_discoveryv5.nim
index eb6d9eff..c0d7be6e 100644
--- a/tests/discv5/test_discoveryv5.nim
+++ b/tests/discv5/test_discoveryv5.nim
@@ -1,8 +1,9 @@
 {.used.}
 
 import
-  std/tables,
+  std/[os, tables],
   chronos, chronicles, stint, asynctest, stew/shims/net,
+  datastore/sqlite_datastore,
   stew/byteutils, bearssl,
   libp2p/crypto/crypto,
   libp2pdht/discv5/[transport, spr, node, routing_table, encoding, sessions, messages, nodes_verification],
@@ -16,6 +17,16 @@ suite "Discovery v5 Tests":
   setup:
     rng = newRng()
 
+  setupAll:
+    removeDir(testDataDir)
+    require(not dirExists(testDataDir))
+    createDir(testDataDir)
+
+  teardownAll:
+    removeDir(testDataDir)
+    require(not dirExists(testDataDir))
+    discard
+
   test "GetNode":
     # TODO: This could be tested in just a routing table only context
     let
@@ -439,15 +450,18 @@ suite "Discovery v5 Tests":
       ip = some(ValidIpAddress.init("127.0.0.1"))
       port = Port(20301)
       node = newProtocol(privKey, ip, some(port), some(port), bindPort = port,
-        rng = rng)
+        rng = rng, dataDir = testDataDir, dbName = randDbName())
       noUpdatesNode = newProtocol(privKey, ip, some(port), some(port),
-        bindPort = port, rng = rng, previousRecord = some(node.getRecord()))
+        bindPort = port, rng = rng, previousRecord = some(node.getRecord()),
+        dataDir = testDataDir, dbName = randDbName())
       updatesNode = newProtocol(privKey, ip, some(port), some(Port(20302)),
         bindPort = port, rng = rng,
-        previousRecord = some(noUpdatesNode.getRecord()))
+        previousRecord = some(noUpdatesNode.getRecord()), dataDir = testDataDir,
+        dbName = randDbName())
       moreUpdatesNode = newProtocol(privKey, ip, some(port), some(port),
         bindPort = port, rng = rng, localEnrFields = {"addfield": @[byte 0]},
-        previousRecord = some(updatesNode.getRecord()))
+        previousRecord = some(updatesNode.getRecord()), dataDir = testDataDir,
+        dbName = randDbName())
     check:
       node.getRecord().seqNum == 1
       noUpdatesNode.getRecord().seqNum == 1
@@ -458,7 +472,8 @@ suite "Discovery v5 Tests":
     expect ResultDefect:
       let incorrectKeyUpdates = newProtocol(PrivateKey.example(rng),
         ip, some(port), some(port), bindPort = port, rng = rng,
-        previousRecord = some(updatesNode.getRecord()))
+        previousRecord = some(updatesNode.getRecord()), dataDir = testDataDir,
+        dbName = randDbName())
 
   test "Update node record with revalidate":
     let
@@ -631,7 +646,7 @@ suite "Discovery v5 Tests":
     let (packet, _) = encodeMessagePacket(rng[], codec, receiveNode.localNode.id,
       receiveNode.localNode.address.get(), @[])
 
-    receiveNode.transport.receive(a, packet)
+    await receiveNode.transport.receive(a, packet)
 
     # Checking different nodeIds but same address
     check receiveNode.transport.codec.handshakes.len == 5
@@ -661,7 +676,7 @@ suite "Discovery v5 Tests":
       let a = localAddress(20303 + i)
       let (packet, _) = encodeMessagePacket(rng[], codec, receiveNode.localNode.id,
         receiveNode.localNode.address.get(), @[])
-      receiveNode.transport.receive(a, packet)
+      await receiveNode.transport.receive(a, packet)
 
     # Checking different nodeIds but same address
     check receiveNode.transport.codec.handshakes.len == 5
@@ -693,7 +708,7 @@ suite "Discovery v5 Tests":
     for i in 0 ..< 5:
       let (packet, requestNonce) = encodeMessagePacket(rng[], codec,
         receiveNode.localNode.id, receiveNode.localNode.address.get(), @[])
-      receiveNode.transport.receive(a, packet)
+      await receiveNode.transport.receive(a, packet)
       if i == 0:
         firstRequestNonce = requestNonce
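A final note on the transport change these tests exercise: receive switched from openArray[byte] to seq[byte] alongside becoming {.async.}, because async procs are transformed into closures and Nim does not allow openArray parameters to be captured. A minimal illustration of the constraint (`consume` is a hypothetical name):

```nim
import chronos

# Compiles: a seq parameter can be captured by the async transformation.
proc consume(data: seq[byte]) {.async.} =
  echo data.len

# The openArray equivalent is rejected at compile time, roughly:
#   proc consume(data: openArray[byte]) {.async.} =
#     ...
# because openArray cannot live in the closure environment.

waitFor consume(@[1.byte, 2, 3])
```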