Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mobile Packet Verifier Add new Metric for pending_dc_burn #881

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 5 additions & 24 deletions mobile_packet_verifier/src/accumulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ use helium_proto::services::poc_mobile::{
};
use sqlx::{Postgres, Transaction};

use crate::{event_ids, MobileConfigResolverExt};
use crate::{event_ids, pending_burns, MobileConfigResolverExt};

pub async fn accumulate_sessions(
mobile_config: &impl MobileConfigResolverExt,
conn: &mut Transaction<'_, Postgres>,
txn: &mut Transaction<'_, Postgres>,
verified_data_session_report_sink: &FileSinkClient<VerifiedDataTransferIngestReportV1>,
curr_file_ts: DateTime<Utc>,
reports: impl Stream<Item = DataTransferSessionIngestReport>,
) -> anyhow::Result<()> {
tokio::pin!(reports);

while let Some(report) = reports.next().await {
let report_validity = verify_report(conn, mobile_config, &report).await?;
let report_validity = verify_report(txn, mobile_config, &report).await?;
write_verified_report(
verified_data_session_report_sink,
report_validity,
Expand All @@ -37,26 +37,7 @@ pub async fn accumulate_sessions(
continue;
}

let event = report.report.data_transfer_usage;
sqlx::query(
r#"
INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $6)
ON CONFLICT (pub_key, payer) DO UPDATE SET
uploaded_bytes = data_transfer_sessions.uploaded_bytes + EXCLUDED.uploaded_bytes,
downloaded_bytes = data_transfer_sessions.downloaded_bytes + EXCLUDED.downloaded_bytes,
rewardable_bytes = data_transfer_sessions.rewardable_bytes + EXCLUDED.rewardable_bytes,
last_timestamp = GREATEST(data_transfer_sessions.last_timestamp, EXCLUDED.last_timestamp)
"#
)
.bind(event.pub_key)
.bind(event.payer)
.bind(event.upload_bytes as i64)
.bind(event.download_bytes as i64)
.bind(report.report.rewardable_bytes as i64)
.bind(curr_file_ts)
.execute(&mut *conn)
.await?;
pending_burns::save(&mut *txn, &report.report, curr_file_ts).await?;
}

Ok(())
Expand Down Expand Up @@ -125,7 +106,7 @@ mod tests {
use helium_proto::services::poc_mobile::DataTransferRadioAccessTechnology;
use sqlx::PgPool;

use crate::burner::DataTransferSession;
use crate::pending_burns::DataTransferSession;

use super::*;

Expand Down
103 changes: 11 additions & 92 deletions mobile_packet_verifier/src/burner.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,10 @@
use chrono::{DateTime, Utc};
use file_store::{file_sink::FileSinkClient, traits::TimestampEncode};
use file_store::file_sink::FileSinkClient;
use helium_crypto::PublicKeyBinary;
use helium_proto::services::packet_verifier::ValidDataTransferSession;
use solana::burn::SolanaNetwork;
use sqlx::{FromRow, Pool, Postgres};
use std::collections::HashMap;
use sqlx::{Pool, Postgres};

#[derive(FromRow)]
pub struct DataTransferSession {
pub_key: PublicKeyBinary,
payer: PublicKeyBinary,
uploaded_bytes: i64,
downloaded_bytes: i64,
rewardable_bytes: i64,
first_timestamp: DateTime<Utc>,
last_timestamp: DateTime<Utc>,
}

#[derive(Default)]
pub struct PayerTotals {
total_dcs: u64,
sessions: Vec<DataTransferSession>,
}

impl PayerTotals {
fn push_sess(&mut self, sess: DataTransferSession) {
self.total_dcs += bytes_to_dc(sess.rewardable_bytes as u64);
self.sessions.push(sess);
}
}
use crate::pending_burns;

pub struct Burner<S> {
valid_sessions: FileSinkClient<ValidDataTransferSession>,
Expand All @@ -44,49 +20,17 @@ impl<S> Burner<S> {
}
}

#[derive(thiserror::Error, Debug)]
pub enum BurnError<E> {
#[error("file store error: {0}")]
FileStoreError(#[from] file_store::Error),
#[error("sql error: {0}")]
SqlError(#[from] sqlx::Error),
#[error("solana error: {0}")]
SolanaError(E),
}

impl<S> Burner<S>
where
S: SolanaNetwork,
{
pub async fn burn(&self, pool: &Pool<Postgres>) -> Result<(), BurnError<S::Error>> {
// Fetch all of the sessions
let sessions: Vec<DataTransferSession> =
sqlx::query_as("SELECT * FROM data_transfer_sessions")
.fetch_all(pool)
.await?;

// Fetch all of the sessions and group by the payer
let mut payer_totals = HashMap::<PublicKeyBinary, PayerTotals>::new();
for session in sessions.into_iter() {
payer_totals
.entry(session.payer.clone())
.or_default()
.push_sess(session);
}
pub async fn burn(&self, pool: &Pool<Postgres>) -> anyhow::Result<()> {
for payer_pending_burn in pending_burns::get_all_payer_burns(pool).await? {
let payer = payer_pending_burn.payer;
let total_dcs = payer_pending_burn.total_dcs;
let sessions = payer_pending_burn.sessions;

for (
payer,
PayerTotals {
total_dcs,
sessions,
},
) in payer_totals.into_iter()
{
let payer_balance = self
.solana
.payer_balance(&payer)
.await
.map_err(BurnError::SolanaError)?;
let payer_balance = self.solana.payer_balance(&payer).await?;

if payer_balance < total_dcs {
tracing::warn!(%payer, %payer_balance, %total_dcs, "Payer does not have enough balance to burn dcs");
Expand All @@ -107,28 +51,11 @@ where
.increment(total_dcs);

// Delete from the data transfer session and write out to S3

sqlx::query("DELETE FROM data_transfer_sessions WHERE payer = $1")
.bind(&payer)
.execute(pool)
.await?;
pending_burns::delete_for_payer(pool, &payer, total_dcs).await?;

for session in sessions {
let num_dcs = bytes_to_dc(session.rewardable_bytes as u64);
self.valid_sessions
.write(
ValidDataTransferSession {
pub_key: session.pub_key.into(),
payer: session.payer.into(),
upload_bytes: session.uploaded_bytes as u64,
download_bytes: session.downloaded_bytes as u64,
rewardable_bytes: session.rewardable_bytes as u64,
num_dcs,
first_timestamp: session.first_timestamp.encode_timestamp_millis(),
last_timestamp: session.last_timestamp.encode_timestamp_millis(),
},
&[],
)
.write(ValidDataTransferSession::from(session), &[])
.await?;
}
}
Expand All @@ -146,11 +73,3 @@ where
Ok(())
}
}

const BYTES_PER_DC: u64 = 20_000;

fn bytes_to_dc(bytes: u64) -> u64 {
let bytes = bytes.max(BYTES_PER_DC);
// Integer div/ceil from: https://stackoverflow.com/a/2745086
(bytes + BYTES_PER_DC - 1) / BYTES_PER_DC
}
35 changes: 19 additions & 16 deletions mobile_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
burner::Burner, event_ids::EventIdPurger, settings::Settings, MobileConfigClients,
MobileConfigResolverExt,
burner::Burner, event_ids::EventIdPurger, pending_burns, settings::Settings,
MobileConfigClients, MobileConfigResolverExt,
};
use anyhow::{bail, Result};
use chrono::{TimeZone, Utc};
Expand Down Expand Up @@ -73,23 +73,13 @@ where
S: SolanaNetwork,
MCR: MobileConfigResolverExt,
{
pub async fn run(mut self, shutdown: triggered::Listener) -> Result<()> {
pub async fn run(mut self, mut shutdown: triggered::Listener) -> Result<()> {
// Set the initial burn period to one minute
let mut burn_time = Instant::now() + Duration::from_secs(60);
loop {
tokio::select! {
file = self.reports.recv() => {
let Some(file) = file else {
anyhow::bail!("FileInfoPoller sender was dropped unexpectedly");
};
tracing::info!("Verifying file: {}", file.file_info);
let ts = file.file_info.timestamp;
let mut transaction = self.pool.begin().await?;
let reports = file.into_stream(&mut transaction).await?;
crate::accumulate::accumulate_sessions(&self.mobile_config_resolver, &mut transaction, &self.verified_data_session_report_sink, ts, reports).await?;
transaction.commit().await?;
self.verified_data_session_report_sink.commit().await?;
},
biased;
_ = &mut shutdown => return Ok(()),
_ = sleep_until(burn_time) => {
// It's time to burn
match self.burner.burn(&self.pool).await {
Expand All @@ -102,7 +92,18 @@ where
}
}
}
_ = shutdown.clone() => return Ok(()),
file = self.reports.recv() => {
let Some(file) = file else {
anyhow::bail!("FileInfoPoller sender was dropped unexpectedly");
};
tracing::info!("Verifying file: {}", file.file_info);
let ts = file.file_info.timestamp;
let mut transaction = self.pool.begin().await?;
let reports = file.into_stream(&mut transaction).await?;
crate::accumulate::accumulate_sessions(&self.mobile_config_resolver, &mut transaction, &self.verified_data_session_report_sink, ts, reports).await?;
transaction.commit().await?;
self.verified_data_session_report_sink.commit().await?;
}
}
}
}
Expand All @@ -119,6 +120,8 @@ impl Cmd {
let pool = settings.database.connect("mobile-packet-verifier").await?;
sqlx::migrate!().run(&pool).await?;

pending_burns::initialize(&pool).await?;

// Set up the solana network:
let solana = if settings.enable_solana_integration {
let Some(ref solana_settings) = settings.solana else {
Expand Down
1 change: 1 addition & 0 deletions mobile_packet_verifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod accumulate;
pub mod burner;
pub mod daemon;
pub mod event_ids;
pub mod pending_burns;
pub mod settings;

pub struct MobileConfigClients {
Expand Down
Loading