Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit retries on dispense #63

Merged
merged 3 commits into from
Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 27 additions & 35 deletions src/dispense_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,30 @@
use std::{
cmp::Ordering,
collections::{BinaryHeap, HashMap, HashSet},
};
use std::collections::BTreeMap;
use std::collections::{HashMap, HashSet};
use std::time::{SystemTime, UNIX_EPOCH};

use fuel_types::Address;

#[derive(Debug, Eq, PartialEq)]
pub struct Entry {
address: Address,
timestamp: u64,
}

impl Ord for Entry {
fn cmp(&self, other: &Self) -> Ordering {
self.timestamp.cmp(&other.timestamp)
}
}

impl PartialOrd for Entry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

pub trait Clock: std::fmt::Debug + Send + Sync {
fn now(&self) -> u64;
}

#[derive(Debug)]
pub struct TokioTime {}
pub struct StdTime {}

impl Clock for TokioTime {
impl Clock for StdTime {
fn now(&self) -> u64 {
tokio::time::Instant::now().elapsed().as_secs()
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
since_the_epoch.as_secs()
}
}

#[derive(Debug)]
pub struct DispenseTracker {
tracked: HashMap<Address, u64>,
queue: BinaryHeap<Entry>,
queue: BTreeMap<u64, Vec<Address>>,
in_progress: HashSet<Address>,
clock: Box<dyn Clock>,
}
Expand All @@ -48,9 +33,9 @@ impl Default for DispenseTracker {
fn default() -> Self {
Self {
tracked: HashMap::default(),
queue: BinaryHeap::default(),
queue: Default::default(),
in_progress: HashSet::default(),
clock: Box::new(TokioTime {}),
clock: Box::new(StdTime {}),
}
}
}
Expand All @@ -59,7 +44,7 @@ impl DispenseTracker {
pub fn new(clock: impl Clock + 'static) -> Self {
Self {
tracked: HashMap::new(),
queue: BinaryHeap::new(),
queue: Default::default(),
in_progress: HashSet::new(),
clock: Box::new(clock),
}
Expand All @@ -70,7 +55,7 @@ impl DispenseTracker {

let timestamp = self.clock.now();
self.tracked.insert(address, timestamp);
self.queue.push(Entry { address, timestamp });
self.queue.entry(timestamp).or_default().push(address);
}

pub fn mark_in_progress(&mut self, address: Address) {
Expand All @@ -84,17 +69,24 @@ impl DispenseTracker {
pub fn evict_expired_entries(&mut self, eviction_duration: u64) {
let now = self.clock.now();

while let Some(oldest_entry) = self.queue.peek() {
if now - oldest_entry.timestamp > eviction_duration {
let removed_entry = self.queue.pop().unwrap();
self.tracked.remove(&removed_entry.address);
while let Some(oldest_entry) = self.queue.first_entry() {
if now - oldest_entry.key() > eviction_duration {
let (_, addresses) = oldest_entry.remove_entry();

for address in addresses {
self.tracked.remove(&address);
}
} else {
break;
}
}
}

pub fn has_tracked(&self, address: &Address) -> bool {
self.tracked.get(address).is_some() || self.in_progress.contains(address)
self.tracked.get(address).is_some()
}

pub fn is_in_progress(&self, address: &Address) -> bool {
self.in_progress.contains(address)
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ mod dispense_tracker;
mod recaptcha;
mod routes;

pub use dispense_tracker::{Clock, TokioTime};
pub use dispense_tracker::{Clock, StdTime};
pub use routes::THE_BIGGEST_AMOUNT;

#[derive(Debug)]
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use fuel_faucet::{config::Config, start_server, TokioTime};
use fuel_faucet::{config::Config, start_server, StdTime};
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() {
let config = Config::default();
init_logger(&config);
let clock = TokioTime {};
let clock = StdTime {};
let (_, task) = start_server(config, clock).await;
let _ = task.await.unwrap();
}
Expand Down
10 changes: 10 additions & 0 deletions src/models.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt::{self, Display, Formatter};

use reqwest::StatusCode;
use serde::{Deserialize, Serialize};

Expand All @@ -24,3 +26,11 @@ pub struct DispenseError {
pub status: StatusCode,
pub error: String,
}

impl Display for DispenseError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{self:?}")
}
}

impl std::error::Error for DispenseError {}
83 changes: 54 additions & 29 deletions src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ fn check_and_mark_dispense_limit(
));
}

if tracker.is_in_progress(&address) {
return Err(error(
"Account is already in the process of receiving assets".to_string(),
StatusCode::TOO_MANY_REQUESTS,
));
}

tracker.mark_in_progress(address);
Ok(())
}
Expand Down Expand Up @@ -238,27 +245,37 @@ pub async fn dispense_tokens(
}

check_and_mark_dispense_limit(&dispense_tracker, address, config.dispense_limit_interval)?;
let cleanup = || {

struct CleanUpper<Fn>(Fn)
where
Fn: FnMut();

impl<Fn> Drop for CleanUpper<Fn>
where
Fn: FnMut(),
{
fn drop(&mut self) {
self.0();
}
}

// We want to remove the address from `in_progress` regardless of the outcome of the transaction.
let _cleanup = CleanUpper(|| {
dispense_tracker
.lock()
.unwrap()
.remove_in_progress(&address);
};
});

let provider = wallet.provider().expect("client provider");
let mut tx_id;

loop {
let mut tx_id = None;
for _ in 0..5 {
let mut guard = state.lock().await;
let coin_output = if let Some(previous_coin_output) = &guard.last_output {
*previous_coin_output
} else {
get_coin_output(&wallet, config.dispense_amount)
.await
.map_err(|e| {
cleanup();
e
})?
get_coin_output(&wallet, config.dispense_amount).await?
};

let coin_type = CoinType::Coin(Coin {
Expand Down Expand Up @@ -296,55 +313,63 @@ pub async fn dispense_tokens(
.fee_checked_from_tx(&network_config.network_info.consensus_parameters)
.expect("Should be able to calculate fee");

tx_id = script.id(network_config.network_info.consensus_parameters.chain_id);
let id = script.id(network_config.network_info.consensus_parameters.chain_id);
let result = tokio::time::timeout(
Duration::from_secs(config.timeout),
provider.send_transaction(script),
)
.await
.map(|r| {
.map_err(|_| {
error(
format!("Timeout while submitting transaction for address: {address:X}"),
StatusCode::INTERNAL_SERVER_ERROR,
)
})
.and_then(|r| {
r.map_err(|e| {
error(
format!("Failed to submit transaction: {e}"),
format!(
"Failed to submit transaction for address: {address:X} with error: {}",
e
),
StatusCode::INTERNAL_SERVER_ERROR,
)
})
})
.map_err(|e| {
error(
format!("Timeout while submitting transaction: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
)
});

match result {
Ok(Ok(_)) => {
Ok(_) => {
guard.last_output = Some(CoinOutput {
utxo_id: UtxoId::new(tx_id, 1),
utxo_id: UtxoId::new(id, 1),
owner: coin_output.owner,
amount: coin_output.amount - total_fee.min_fee() - config.dispense_amount,
});
tx_id = Some(id);
break;
}
_ => {
Err(e) => {
tracing::warn!("{}", e);
guard.last_output = None;
}
};
}

submit_tx_with_timeout(&client, &tx_id, config.timeout)
.await
.map_err(|e| {
cleanup();
e
})?;
let Some(tx_id) = tx_id else {
return Err(error(
"Failed to submit transaction".to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
));
};

submit_tx_with_timeout(&client, &tx_id, config.timeout).await?;

info!(
"dispensed {} tokens to {:#x}",
config.dispense_amount, &address
);

dispense_tracker.lock().unwrap().track(address);
let mut tracker = dispense_tracker.lock().unwrap();
tracker.track(address);

Ok(DispenseResponse {
status: "Success".to_string(),
Expand Down
Loading