Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Geyser -> gRPC multiplexing #253

Closed
wants to merge 42 commits into from
Closed

Conversation

grooviegermanikus
Copy link
Collaborator

@grooviegermanikus grooviegermanikus commented Dec 7, 2023

How to review

This is a WIP-PR.
There are two .rs files which are both runnable. You should run them using RUST_LOG=info,stream_via_grpc=debug,drain_to_tip_pattern=debug and try to understand the sequence of logs (see example below).

There is the algorithm and channel logic to wire up everything. stream_via_grpc.rs is using a real gRPC stream from mainnet where drain_to_tip_pattern.rs demonstrates the pattern I use (i.e. a subset of stream_via_grpc.rs)

Description

This PR implements a component that connects two (later N) input channels from geyser/gRPC.

It assumes two streams of blocks which are ordered and duplicate-free; yet we should expect lags and dropped messages.
The service should emit one stream (channel) with the blocks merged from the two source streams.

The strategy for now is:

  • each channel has a task with recv-loop which gets drained until it sees a block with a slot number higher than the current slot (tip). this block is called "offered block"
    • if the "offered block" connects to the tip (via block.parent_slot) the block is emitted and the system progresses
    • if there are "offered blocks" but none of them are fitting; a timeout is triggered which allows to inspect all "offered blocks" seen so far and decide, what to do
  • the current tip slot number is propagated using a tokio watch channel

Output from stream_via_grpc.rs

2023-12-07T17:22:09.175462Z INFO stream_via_grpc: => recv on blue: 234565430@confirmed (-> 234565429)
2023-12-07T17:22:09.176291Z INFO stream_via_grpc: ==> blue: beyond tip (234565430 > 0)
2023-12-07T17:22:09.176385Z INFO stream_via_grpc: - current_tip=0
2023-12-07T17:22:09.176402Z INFO stream_via_grpc: << offered slot from blue: BlockRef { slot: 234565430, parent_slot: 234565429 }
2023-12-07T17:22:09.632587Z DEBUG stream_via_grpc: % delay value
2023-12-07T17:22:10.334018Z DEBUG stream_via_grpc: % drop value
2023-12-07T17:22:10.365822Z DEBUG stream_via_grpc: % forwarded
2023-12-07T17:22:10.365897Z INFO stream_via_grpc: => recv on green: 234565434@confirmed (-> 234565433)
2023-12-07T17:22:10.365914Z INFO stream_via_grpc: ==> green: beyond tip (234565434 > 0)
2023-12-07T17:22:10.365991Z INFO stream_via_grpc: - current_tip=0
2023-12-07T17:22:10.366006Z INFO stream_via_grpc: << offered slot from green: BlockRef { slot: 234565434, parent_slot: 234565433 }
2023-12-07T17:22:10.582544Z DEBUG stream_via_grpc: % forwarded
2023-12-07T17:22:11.113697Z DEBUG stream_via_grpc: % delay value
2023-12-07T17:22:11.815809Z DEBUG stream_via_grpc: % forwarded
2023-12-07T17:22:11.815919Z DEBUG stream_via_grpc: % forwarded
2023-12-07T17:22:12.240145Z DEBUG stream_via_grpc: % drop value
2023-12-07T17:22:12.432823Z DEBUG stream_via_grpc: % delay value
2023-12-07T17:22:13.136728Z DEBUG stream_via_grpc: % forwarded
2023-12-07T17:22:13.269552Z DEBUG stream_via_grpc: % forwarded
2023-12-07T17:22:13.368889Z INFO stream_via_grpc: - current_tip=0
2023-12-07T17:22:13.368934Z INFO stream_via_grpc: --> timeout: got these slots: [BlockRef { slot: 234565430, parent_slot: 234565429 }, BlockRef { slot: 234565434, parent_slot: 234565433 }]
2023-12-07T17:22:13.369050Z INFO stream_via_grpc: --> initializing with tip 234565434
2023-12-07T17:22:13.369103Z INFO stream_via_grpc: ++> green tip changed to 234565434
2023-12-07T17:22:13.369106Z INFO stream_via_grpc: ==> multiplexed slot: 234565434
2023-12-07T17:22:13.369128Z INFO stream_via_grpc: => recv on green: 234565435@confirmed (-> 234565434)
2023-12-07T17:22:13.369122Z INFO stream_via_grpc: ++> blue tip changed to 234565434
2023-12-07T17:22:13.369139Z INFO stream_via_grpc: ==> green: beyond tip (234565435 > 234565434)
2023-12-07T17:22:13.369150Z INFO stream_via_grpc: => recv on blue: 234565431@confirmed (-> 234565430)
2023-12-07T17:22:13.369163Z INFO stream_via_grpc: => recv on blue: 234565432@confirmed (-> 234565431)
2023-12-07T17:22:13.369167Z INFO stream_via_grpc: - current_tip=234565434
2023-12-07T17:22:13.369174Z INFO stream_via_grpc: => recv on blue: 234565433@confirmed (-> 234565432)
2023-12-07T17:22:13.369178Z INFO stream_via_grpc: << offered slot from green: BlockRef { slot: 234565435, parent_slot: 234565434 }
2023-12-07T17:22:13.369185Z INFO stream_via_grpc: => recv on blue: 234565434@confirmed (-> 234565433)
2023-12-07T17:22:13.369188Z INFO stream_via_grpc: << take block from green as new tip 234565435
2023-12-07T17:22:13.369196Z INFO stream_via_grpc: => recv on blue: 234565435@confirmed (-> 234565434)
2023-12-07T17:22:13.369205Z INFO stream_via_grpc: ==> blue: beyond tip (234565435 > 234565434)
2023-12-07T17:22:13.369220Z INFO stream_via_grpc: ++> green tip changed to 234565435
2023-12-07T17:22:13.369240Z INFO stream_via_grpc: => recv on green: 234565436@confirmed (-> 234565435)
2023-12-07T17:22:13.369246Z INFO stream_via_grpc: ++> blue tip changed to 234565435
2023-12-07T17:22:13.369238Z INFO stream_via_grpc: ==> multiplexed slot: 234565435
2023-12-07T17:22:13.369256Z INFO stream_via_grpc: ==> green: beyond tip (234565436 > 234565435)
2023-12-07T17:22:13.369266Z INFO stream_via_grpc: => recv on blue: 234565436@confirmed (-> 234565435)
2023-12-07T17:22:13.369277Z INFO stream_via_grpc: ==> blue: beyond tip (234565436 > 234565435)
2023-12-07T17:22:13.369297Z INFO stream_via_grpc: - current_tip=234565435
2023-12-07T17:22:13.369306Z INFO stream_via_grpc: << offered slot from blue: BlockRef { slot: 234565435, parent_slot: 234565434 }
2023-12-07T17:22:13.369320Z INFO stream_via_grpc: - current_tip=234565435
2023-12-07T17:22:13.369329Z INFO stream_via_grpc: << offered slot from green: BlockRef { slot: 234565436, parent_slot: 234565435 }
2023-12-07T17:22:13.369339Z INFO stream_via_grpc: << take block from green as new tip 234565436
2023-12-07T17:22:13.369364Z INFO stream_via_grpc: - current_tip=234565436
2023-12-07T17:22:13.369374Z INFO stream_via_grpc: << offered slot from blue: BlockRef { slot: 234565436, parent_slot: 234565435 }
2023-12-07T17:22:13.369393Z INFO stream_via_grpc: ++> blue tip changed to 234565436
2023-12-07T17:22:13.369393Z INFO stream_via_grpc: ==> multiplexed slot: 234565436
2023-12-07T17:22:13.369405Z INFO stream_via_grpc: => recv on blue: 234565437@confirmed (-> 234565436)
2023-12-07T17:22:13.369414Z INFO stream_via_grpc: ==> blue: beyond tip (234565437 > 234565436)
2023-12-07T17:22:13.369420Z INFO stream_via_grpc: ++> green tip changed to 234565436
2023-12-07T17:22:13.369435Z INFO stream_via_grpc: => recv on green: 234565437@confirmed (-> 234565436)
2023-12-07T17:22:13.369431Z INFO stream_via_grpc: - current_tip=234565436
2023-12-07T17:22:13.369444Z INFO stream_via_grpc: ==> green: beyond tip (234565437 > 234565436)
2023-12-07T17:22:13.369448Z INFO stream_via_grpc: << offered slot from blue: BlockRef { slot: 234565437, parent_slot: 234565436 }
2023-12-07T17:22:13.369458Z INFO stream_via_grpc: << take block from blue as new tip 234565437
2023-12-07T17:22:13.369480Z INFO stream_via_grpc: - current_tip=234565437
2023-12-07T17:22:13.369490Z INFO stream_via_grpc: << offered slot from green: BlockRef { slot: 234565437, parent_slot: 234565436 }
2023-12-07T17:22:13.369511Z INFO stream_via_grpc: ++> blue tip changed to 234565437
2023-12-07T17:22:13.369502Z INFO stream_via_grpc: ++> green tip changed to 234565437
2023-12-07T17:22:13.369526Z INFO stream_via_grpc: => recv on blue: 234565438@confirmed (-> 234565437)
2023-12-07T17:22:13.369530Z INFO stream_via_grpc: ==> multiplexed slot: 234565437
2023-12-07T17:22:13.369534Z INFO stream_via_grpc: => recv on green: 234565439@confirmed (-> 234565438)
2023-12-07T17:22:13.369538Z INFO stream_via_grpc: ==> blue: beyond tip (234565438 > 234565437)
2023-12-07T17:22:13.369545Z INFO stream_via_grpc: ==> green: beyond tip (234565439 > 234565437)
2023-12-07T17:22:13.369554Z INFO stream_via_grpc: - current_tip=234565437
2023-12-07T17:22:13.369564Z INFO stream_via_grpc: << offered slot from blue: BlockRef { slot: 234565438, parent_slot: 234565437 }
2023-12-07T17:22:13.369573Z INFO stream_via_grpc: << take block from blue as new tip 234565438
2023-12-07T17:22:13.369594Z INFO stream_via_grpc: - current_tip=234565438
2023-12-07T17:22:13.369603Z INFO stream_via_grpc: << offered slot from green: BlockRef { slot: 234565439, parent_slot: 234565438 }
2023-12-07T17:22:13.369612Z INFO stream_via_grpc: << take block from green as new tip 234565439
2023-12-07T17:22:13.369626Z INFO stream_via_grpc: ==> multiplexed slot: 234565438
2023-12-07T17:22:13.369630Z INFO stream_via_grpc: ++> green tip changed to 234565439
2023-12-07T17:22:13.369638Z INFO stream_via_grpc: ==> multiplexed slot: 234565439
2023-12-07T17:22:13.369643Z INFO stream_via_grpc: => recv on green: 234565440@confirmed (-> 234565439)
2023-12-07T17:22:13.369653Z INFO stream_via_grpc: ==> green: beyond tip (234565440 > 234565439)
2023-12-07T17:22:13.369661Z INFO stream_via_grpc: ++> blue tip changed to 234565439
2023-12-07T17:22:13.369670Z INFO stream_via_grpc: - current_tip=234565439
2023-12-07T17:22:13.369678Z INFO stream_via_grpc: => recv on blue: 234565439@confirmed (-> 234565438)
2023-12-07T17:22:13.369679Z INFO stream_via_grpc: << offered slot from green: BlockRef { slot: 234565440, parent_slot: 234565439 }
2023-12-07T17:22:13.369699Z INFO stream_via_grpc: << take block from green as new tip 234565440
2023-12-07T17:22:13.369729Z INFO stream_via_grpc: ==> multiplexed slot: 234565440
2023-12-07T17:22:13.369746Z INFO stream_via_grpc: ++> blue tip changed to 234565440
2023-12-07T17:22:13.369751Z INFO stream_via_grpc: ++> green tip changed to 234565440
2023-12-07T17:22:13.561760Z INFO stream_via_grpc: => recv on blue: 234565440@confirmed (-> 234565439)
2023-12-07T17:22:13.632833Z DEBUG stream_via_grpc: % forwarded
2023-12-07T17:22:13.632893Z INFO stream_via_grpc: => recv on green: 234565441@confirmed (-> 234565440)
2023-12-07T17:22:13.632913Z INFO stream_via_grpc: ==> green: beyond tip (234565441 > 234565440)
2023-12-07T17:22:13.632979Z INFO stream_via_grpc: - current_tip=234565440
2023-12-07T17:22:13.632994Z INFO stream_via_grpc: << offered slot from green: BlockRef { slot: 234565441, parent_slot: 234565440 }
2023-12-07T17:22:13.633009Z INFO stream_via_grpc: << take block from green as new tip 234565441
2023-12-07T17:22:13.633065Z INFO stream_via_grpc: ++> blue tip changed to 234565441
2023-12-07T17:22:13.633092Z INFO stream_via_grpc: ++> green tip changed to 234565441
2023-12-07T17:22:13.633104Z INFO stream_via_grpc: ==> multiplexed slot: 234565441
2023-12-07T17:22:13.789642Z INFO stream_via_grpc: => recv on blue: 234565441@confirmed (-> 234565440)
2023-12-07T17:22:14.092058Z INFO stream_via_grpc: => recv on blue: 234565442@confirmed (-> 234565441)
2023-12-07T17:22:14.092255Z INFO stream_via_grpc: ==> blue: beyond tip (234565442 > 234565441)
2023-12-07T17:22:14.092377Z INFO stream_via_grpc: - current_tip=234565441
2023-12-07T17:22:14.092394Z INFO stream_via_grpc: << offered slot from blue: BlockRef { slot: 234565442, parent_slot: 234565441 }
2023-12-07T17:22:14.092410Z INFO stream_via_grpc: << take block from blue as new tip 234565442
2023-12-07T17:22:14.092523Z INFO stream_via_grpc: ==> multiplexed slot: 234565442
2023-12-07T17:22:14.092555Z INFO stream_via_grpc: ++> blue tip changed to 234565442
2023-12-07T17:22:14.092613Z INFO stream_via_grpc: ++> green tip changed to 234565442
2023-12-07T17:22:14.139210Z DEBUG stream_via_grpc: % delay value
2023-12-07T17:22:14.855735Z DEBUG stream_via_grpc: % forwarded
2023-12-07T17:22:14.855835Z DEBUG stream_via_grpc: % drop value
2023-12-07T17:22:14.855944Z INFO stream_via_grpc: => recv on green: 234565442@confirmed (-> 234565441)
2023-12-07T17:22:14.861034Z INFO stream_via_grpc: => recv on blue: 234565443@confirmed (-> 234565442)
2023-12-07T17:22:14.861055Z INFO stream_via_grpc: ==> blue: beyond tip (234565443 > 234565442)
2023-12-07T17:22:14.861090Z INFO stream_via_grpc: - current_tip=234565442
2023-12-07T17:22:14.861097Z INFO stream_via_grpc: << offered slot from blue: BlockRef { slot: 234565443, parent_slot: 234565442 }
2023-12-07T17:22:14.861105Z INFO stream_via_grpc: << take block from blue as new tip 234565443
2023-12-07T17:22:14.861157Z INFO stream_via_grpc: ==> multiplexed slot: 234565443
2023-12-07T17:22:14.861170Z INFO stream_via_grpc: ++> green tip changed to 234565443
2023-12-07T17:22:14.861192Z INFO stream_via_grpc: ++> blue tip changed to 234565443
2023-12-07T17:22:14.885982Z DEBUG stream_via_grpc: % forwarded
2023-12-07T17:22:14.886017Z INFO stream_via_grpc: => recv on green: 234565444@confirmed (-> 234565443)
2023-12-07T17:22:14.886025Z INFO stream_via_grpc: ==> green: beyond tip (234565444 > 234565443)
2023-12-07T17:22:14.886060Z INFO stream_via_grpc: - current_tip=234565443
2023-12-07T17:22:14.886067Z INFO stream_via_grpc: << offered slot from green: BlockRef { slot: 234565444, parent_slot: 234565443 }
2023-12-07T17:22:14.886074Z INFO stream_via_grpc: << take block from green as new tip 234565444
2023-12-07T17:22:14.886103Z INFO stream_via_grpc: ++> green tip changed to 234565444
2023-12-07T17:22:14.886116Z INFO stream_via_grpc: ==> multiplexed slot: 234565444
2023-12-07T17:22:14.886129Z INFO stream_via_grpc: ++> blue tip changed to 234565444
2023-12-07T17:22:15.003535Z INFO stream_via_grpc: => recv on blue: 234565444@confirmed (-> 234565443)
2023-12-07T17:22:15.003602Z INFO stream_via_grpc: => recv on blue: 234565445@confirmed (-> 234565444)
2023-12-07T17:22:15.003618Z INFO stream_via_grpc: ==> blue: beyond tip (234565445 > 234565444)
2023-12-07T17:22:15.003666Z INFO stream_via_grpc: - current_tip=234565444
2023-12-07T17:22:15.003682Z INFO stream_via_grpc: << offered slot from blue: BlockRef { slot: 234565445, parent_slot: 234565444 }
2023-12-07T17:22:15.003700Z INFO stream_via_grpc: << take block from blue as new tip 234565445
2023-12-07T17:22:15.003783Z INFO stream_via_grpc: ==> multiplexed slot: 234565445
2023-12-07T17:22:15.003808Z INFO stream_via_grpc: ++> blue tip changed to 234565445
2023-12-07T17:22:15.003836Z INFO stream_via_grpc: ++> green tip changed to 234565445

@grooviegermanikus
Copy link
Collaborator Author

reconnect and multiplexing moved to https://github.com/blockworks-foundation/geyser-grpc-connector;
integration moved to feature/grpc-multiplexing-integrate

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants