Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace providers:Table with providers:SQLiteDatastore #41

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
coverage
nimcache
tests/data
tests/testAll
5 changes: 3 additions & 2 deletions libp2pdht.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ requires "nim >= 1.2.0",
"secp256k1 >= 0.5.2 & < 0.6.0",
"stew#head",
"stint",
"asynctest >= 0.3.1 & < 0.4.0"
"asynctest >= 0.3.1 & < 0.4.0",
"questionable >= 0.10.5 & < 0.11.0",
"https://github.com/status-im/nim-datastore#methods_close_query"

task coverage, "generates code coverage report":
var (output, exitCode) = gorgeEx("which lcov")
Expand Down Expand Up @@ -57,4 +59,3 @@ task coverage, "generates code coverage report":
exec("genhtml coverage/coverage.f.info --output-directory coverage/report")
echo "Opening HTML coverage report in browser..."
exec("open coverage/report/index.html")

99 changes: 76 additions & 23 deletions libp2pdht/private/eth/p2p/discoveryv5/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,15 @@
{.push raises: [Defect].}

import
std/[tables, sets, options, math, sequtils, algorithm],
std/[tables, sets, options, math, sequtils, algorithm, hashes],
stew/shims/net as stewNet, json_serialization/std/net,
stew/[base64, endians2, results], chronicles, chronicles/chronos_tools, chronos, chronos/timer, stint, bearssl,
stew/[base64, endians2], chronicles, chronicles/chronos_tools, chronos, chronos/timer, stint, bearssl,
stew/results as stewResults,
metrics,
libp2p/[crypto/crypto, routing_record],
"."/[transport, messages, messages_encoding, node, routing_table, spr, random2, ip_vote, nodes_verification]
libp2p/[crypto/crypto, routing_record, signed_envelope],
"."/[transport, messages, messages_encoding, node, routing_table, spr, random2, ip_vote, nodes_verification],
questionable, questionable/results,
datastore/sqlite_datastore

import nimcrypto except toHex

Expand Down Expand Up @@ -135,7 +138,7 @@ type
talkProtocols*: Table[seq[byte], TalkProtocol] # TODO: Table is a bit of
# overkill here, use sequence
rng*: ref BrHmacDrbgContext
providers: Table[NodeId, seq[SignedPeerRecord]]
providers: Datastore

TalkProtocolHandler* = proc(p: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte]
{.gcsafe, raises: [Defect].}
Expand Down Expand Up @@ -315,27 +318,60 @@ proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address,
kind = MessageKind.talkresp
d.sendResponse(fromId, fromAddr, talkresp, reqId)

proc addProviderLocal(p: Protocol, cId: NodeId, prov: SignedPeerRecord) =
proc fromBytes(T: type SignedPeerRecord, bytes: seq[byte]): ?!T =
let decodeRes = T.decode(bytes)
if decodeRes.isErr:
failure $decodeRes.error
else:
success decodeRes.get

proc toBytes(prov: SignedPeerRecord): ?!seq[byte] =
let encodeRes = prov.encode
if encodeRes.isErr:
failure $encodeRes.error
else:
success encodeRes.get

proc addProviderLocal(p: Protocol, cId: NodeId, prov: SignedPeerRecord) {.async.} =
trace "adding provider to local db", n=p.localNode, cId, prov
p.providers.mgetOrPut(cId, @[]).add(prov)
without provBytes =? prov.toBytes, error:
trace "error when encoding SPR", cId, error = error.msg

without key =? Key.init($cid & "/" & $provBytes.hash), error:
trace "error when encoding cId as a key", cId, error = error.msg

let putRes = await p.providers.put(key, provBytes)
if putRes.isErr:
trace "error when storing encoded SPR in database", cId, error = error.msg

proc handleAddProvider(d: Protocol, fromId: NodeId, fromAddr: Address,
addProvider: AddProviderMessage, reqId: RequestId) =
d.addProviderLocal(addProvider.cId, addProvider.prov)
addProvider: AddProviderMessage, reqId: RequestId) {.async.} =
await d.addProviderLocal(addProvider.cId, addProvider.prov)

proc handleGetProviders(d: Protocol, fromId: NodeId, fromAddr: Address,
getProviders: GetProvidersMessage, reqId: RequestId) =
getProviders: GetProvidersMessage, reqId: RequestId) {.async.} =

#TODO: add checks, add signed version
let provs = d.providers.getOrDefault(getProviders.cId)
var provs: seq[SignedPeerRecord]
let cId = getProviders.cId
without queryKey =? Key.init($cId & "/*"), error:
trace "error when encoding cId as a query key", cId, error = error.msg

let q = d.providers.query; for kv in q(d.providers, Query.init(queryKey)):
let (_, provBytes) = await kv
without prov =? SignedPeerRecord.fromBytes(provBytes), error:
trace "error when decoding SPR retrieved from database", cId , error = error.msg

provs.add prov

trace "providers:", provs

##TODO: handle multiple messages
let response = ProvidersMessage(total: 1, provs: provs)
d.sendResponse(fromId, fromAddr, response, reqId)

proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
message: Message) =
message: Message) {.async.} =
case message.kind
of ping:
discovery_message_requests_incoming.inc()
Expand All @@ -352,10 +388,10 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
of addProvider:
discovery_message_requests_incoming.inc()
discovery_message_requests_incoming.inc(labelValues = ["no_response"])
d.handleAddProvider(srcId, fromAddr, message.addProvider, message.reqId)
await d.handleAddProvider(srcId, fromAddr, message.addProvider, message.reqId)
of getProviders:
discovery_message_requests_incoming.inc()
d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId)
await d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId)
of regTopic, topicQuery:
discovery_message_requests_incoming.inc()
discovery_message_requests_incoming.inc(labelValues = ["no_response"])
Expand Down Expand Up @@ -643,7 +679,7 @@ proc addProvider*(
if toNode != d.localNode:
discard d.sendRequest(toNode, AddProviderMessage(cId: cId, prov: pr))
else:
d.addProviderLocal(cId, pr)
await d.addProviderLocal(cId, pr)

return res

Expand Down Expand Up @@ -677,11 +713,20 @@ proc getProvidersLocal*(
d: Protocol,
cId: NodeId,
maxitems: int = 5,
): seq[SignedPeerRecord] {.raises: [KeyError,Defect].} =
): Future[seq[SignedPeerRecord]] {.async.} =

var provs: seq[SignedPeerRecord]
without queryKey =? Key.init($cId & "/*"), error:
trace "error when encoding cId as a query key", cId, error = error.msg

return
if (cId in d.providers): d.providers[cId]
else: @[]
let q = d.providers.query; for kv in q(d.providers, Query.init(queryKey)):
let (_, provBytes) = await kv
without prov =? SignedPeerRecord.fromBytes(provBytes), error:
trace "error when decoding SPR retrieved from database", cId , error = error.msg

provs.add prov

return provs

proc getProviders*(
d: Protocol,
Expand All @@ -691,7 +736,7 @@ proc getProviders*(
): Future[DiscResult[seq[SignedPeerRecord]]] {.async.} =

# What providers do we know about?
var res = d.getProvidersLocal(cId, maxitems)
var res = await d.getProvidersLocal(cId, maxitems)
trace "local providers:", res

let nodesNearby = await d.lookup(cId)
Expand Down Expand Up @@ -959,7 +1004,9 @@ proc newProtocol*(
bindIp = IPv4_any(),
enrAutoUpdate = false,
config = defaultDiscoveryConfig,
rng = newRng()):
rng = newRng(),
dataDir: string,
dbName: string):
Protocol =
# TODO: Tried adding bindPort = udpPort as parameter but that gave
# "Error: internal error: environment misses: udpPort" in nim-beacon-chain.
Expand Down Expand Up @@ -996,6 +1043,9 @@ proc newProtocol*(
# TODO Consider whether this should be a Defect
doAssert rng != nil, "RNG initialization failed"

without providers =? SQLiteDatastore.new(dataDir, dbName), error:
raise (ref Defect)(msg: error.msg)

result = Protocol(
privateKey: privKey,
localNode: node,
Expand All @@ -1004,7 +1054,8 @@ proc newProtocol*(
enrAutoUpdate: enrAutoUpdate,
routingTable: RoutingTable.init(
node, config.bitsPerHop, config.tableIpLimits, rng),
rng: rng)
rng: rng,
providers: providers)

result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng)

Expand All @@ -1021,7 +1072,7 @@ proc start*(d: Protocol) =
d.revalidateLoop = revalidateLoop(d)
d.ipMajorityLoop = ipMajorityLoop(d)

proc close*(d: Protocol) =
proc close*(d: Protocol) {.async.} =
doAssert(not d.transport.closed)

debug "Closing discovery node", node = d.localNode
Expand All @@ -1033,6 +1084,7 @@ proc close*(d: Protocol) =
d.ipMajorityLoop.cancel()

d.transport.close()
await d.providers.close()

proc closeWait*(d: Protocol) {.async.} =
doAssert(not d.transport.closed)
Expand All @@ -1046,3 +1098,4 @@ proc closeWait*(d: Protocol) {.async.} =
await d.ipMajorityLoop.cancelAndWait()

await d.transport.closeWait()
await d.providers.close()
8 changes: 4 additions & 4 deletions libp2pdht/private/eth/p2p/discoveryv5/transport.nim
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ proc sendWhoareyou(t: Transport, toId: NodeId, a: Address,
else:
debug "Node with this id already has ongoing handshake, ignoring packet"

proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
proc receive*(t: Transport, a: Address, packet: seq[byte]) {.async.} =
let decoded = t.codec.decodePacket(a, packet)
if decoded.isOk:
let packet = decoded[]
Expand All @@ -111,7 +111,7 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
let message = packet.messageOpt.get()
trace "Received message packet", srcId = packet.srcId, address = a,
kind = message.kind, packet
t.client.handleMessage(packet.srcId, a, message)
await t.client.handleMessage(packet.srcId, a, message)
else:
trace "Not decryptable message packet received",
srcId = packet.srcId, address = a
Expand Down Expand Up @@ -143,7 +143,7 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
of HandshakeMessage:
trace "Received handshake message packet", srcId = packet.srcIdHs,
address = a, kind = packet.message.kind
t.client.handleMessage(packet.srcIdHs, a, packet.message)
await t.client.handleMessage(packet.srcIdHs, a, packet.message)
# For a handshake message it is possible that we received an newer SPR.
# In that case we can add/update it to the routing table.
if packet.node.isSome():
Expand Down Expand Up @@ -178,7 +178,7 @@ proc processClient[T](transp: DatagramTransport, raddr: TransportAddress):
return
let a = Address(ip: ValidIpAddress.init(ip), port: raddr.port)

t.receive(a, buf)
await t.receive(a, buf)

proc open*[T](t: Transport[T]) {.raises: [Defect, CatchableError].} =
info "Starting transport", bindAddress = t.bindAddress
Expand Down
12 changes: 11 additions & 1 deletion tests/dht/test_helper.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import
std/[oids, os]

import
bearssl,
chronos,
datastore/sqlite_datastore,
libp2p/crypto/[crypto, secp],
libp2p/multiaddress,
libp2pdht/discv5/[node, routing_table, spr],
Expand All @@ -22,6 +26,10 @@ proc example*(T: type NodeId, rng: ref HmacDrbgContext): NodeId =
pubKey = privKey.getPublicKey.expect("Valid private key for public key")
pubKey.toNodeId().expect("Public key valid for node id")

const testDataDir* = "tests/data"

proc randDbName*(): string = "dht_" & $genOid() & dbExt

proc initDiscoveryNode*(
rng: ref BrHmacDrbgContext,
privKey: PrivateKey,
Expand All @@ -42,7 +50,9 @@ proc initDiscoveryNode*(
localEnrFields = localEnrFields,
previousRecord = previousRecord,
config = config,
rng = rng)
rng = rng,
dataDir = testDataDir,
dbName = randDbName())

protocol.open()

Expand Down
21 changes: 18 additions & 3 deletions tests/dht/test_providers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
{.used.}

import
std/[options, sequtils],
std/[options, os, sequtils],
asynctest,
bearssl,
chronicles,
Expand Down Expand Up @@ -78,6 +78,9 @@ suite "Providers Tests: node alone":
peerRec0: PeerRecord

setupAll:
removeDir(testDataDir)
require(not dirExists(testDataDir))
createDir(testDataDir)
rng = newRng()
nodes = await bootstrapNetwork(nodecount=1)
targetId = NodeId.example(rng)
Expand All @@ -89,6 +92,8 @@ suite "Providers Tests: node alone":
for (n, _) in nodes:
await n.closeWait()
await sleepAsync(chronos.seconds(3))
removeDir(testDataDir)
require(not dirExists(testDataDir))


test "Node in isolation should store":
Expand All @@ -99,7 +104,7 @@ suite "Providers Tests: node alone":
debug "---- STARTING CHECKS ---"
check (addedTo.len == 1)
check (addedTo[0].id == node0.localNode.id)
check (node0.getProvidersLocal(targetId)[0].data.peerId == peerRec0.peerId)
check ((await node0.getProvidersLocal(targetId))[0].data.peerId == peerRec0.peerId)

test "Node in isolation should retrieve":

Expand Down Expand Up @@ -138,8 +143,11 @@ suite "Providers Tests: two nodes":
peerRec0: PeerRecord

setupAll:
removeDir(testDataDir)
require(not dirExists(testDataDir))
createDir(testDataDir)
rng = newRng()
nodes = await bootstrapNetwork(nodecount=3)
nodes = await bootstrapNetwork(nodecount=2)
targetId = NodeId.example(rng)
(node0, privKey0) = nodes[0]
signedPeerRec0 = privKey0.toSignedPeerRecord
Expand All @@ -149,6 +157,8 @@ suite "Providers Tests: two nodes":
for (n, _) in nodes:
await n.closeWait()
await sleepAsync(chronos.seconds(3))
removeDir(testDataDir)
require(not dirExists(testDataDir))

test "2 nodes, store and retrieve from same":

Expand Down Expand Up @@ -187,6 +197,9 @@ suite "Providers Tests: 20 nodes":
peerRec0: PeerRecord

setupAll:
removeDir(testDataDir)
require(not dirExists(testDataDir))
createDir(testDataDir)
rng = newRng()
nodes = await bootstrapNetwork(nodecount=20)
targetId = NodeId.example(rng)
Expand All @@ -199,6 +212,8 @@ suite "Providers Tests: 20 nodes":
teardownAll:
for (n, _) in nodes: # if last test is enabled, we need nodes[1..^1] here
await n.closeWait()
removeDir(testDataDir)
require(not dirExists(testDataDir))

test "20 nodes, store and retrieve from same":

Expand Down
Loading