Skip to content

Commit

Permalink
fix: revert "chore: adding observers for message logging (#2800)" (#2815
Browse files Browse the repository at this point in the history
)
  • Loading branch information
gabrielmer authored and Ivansete-status committed Jun 17, 2024
1 parent 6eec201 commit a7d2dfd
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 66 deletions.
82 changes: 19 additions & 63 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,21 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
if node.wakuRelay.isSubscribed(topic):
return

proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
let msg_hash = topic.computeMessageHash(msg).to0xHex()

notice "waku.relay received",
my_peer_id = node.peerId,
pubsubTopic = topic,
msg_hash = msg_hash,
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len

let msgSizeKB = msg.payload.len / 1000

waku_node_messages.inc(labelValues = ["relay"])
waku_histogram_message_size.observe(msgSizeKB)

proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
if node.wakuFilter.isNil():
return
Expand All @@ -240,6 +255,7 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
let defaultHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await traceHandler(topic, msg)
await filterHandler(topic, msg)
await archiveHandler(topic, msg)

Expand Down Expand Up @@ -376,61 +392,6 @@ proc startRelay*(node: WakuNode) {.async.} =

info "relay started successfully"

proc generateRelayObserver(node: WakuNode): PubSubObserver =
proc logMessageInfo(peer: PubSubPeer, msgs: var RPCMsg, onRecv: bool) =
for msg in msgs.messages:
let msg_id = node.wakuRelay.msgIdProvider(msg).valueOr:
warn "Error generating message id",
my_peer_id = node.peerId,
from_peer_id = peer.peerId,
topic = msg.topic,
error = $error
continue

let msg_id_short = shortLog(msg_id)

let wakuMessage = WakuMessage.decode(msg.data).valueOr:
warn "Error decoding to Waku Message",
my_peer_id = node.peerId,
msg_id = msg_id_short,
from_peer_id = peer.peerId,
topic = msg.topic,
error = $error
continue

let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex()

if onRecv:
notice "received relay message",
my_peer_id = node.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
from_peer_id = peer.peerId,
topic = msg.topic,
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = wakuMessage.payload.len

let msgSizeKB = wakuMessage.payload.len / 1000
waku_node_messages.inc(labelValues = ["relay"])
waku_histogram_message_size.observe(msgSizeKB)
else:
notice "sent relay message",
my_peer_id = node.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
to_peer_id = peer.peerId,
topic = msg.topic,
sentTime = getNowInNanosecondTime(),
payloadSizeBytes = wakuMessage.payload.len

proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
logMessageInfo(peer, msgs, onRecv = true)

proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
discard

return PubSubObserver(onRecv: onRecv, onSend: onSend)

proc mountRelay*(
node: WakuNode,
pubsubTopics: seq[string] = @[],
Expand All @@ -451,11 +412,6 @@ proc mountRelay*(

node.wakuRelay = initRes.value

# register relay observers for logging
debug "Registering Relay observers"
let observerLogger = node.generateRelayObserver()
node.wakuRelay.addObserver(observerLogger)

## Add peer exchange handler
if peerExchangeHandler.isSome():
node.wakuRelay.parameters.enablePX = true
Expand Down Expand Up @@ -977,14 +933,14 @@ proc mountLightPush*(
node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
) {.async.} =
info "mounting light push"
var pushHandler =

var pushHandler =
if node.wakuRelay.isNil:
debug "mounting lightpush without relay (nil)"
getNilPushHandler()
else:
debug "mounting lightpush with relay"
let rlnPeer =
let rlnPeer =
if isNil(node.wakuRlnRelay):
debug "mounting lightpush without rln-relay"
none(WakuRLNRelay)
Expand Down
3 changes: 0 additions & 3 deletions waku/waku_relay/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,6 @@ proc addValidator*(
) {.gcsafe.} =
w.wakuValidators.add((handler, errorMessage))

proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
procCall GossipSub(w).addObserver(observer)

method start*(w: WakuRelay) {.async, base.} =
debug "start"
await procCall GossipSub(w).start()
Expand Down

0 comments on commit a7d2dfd

Please sign in to comment.