saving blocks + tx to Postgres + basic multi-storage-strategy #242

Closed
108 commits
d47fbe5
add context() to some db statements
grooviegermanikus Nov 6, 2023
d5e5c1a
handle schema exists case
grooviegermanikus Nov 6, 2023
d17e465
create schema and insert data
grooviegermanikus Nov 6, 2023
6b8bdec
use postgres "TEXT" instead of char/varchar
grooviegermanikus Nov 6, 2023
2f78538
use postgres type "TEXT" instead of CHAR
grooviegermanikus Nov 6, 2023
def99c0
rename PostgresSession cons
grooviegermanikus Nov 6, 2023
b3af181
add tests for vecvec
grooviegermanikus Nov 6, 2023
d13a1ae
wrap EpochRef
grooviegermanikus Nov 6, 2023
e155d1f
wrap EpochRef
grooviegermanikus Nov 6, 2023
5491c52
improve logging on polling
grooviegermanikus Nov 6, 2023
f26dd6d
setup permissions
grooviegermanikus Nov 6, 2023
ebb5db7
rename schema to rpc2a
grooviegermanikus Nov 7, 2023
e4bf72b
check for monotonic insertion
grooviegermanikus Nov 7, 2023
c1e8c8c
comment on schema lite_rpc
grooviegermanikus Nov 7, 2023
77c241a
use &ProducedBlock instead of ProducedBlock
grooviegermanikus Nov 7, 2023
bab1e22
remove RpcBlockConfig; heavy refactoring of multi-storage strategy
grooviegermanikus Nov 7, 2023
7f6a335
replace CommitmentConfig by Commitment
grooviegermanikus Nov 7, 2023
9a87c6c
remove unused imports
grooviegermanikus Nov 7, 2023
9e3970f
extract faithful store
grooviegermanikus Nov 7, 2023
17de34e
make sure that we do not save block with lower commitment level
grooviegermanikus Nov 7, 2023
40e3076
disable faithful
grooviegermanikus Nov 7, 2023
fa89820
removes postgres_data
grooviegermanikus Nov 7, 2023
99d549b
fix get_slot_range
grooviegermanikus Nov 7, 2023
351c4df
add more query_ helpers
grooviegermanikus Nov 8, 2023
da85071
trace-log calls to .save()
grooviegermanikus Nov 8, 2023
10aa14a
go back to commitment_level
grooviegermanikus Nov 8, 2023
349d517
check for commitment level going backwards
grooviegermanikus Nov 8, 2023
a8dce6e
wire up rpc block poller and in-memory-storage in test
grooviegermanikus Nov 8, 2023
1409c2a
trace execution times
grooviegermanikus Nov 8, 2023
7c57753
prepare statement + increase chunk
grooviegermanikus Nov 8, 2023
b880db2
tune primary key + cluster table
grooviegermanikus Nov 8, 2023
02fd145
Merge branch 'groovie/postgres_saving_blocksII' into groovie/postgres…
grooviegermanikus Nov 8, 2023
9357bef
port back
grooviegermanikus Nov 8, 2023
5b8fa0e
reduce chunk size again
grooviegermanikus Nov 13, 2023
79f54e2
Merge branch 'groovie/postgres_saving_blocks-integration-tst' into gr…
grooviegermanikus Nov 13, 2023
7c15c98
Merge branch 'main' into groovie/postgres_saving_blocksII
grooviegermanikus Nov 13, 2023
5f0ae12
fixes
grooviegermanikus Nov 13, 2023
7036783
more fixes
grooviegermanikus Nov 13, 2023
76396c3
inline NB_ARUMENTS
grooviegermanikus Nov 13, 2023
98f73b0
minor renames
grooviegermanikus Nov 13, 2023
0faed4f
add comment to COPY IN
grooviegermanikus Nov 14, 2023
99ec11d
use VARCHAR at some places
grooviegermanikus Nov 14, 2023
d5f3b47
clarify range check
grooviegermanikus Nov 14, 2023
9d77b46
use .save
grooviegermanikus Nov 15, 2023
a012a8c
move .save out of trait
grooviegermanikus Nov 15, 2023
be8e1fe
monotony check
grooviegermanikus Nov 15, 2023
650a7b4
fix import
grooviegermanikus Nov 16, 2023
3daf583
get epochs from postgres
grooviegermanikus Nov 16, 2023
fb779c1
postgres block store
grooviegermanikus Nov 16, 2023
e8c4af2
WIP: query block from postgres
grooviegermanikus Nov 16, 2023
d124f49
WIP: query block from postgres
grooviegermanikus Nov 16, 2023
a3f5739
query slot range over all epoch schemata
grooviegermanikus Nov 17, 2023
3e2d387
calc slot range per epoch
grooviegermanikus Nov 17, 2023
6fe2240
type EpochRef
grooviegermanikus Nov 17, 2023
54c16ef
query block using epoch/slot range map
grooviegermanikus Nov 17, 2023
5618f84
extract query method
grooviegermanikus Nov 17, 2023
091bbb5
map rewards
grooviegermanikus Nov 17, 2023
7d4c97a
debug confirmation-finalized state
grooviegermanikus Nov 17, 2023
b859195
debug confirmation-finalized state
grooviegermanikus Nov 17, 2023
9e45f88
debug confirmation-finalized state
grooviegermanikus Nov 17, 2023
5c90feb
use epoch schedule for slot range assignment
grooviegermanikus Nov 17, 2023
4e62e85
iterutils
grooviegermanikus Nov 17, 2023
acc6088
add Assumptions
grooviegermanikus Nov 17, 2023
7d929e6
remove unused code
grooviegermanikus Nov 19, 2023
e102572
experiment with account key compression
grooviegermanikus Nov 19, 2023
79110d3
split off the epoch schema preparation
grooviegermanikus Nov 20, 2023
0394813
clean duplicate
grooviegermanikus Nov 20, 2023
bf89290
remove epoch schema preparation from write loop
grooviegermanikus Nov 20, 2023
455122e
broadcast channel size must be power of two
grooviegermanikus Nov 20, 2023
f5b67fa
improve write path:
grooviegermanikus Nov 20, 2023
40a13d1
remove unused gin extension
grooviegermanikus Nov 20, 2023
3042e6f
improve time tracking
grooviegermanikus Nov 20, 2023
e31f2eb
add wrapper for write connection
grooviegermanikus Nov 20, 2023
3775f99
WIP: revert to get_session
grooviegermanikus Nov 20, 2023
68cb46d
set write session parameters
grooviegermanikus Nov 20, 2023
4a4d674
move PostgresWriteSession
grooviegermanikus Nov 20, 2023
34739b1
copy in csv + binary
grooviegermanikus Nov 20, 2023
5c54cfb
bench postgres
grooviegermanikus Nov 21, 2023
cb48525
bench postgres
grooviegermanikus Nov 21, 2023
5cc7cb9
bench postgres
grooviegermanikus Nov 21, 2023
e51ab41
CAUTION: change postgres shutdown handling
grooviegermanikus Nov 21, 2023
731a74e
ported to postgres_transaction
grooviegermanikus Nov 21, 2023
704ff44
write using copy in
grooviegermanikus Nov 21, 2023
4b0f885
clippy
grooviegermanikus Nov 21, 2023
626f97c
clippy
grooviegermanikus Nov 21, 2023
7725309
use multiple write sessions
grooviegermanikus Nov 21, 2023
4ac1502
clippy
grooviegermanikus Nov 21, 2023
6cc9168
coordinate epoch schema creation
grooviegermanikus Nov 21, 2023
647ebeb
coordinate startup
grooviegermanikus Nov 21, 2023
bd8182e
relax check for confirmed block
grooviegermanikus Nov 21, 2023
483d79e
fix prioritization_fees overflow
grooviegermanikus Nov 19, 2023
fc49127
minor log fix
grooviegermanikus Nov 21, 2023
e068c97
log timings to CSV
grooviegermanikus Nov 22, 2023
b42cb6c
Revert "log timings to CSV"
grooviegermanikus Nov 22, 2023
165ac41
do not panic on missing blocks
grooviegermanikus Nov 22, 2023
6dc5f7d
add basic ANALYZE for blocks table
grooviegermanikus Nov 22, 2023
c1e264a
Revert "Revert "log timings to CSV""
grooviegermanikus Nov 22, 2023
cb31601
add more Cargo deps
grooviegermanikus Nov 28, 2023
5b931a0
load tester
grooviegermanikus Nov 28, 2023
cd23c42
moved some files; clippy fixes
grooviegermanikus Nov 28, 2023
91b5443
moved some files; clippy fixes
grooviegermanikus Nov 28, 2023
6bd0a1d
add some deps to Cargo
grooviegermanikus Nov 28, 2023
2574aaa
temporarily remove get_block implementation from bridge
grooviegermanikus Nov 28, 2023
715775e
move history related code
grooviegermanikus Nov 28, 2023
bab883c
Merge remote-tracking branch 'origin/main' into groovie/postgres_savi…
grooviegermanikus Nov 28, 2023
b711be0
add postgres tester for block read + simple SELECT 1
grooviegermanikus Nov 28, 2023
f489727
fix db name
grooviegermanikus Nov 28, 2023
f9dd332
add some postgres session examples
grooviegermanikus Nov 28, 2023
21 changes: 19 additions & 2 deletions Cargo.lock

2 changes: 2 additions & 0 deletions Cargo.toml
Expand Up @@ -29,13 +29,15 @@ solana-pubsub-client = "~1.16.3"
solana-streamer = "~1.16.3"
solana-account-decoder = "~1.16.3"
itertools = "0.10.5"
rangetools = "0.1.4"
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.96"
bincode = "1.3.3"
bs58 = "0.4.0"
base64 = "0.21.0"
thiserror = "1.0.40"
futures = "0.3.28"
futures-util = "0.3.28"
bytes = "1.4.0"
anyhow = "1.0.70"
log = "0.4.17"
Expand Down
8 changes: 4 additions & 4 deletions cluster-endpoints/src/grpc_subscription.rs
Expand Up @@ -319,10 +319,10 @@ pub fn create_grpc_subscription(
grpc_x_token: Option<String>,
expected_grpc_version: String,
) -> anyhow::Result<(EndpointStreaming, Vec<AnyhowJoinHandle>)> {
- let (slot_sx, slot_notifier) = tokio::sync::broadcast::channel(10);
- let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(10);
- let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(10);
- let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(10);
+ let (slot_sx, slot_notifier) = tokio::sync::broadcast::channel(16);
+ let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(16);
+ let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(16);
+ let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(16);

let mut slots = HashMap::new();
slots.insert(
Expand Down
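
Note on the capacity change above: the move from 10 to 16 lines up with the commit titled "broadcast channel size must be power of two". The sketch below uses only the standard library to show how a requested capacity maps to the next power of two. `effective_capacity` is a hypothetical helper, not part of this PR, and the assumption that the channel rounds a requested capacity this way should be checked against the tokio documentation.

```rust
/// Hypothetical helper (not from the PR): round a requested channel
/// capacity up to the next power of two.
fn effective_capacity(requested: usize) -> usize {
    requested.next_power_of_two()
}

fn main() {
    // 10 is the old value from the diff, 16 the new one.
    for requested in [10usize, 16, 17] {
        println!(
            "requested {requested} -> effective {}",
            effective_capacity(requested)
        );
    }
}
```

Under that assumption, asking for 16 directly makes the configured size and the effective size the same number.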
11 changes: 6 additions & 5 deletions cluster-endpoints/src/json_rpc_subscription.rs
Expand Up @@ -12,17 +12,18 @@

pub fn create_json_rpc_polling_subscription(
rpc_client: Arc<RpcClient>,
+ num_parallel_tasks: usize,
) -> anyhow::Result<(EndpointStreaming, Vec<AnyhowJoinHandle>)> {
- let (slot_sx, slot_notifier) = tokio::sync::broadcast::channel(10);
- let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(10);
- let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(10);
- let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(10);
+ let (slot_sx, slot_notifier) = tokio::sync::broadcast::channel(16);
+ let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(16);
+ let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(16);
+ let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(16);

let mut endpoint_tasks =
poll_slots(rpc_client.clone(), CommitmentConfig::processed(), slot_sx)?;

let mut block_polling_tasks =
- poll_block(rpc_client.clone(), block_sx, slot_notifier.resubscribe());
+ poll_block(rpc_client.clone(), block_sx, slot_notifier.resubscribe(), num_parallel_tasks);
endpoint_tasks.append(&mut block_polling_tasks);

let cluster_info_polling =
Expand Down
5 changes: 4 additions & 1 deletion cluster-endpoints/src/rpc_polling/poll_blocks.rs
Expand Up @@ -16,6 +16,8 @@ use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};
use std::{sync::Arc, time::Duration};
use tokio::sync::broadcast::{Receiver, Sender};

+ pub const NUM_PARALLEL_TASKS_DEFAULT: usize = 16;

pub async fn process_block(
rpc_client: &RpcClient,
slot: Slot,
Expand All @@ -42,6 +44,7 @@ pub fn poll_block(
rpc_client: Arc<RpcClient>,
block_notification_sender: Sender<ProducedBlock>,
slot_notification: Receiver<SlotNotification>,
+ num_parallel_tasks: usize,
) -> Vec<AnyhowJoinHandle> {
let mut tasks: Vec<AnyhowJoinHandle> = vec![];

Expand All @@ -50,7 +53,7 @@ pub fn poll_block(
let (block_schedule_queue_sx, block_schedule_queue_rx) =
async_channel::unbounded::<(Slot, CommitmentConfig)>();

- for _i in 0..16 {
+ for _i in 0..num_parallel_tasks {
let block_notification_sender = block_notification_sender.clone();
let rpc_client = rpc_client.clone();
let block_schedule_queue_rx = block_schedule_queue_rx.clone();
Expand Down
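
The hunks above replace the hard-coded 16 workers with a `num_parallel_tasks` parameter. As a rough illustration of that shape, here is a sketch in which a configurable number of workers drain one shared queue. It uses standard-library threads and `mpsc` channels in place of the tokio tasks and `async_channel` queue used by the PR, and `process_slots` is a hypothetical stand-in, not the real `poll_block`.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the block polling workers: `num_parallel_tasks`
/// threads drain a shared queue of slots and report which slots they handled.
fn process_slots(slots: Vec<u64>, num_parallel_tasks: usize) -> Vec<u64> {
    let (queue_tx, queue_rx) = mpsc::channel::<u64>();
    let queue_rx = Arc::new(Mutex::new(queue_rx));
    let (done_tx, done_rx) = mpsc::channel::<u64>();

    let mut workers = Vec::new();
    for _ in 0..num_parallel_tasks {
        let queue_rx = Arc::clone(&queue_rx);
        let done_tx = done_tx.clone();
        workers.push(thread::spawn(move || loop {
            // The lock guard is released at the end of this statement,
            // before the slot is handled.
            let next = queue_rx.lock().unwrap().recv();
            match next {
                Ok(slot) => done_tx.send(slot).unwrap(),
                Err(_) => break, // queue closed and fully drained
            }
        }));
    }
    // Drop the original sender so `done_rx` ends once all workers finish.
    drop(done_tx);

    for slot in slots {
        queue_tx.send(slot).unwrap();
    }
    // Closing the queue lets the workers exit their loops.
    drop(queue_tx);

    for worker in workers {
        worker.join().unwrap();
    }
    let mut processed: Vec<u64> = done_rx.iter().collect();
    processed.sort_unstable();
    processed
}

fn main() {
    let processed = process_slots(vec![3, 1, 2], 4);
    println!("processed slots: {processed:?}");
}
```

Passing the worker count in as a parameter lets a test or load tester run with fewer workers than the default, without touching the polling code.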
2 changes: 1 addition & 1 deletion cluster-endpoints/src/rpc_polling/poll_slots.rs
Expand Up @@ -78,7 +78,7 @@ pub fn poll_slots(
}
Ok(None) => log::error!("got nothing from slot update notifier"),
Err(err) => {
- log::warn!("failed to receive slot update: {err}");
+ log::debug!("timeout on receive slot update: {err}");
// force update the slot
// estimated slot should not go ahead more than 32 slots
// this is because it may be a slot block
Expand Down
17 changes: 17 additions & 0 deletions core/src/iterutils.rs
@@ -0,0 +1,17 @@
pub enum Uniqueness {
ExactlyOne,
Multiple(usize),
Empty,
}

impl Uniqueness {
pub fn inspect_len(len: usize) -> Uniqueness {
if len == 1 {
Uniqueness::ExactlyOne
} else if len == 0 {
Uniqueness::Empty
} else {
Uniqueness::Multiple(len)
}
}
}
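
A possible use of `Uniqueness`, sketched under assumptions: the enum is copied from the diff above, while `single_row` and its error strings are hypothetical and only illustrate classifying the length of a query result before taking its single element.

```rust
pub enum Uniqueness {
    ExactlyOne,
    Multiple(usize),
    Empty,
}

impl Uniqueness {
    pub fn inspect_len(len: usize) -> Uniqueness {
        if len == 1 {
            Uniqueness::ExactlyOne
        } else if len == 0 {
            Uniqueness::Empty
        } else {
            Uniqueness::Multiple(len)
        }
    }
}

/// Hypothetical caller: expect exactly one row and turn the other two
/// cases into descriptive errors instead of panicking.
fn single_row(rows: &[u64]) -> Result<u64, String> {
    match Uniqueness::inspect_len(rows.len()) {
        Uniqueness::ExactlyOne => Ok(rows[0]),
        Uniqueness::Empty => Err("no row found".to_string()),
        Uniqueness::Multiple(n) => Err(format!("expected one row, found {n}")),
    }
}

fn main() {
    println!("{:?}", single_row(&[7]));
    println!("{:?}", single_row(&[]));
    println!("{:?}", single_row(&[1, 2, 3]));
}
```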
1 change: 1 addition & 0 deletions core/src/lib.rs
@@ -1,5 +1,6 @@
pub mod commitment_utils;
pub mod encoding;
+ pub mod iterutils;
pub mod keypair_loader;
pub mod quic_connection;
pub mod quic_connection_utils;
Expand Down
7 changes: 2 additions & 5 deletions core/src/stores/block_information_store.rs
@@ -1,11 +1,8 @@
use dashmap::DashMap;
use log::info;

- use solana_sdk::{
- clock::MAX_RECENT_BLOCKHASHES,
- commitment_config::{CommitmentConfig, CommitmentLevel},
- slot_history::Slot,
- };
+ use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
+ use solana_sdk::{clock::MAX_RECENT_BLOCKHASHES, slot_history::Slot};
use std::sync::Arc;
use tokio::sync::RwLock;

Expand Down
32 changes: 31 additions & 1 deletion core/src/structures/epoch.rs
Expand Up @@ -4,6 +4,7 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::slot_history::Slot;
use solana_sdk::sysvar::epoch_schedule::EpochSchedule;
+ use std::fmt::Display;
use std::sync::Arc;

#[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Eq, Ord)]
Expand All @@ -14,8 +15,11 @@ pub struct Epoch {
pub absolute_slot: Slot,
}

+ #[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Eq, Ord, Hash)]
+ pub struct EpochRef(u64);

impl Epoch {
- pub fn into_epoch_info(&self, block_height: u64, transaction_count: Option<u64>) -> EpochInfo {
+ pub fn as_epoch_info(&self, block_height: u64, transaction_count: Option<u64>) -> EpochInfo {
EpochInfo {
epoch: self.epoch,
slot_index: self.slot_index,
Expand All @@ -27,6 +31,32 @@ impl Epoch {
}
}

impl Display for EpochRef {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.get_epoch())
}
}

impl From<Epoch> for EpochRef {
fn from(epoch: Epoch) -> Self {
Self(epoch.epoch)
}
}

impl EpochRef {
pub fn new(epoch: u64) -> Self {
Self(epoch)
}

pub fn get_epoch(&self) -> u64 {
self.0
}

pub fn get_next_epoch(&self) -> Self {
Self(self.0 + 1)
}
}

#[derive(Clone)]
pub struct EpochCache {
epoch_schedule: Arc<EpochSchedule>,
Expand Down
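
Because `EpochRef` derives `Hash`, `Eq` and `Ord` and implements `Display`, it can serve as a map key and be formatted directly into names. The sketch below copies the type from the diff and adds a hypothetical `schema_name` helper. The `epoch_<n>` naming is an assumption made for illustration, not the naming this PR uses.

```rust
use std::collections::HashMap;
use std::fmt::Display;

#[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Eq, Ord, Hash)]
pub struct EpochRef(u64);

impl EpochRef {
    pub fn new(epoch: u64) -> Self {
        Self(epoch)
    }

    pub fn get_epoch(&self) -> u64 {
        self.0
    }

    pub fn get_next_epoch(&self) -> Self {
        Self(self.0 + 1)
    }
}

impl Display for EpochRef {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.get_epoch())
    }
}

/// Hypothetical naming scheme, for illustration only.
fn schema_name(epoch: EpochRef) -> String {
    format!("epoch_{epoch}")
}

fn main() {
    let current = EpochRef::new(532);
    // Track a name for the current epoch and the one after it.
    let mut schemas: HashMap<EpochRef, String> = HashMap::new();
    for epoch in [current, current.get_next_epoch()] {
        schemas.insert(epoch, schema_name(epoch));
    }
    println!("{:?}", schemas.get(&current));
}
```

Wrapping the raw `u64` keeps epoch numbers from being mixed up with slots or block heights, which are also plain `u64` values in this code base.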
23 changes: 20 additions & 3 deletions core/src/structures/produced_block.rs
@@ -1,6 +1,6 @@
+ use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::{
borsh0_10::try_from_slice_unchecked,
- commitment_config::CommitmentConfig,
compute_budget::{self, ComputeBudgetInstruction},
slot_history::Slot,
transaction::TransactionError,
Expand All @@ -23,7 +23,8 @@
pub message: String,
}

- #[derive(Default, Debug, Clone)]
+ // TODO try to remove Clone
+ #[derive(Debug, Clone)]
pub struct ProducedBlock {
pub transactions: Vec<TransactionInfo>,
pub leader_id: Option<String>,
Expand Down Expand Up @@ -122,7 +123,7 @@
if let Some((units, additional_fee)) = legacy_compute_budget {
cu_requested = Some(units);
if additional_fee > 0 {
- prioritization_fees = Some(((units * 1000) / additional_fee).into())
+ prioritization_fees = Some(calc_prioritization_fees(units, additional_fee))
}
};

Expand Down Expand Up @@ -162,7 +163,23 @@
block_time,
slot,
commitment_config,
rewards,
}
}


}

fn calc_prioritization_fees(units: u32, additional_fee: u32) -> u64 {
(units as u64 * 1000) / additional_fee as u64
}

#[test]
fn overflow_u32() {
// value high enough to overflow u32 if multiplied by 1000
let units: u32 = 4_000_000_000;
let additional_fee: u32 = 100;
let prioritization_fees: u64 = calc_prioritization_fees(units, additional_fee);

assert_eq!(40_000_000_000, prioritization_fees);
}
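
To make the overflow fix concrete, the sketch below contrasts the two arithmetic orders. `fee_in_u32` and `fee_in_u64` are hypothetical names. The first mirrors the removed expression but uses checked operations, so the overflow shows up as `None`; the original plain `units * 1000` would panic in a debug build or wrap in a release build. The second mirrors the new `calc_prioritization_fees`.

```rust
/// Mirrors the removed expression, kept in u32. Returns None when
/// `units * 1000` does not fit into u32 (or when the divisor is zero).
fn fee_in_u32(units: u32, additional_fee: u32) -> Option<u32> {
    units.checked_mul(1000)?.checked_div(additional_fee)
}

/// Mirrors the new helper: widen to u64 before multiplying.
/// Like the original, it expects `additional_fee > 0`, which the caller checks.
fn fee_in_u64(units: u32, additional_fee: u32) -> u64 {
    (units as u64 * 1000) / additional_fee as u64
}

fn main() {
    let units: u32 = 4_000_000_000;
    let additional_fee: u32 = 100;
    println!("u32 arithmetic: {:?}", fee_in_u32(units, additional_fee));
    println!("u64 arithmetic: {}", fee_in_u64(units, additional_fee));
}
```

Since `u32::MAX * 1000` is far below `u64::MAX`, the widened multiplication cannot overflow for any `u32` input.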
19 changes: 0 additions & 19 deletions core/src/traits/block_storage_interface.rs

This file was deleted.

1 change: 0 additions & 1 deletion core/src/traits/mod.rs
@@ -1,3 +1,2 @@
- pub mod block_storage_interface;
pub mod leaders_fetcher_interface;
pub mod subscription_sink;
17 changes: 15 additions & 2 deletions history/Cargo.toml
Expand Up @@ -14,17 +14,30 @@ solana-rpc-client = { workspace = true }

dashmap = {workspace = true}
async-trait = { workspace = true }
- tokio = "1.*"
+ tokio = { version = "1.28.2", features = ["full", "fs"]}
tokio-util = "0.7"

solana-lite-rpc-core = {workspace = true}
solana-lite-rpc-cluster-endpoints = {workspace = true}
solana-rpc-client-api = {workspace = true}
native-tls = { workspace = true }
postgres-native-tls = { workspace = true }
anyhow = { workspace = true }
log = {workspace = true}
tracing-subscriber = { workspace = true, features = ["std", "env-filter"] }
chrono = {workspace = true}
serde = { workspace = true }
serde_json = { workspace = true }
jsonrpsee = { workspace = true }
bincode = {workspace = true}
base64 = {workspace = true}
itertools = {workspace = true}
rangetools = {workspace = true}
tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] }
serde = { workspace = true }
futures = {workspace = true}
futures-util = {workspace = true}
bytes = "1.5.0"
rand = "0.8.5"

[dev-dependencies]
tracing-subscriber = { workspace = true }