From 093e1abc7bf22dc2cd3ebfd738e993e1bb8ac846 Mon Sep 17 00:00:00 2001 From: Pranay Tulugu Date: Thu, 26 Aug 2021 11:46:58 -0700 Subject: [PATCH] Migrated Exit Manager and QueryExitDebts to async await. Removed the futures based http client in these handlers and replaced them with the async http client 'awc' --- Cargo.lock | 37 +- rita_client/Cargo.toml | 2 + rita_client/src/dashboard/prices.rs | 4 +- rita_client/src/exit_manager/mod.rs | 461 +++++++++----------- rita_client/src/light_client_manager/mod.rs | 4 +- rita_client/src/rita_loop/mod.rs | 60 ++- rita_client/src/traffic_watcher/mod.rs | 249 +++++------ 7 files changed, 407 insertions(+), 410 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index af971f7ba..abe13d077 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -108,6 +108,7 @@ dependencies = [ "ahash", "base64 0.13.0", "bitflags", + "brotli2", "bytes 1.0.1", "bytestring", "derive_more", @@ -267,7 +268,7 @@ dependencies = [ "bitflags", "byteorder", "bytes 0.4.12", - "cookie", + "cookie 0.11.4", "encoding", "failure", "futures 0.1.31", @@ -607,6 +608,7 @@ dependencies = [ "base64 0.13.0", "bytes 1.0.1", "cfg-if 1.0.0", + "cookie 0.15.1", "derive_more", "futures-core", "itoa", @@ -770,6 +772,26 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" +[[package]] +name = "brotli-sys" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4445dea95f4c2b41cde57cc9fee236ae4dbae88d8fcbdb4750fc1bb5d86aaecd" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "brotli2" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cb036c3eade309815c15ddbacec5b22c4d1f3983a774ab2eac2e3e9ea85568e" +dependencies = [ + "brotli-sys", + "libc", +] + [[package]] name = "bufstream" version = "0.1.4" @@ -963,6 +985,17 @@ dependencies = [ "time 0.1.43", ] +[[package]] +name = "cookie" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5f1c7727e460397e56abc4bddc1d49e07a1ad78fc98eb2e1c8f032a58a2f80d" +dependencies = [ + "percent-encoding 2.1.0", + "time 0.2.27", + "version_check 0.9.3", +] + [[package]] name = "core-foundation" version = "0.9.1" @@ -3238,11 +3271,13 @@ dependencies = [ name = "rita_client" version = "0.18.1" dependencies = [ + "actix 0.12.0", "actix 0.7.11", "actix-web 0.7.19", "althea_kernel_interface", "althea_types", "arrayvec 0.7.1", + "awc", "babel_monitor", "babel_monitor_legacy", "clarity", diff --git a/rita_client/Cargo.toml b/rita_client/Cargo.toml index 2654c199c..21b47f1a5 100644 --- a/rita_client/Cargo.toml +++ b/rita_client/Cargo.toml @@ -35,6 +35,8 @@ sodiumoxide = "0.2" clu = { path = "../clu" } web30 = {git = "https://github.com/althea-net/web30", rev = "a600b8b8ebf2c49badee6b65ac90a56bbdad4ab9"} actix = "0.7" +awc = "3.0.0-beta.7" +actix-async = {package="actix", version = "0.12"} actix-web = { version = "0.7", default_features = false, features= ["ssl"] } clarity = "0.4" diff --git a/rita_client/src/dashboard/prices.rs b/rita_client/src/dashboard/prices.rs index 2bee692de..2885850a4 100644 --- a/rita_client/src/dashboard/prices.rs +++ b/rita_client/src/dashboard/prices.rs @@ -1,5 +1,5 @@ use crate::traffic_watcher::GetExitDestPrice; -use crate::traffic_watcher::TrafficWatcher; +use crate::traffic_watcher::TrafficWatcherActor; use actix::SystemService; use actix_web::Path; @@ -47,7 +47,7 @@ pub fn get_prices(_req: HttpRequest) -> Box, Erro debug!("/prices GET hit"); let payment = settings::get_rita_client().payment; - let f = TrafficWatcher::from_registry().send(GetExitDestPrice); + let f = TrafficWatcherActor::from_registry().send(GetExitDestPrice); let b = f.from_err().and_then(move |exit_dest_price| { let exit_dest_price = exit_dest_price.unwrap(); let simulated_tx_fee = payment.simulated_transaction_fee; diff --git a/rita_client/src/exit_manager/mod.rs b/rita_client/src/exit_manager/mod.rs index be32e4064..e6205a0b6 100644 --- a/rita_client/src/exit_manager/mod.rs +++ b/rita_client/src/exit_manager/mod.rs @@ -16,12 +16,10 @@ //! //! Signup is complete and the user may use the connection -use crate::rita_loop::Tick; use crate::rita_loop::CLIENT_LOOP_TIMEOUT; -use crate::traffic_watcher::{QueryExitDebts, TrafficWatcher}; -use actix::registry::SystemService; -use actix::{Actor, Arbiter, Context, Handler, ResponseFuture, Supervised}; +use crate::traffic_watcher::{query_exit_debts, QueryExitDebts}; use actix_web::client::Connection; + use actix_web::{client, HttpMessage, Result}; use althea_kernel_interface::{ exit_client_tunnel::ClientExitTunnelConfig, DefaultRoute, KernelInterfaceError, @@ -31,12 +29,9 @@ use althea_types::ExitDetails; use althea_types::WgKey; use althea_types::{EncryptedExitClientIdentity, EncryptedExitState}; use althea_types::{ExitClientIdentity, ExitRegistrationDetails, ExitState, ExitVerifMode}; -use babel_monitor_legacy::open_babel_stream_legacy; -use babel_monitor_legacy::parse_routes_legacy; -use babel_monitor_legacy::start_connection_legacy; +use babel_monitor::{open_babel_stream, parse_routes}; use failure::Error; use futures01::future; -use futures01::future::join_all; use futures01::Future; use rita_common::blockchain_oracle::low_balance; use rita_common::KI; @@ -45,9 +40,15 @@ use sodiumoxide::crypto::box_; use sodiumoxide::crypto::box_::curve25519xsalsa20poly1305::Nonce; use sodiumoxide::crypto::box_::curve25519xsalsa20poly1305::PublicKey; use std::net::SocketAddr; +use std::sync::Arc; +use std::sync::RwLock; use std::time::Duration; use tokio::net::TcpStream as TokioTcpStream; -use tokio::util::FutureExt; + +lazy_static! { + pub static ref EXIT_MANAGER: Arc> = + Arc::new(RwLock::new(ExitManager::default())); +} fn linux_setup_exit_tunnel( current_exit: &ExitServer, @@ -97,20 +98,19 @@ fn remove_nat() { } } -pub fn get_exit_info(to: &SocketAddr) -> impl Future { +pub async fn get_exit_info(to: &SocketAddr) -> Result { let endpoint = format!("http://[{}]:{}/exit_info", to.ip(), to.port()); - let stream = TokioTcpStream::connect(to); + let client = awc::Client::default(); + let mut response = match client.get(&endpoint).send().await { + Ok(a) => a, + Err(e) => { + bail!("Error with get request for exit info: {}", e); + } + }; + let response_json = response.json().await?; - stream.from_err().and_then(move |stream| { - client::get(&endpoint) - .with_connection(Connection::from_stream(stream)) - .finish() - .unwrap() - .send() - .from_err() - .and_then(|response| response.json().from_err().and_then(Ok)) - }) + Ok(response_json) } fn encrypt_exit_client_id( @@ -200,60 +200,51 @@ fn send_exit_setup_request( }) } -fn send_exit_status_request( +async fn send_exit_status_request( exit_pubkey: WgKey, to: &SocketAddr, ident: ExitClientIdentity, -) -> impl Future { +) -> Result { let endpoint = format!("http://[{}]:{}/secure_status", to.ip(), to.port()); let ident = encrypt_exit_client_id(&exit_pubkey.into(), ident); - let stream = TokioTcpStream::connect(to); + let client = awc::Client::default(); + let response = client + .post(&endpoint) + .timeout(CLIENT_LOOP_TIMEOUT) + .send_json(&ident) + .await; - stream.from_err().and_then(move |stream| { - client::post(&endpoint) - .timeout(CLIENT_LOOP_TIMEOUT) - .with_connection(Connection::from_stream(stream)) - .json(ident) - .unwrap() - .send() - .from_err() - .and_then(move |response| { - response - .json() - .from_err() - .and_then(move |value: EncryptedExitState| { - decrypt_exit_state(value, exit_pubkey.into()) - }) - }) - }) + let mut response = match response { + Ok(a) => a, + Err(e) => bail!("Error with post request for exit status: {}", e), + }; + + let value = response.json().await?; + + decrypt_exit_state(value, exit_pubkey.into()) } -fn exit_general_details_request(exit: String) -> impl Future { +async fn exit_general_details_request(exit: String) -> Result<(), Error> { let current_exit = match settings::get_rita_client().exit_client.exits.get(&exit) { Some(current_exit) => current_exit.clone(), None => { - return Box::new(future::err(format_err!("No valid exit for {}", exit))) - as Box>; + return Err(format_err!("No valid exit for {}", exit)); } }; let endpoint = SocketAddr::new(current_exit.id.mesh_ip, current_exit.registration_port); trace!("sending exit general details request to {}", exit); - let r = get_exit_info(&endpoint).and_then(move |exit_details| { - let mut rita_client = settings::get_rita_client(); - let current_exit = match rita_client.exit_client.exits.get_mut(&exit) { - Some(exit) => exit, - None => bail!("Could not find exit {}", exit), - }; - current_exit.info = exit_details; - - settings::set_rita_client(rita_client); - Ok(()) - }); - - Box::new(r) + let exit_details = get_exit_info(&endpoint).await?; + let mut rita_client = settings::get_rita_client(); + let current_exit = match rita_client.exit_client.exits.get_mut(&exit) { + Some(exit) => exit, + None => bail!("Could not find exit {}", exit), + }; + current_exit.info = exit_details; + settings::set_rita_client(rita_client); + Ok(()) } pub fn exit_setup_request( @@ -340,17 +331,16 @@ pub fn exit_setup_request( ) } -fn exit_status_request(exit: String) -> impl Future { +async fn exit_status_request(exit: String) -> Result<(), Error> { let current_exit = match settings::get_rita_client().exit_client.exits.get(&exit) { Some(current_exit) => current_exit.clone(), None => { - return Box::new(future::err(format_err!("No valid exit for {}", exit))) - as Box>; + return Err(format_err!("No valid exit for {}", exit)); } }; let reg_details = match settings::get_rita_client().exit_client.contact_info { Some(val) => val.into(), - None => return Box::new(future::err(format_err!("No valid details"))), + None => return Err(format_err!("No valid details")), }; let exit_server = current_exit.id.mesh_ip; @@ -359,9 +349,7 @@ fn exit_status_request(exit: String) -> impl Future { global: match settings::get_rita_client().get_identity() { Some(id) => id, None => { - return Box::new(future::err(format_err!( - "Identity has no mesh IP ready yet" - ))); + return Err(format_err!("Identity has no mesh IP ready yet")); } }, wg_port: settings::get_rita_client().exit_client.wg_listen_port, @@ -376,44 +364,24 @@ fn exit_status_request(exit: String) -> impl Future { endpoint ); - let r = - send_exit_status_request(exit_pubkey, &endpoint, ident).and_then(move |exit_response| { - let mut rita_client = settings::get_rita_client(); - - let current_exit = match rita_client.exit_client.exits.get_mut(&exit) { - Some(exit_struct) => exit_struct, - None => bail!("Could not find exit {:?}", exit), - }; - - current_exit.info = exit_response.clone(); - settings::set_rita_client(rita_client); - - trace!("Got exit status response {:?}", exit_response); - - Ok(()) - }); - - Box::new(r) + let exit_response = send_exit_status_request(exit_pubkey, &endpoint, ident).await?; + let mut rita_client = settings::get_rita_client(); + let current_exit = match rita_client.exit_client.exits.get_mut(&exit) { + Some(exit_struct) => exit_struct, + None => bail!("Could not find exit {:?}", exit), + }; + current_exit.info = exit_response.clone(); + settings::set_rita_client(rita_client); + trace!("Got exit status response {:?}", exit_response); + Ok(()) } /// An actor which pays the exit #[derive(Default)] pub struct ExitManager { // used to determine if we've changed exits - last_exit: Option, - nat_setup: bool, -} - -impl Actor for ExitManager { - type Context = Context; -} - -impl Supervised for ExitManager {} -impl SystemService for ExitManager { - fn service_started(&mut self, _ctx: &mut Context) { - info!("Exit Manager started"); - self.last_exit = None; - } + pub last_exit: Option, + pub nat_setup: bool, } fn correct_default_route(input: Option) -> bool { @@ -423,174 +391,161 @@ fn correct_default_route(input: Option) -> bool { } } -impl Handler for ExitManager { - type Result = ResponseFuture<(), Error>; - - fn handle(&mut self, _: Tick, _ctx: &mut Context) -> Self::Result { - // scopes our access to SETTING and prevent - // holding a readlock while exit tunnel setup requires a write lock - // roughly the same as a drop(); inline - let client_can_use_free_tier = - { settings::get_rita_client().payment.client_can_use_free_tier }; - let exit_server = { - settings::get_rita_client() - .exit_client - .get_current_exit() - .cloned() - }; +pub async fn exit_manager_tick() { + // scopes our access to SETTING and prevent + // holding a readlock while exit tunnel setup requires a write lock + // roughly the same as a drop(); inline + let client_can_use_free_tier = { settings::get_rita_client().payment.client_can_use_free_tier }; + let exit_server = { + settings::get_rita_client() + .exit_client + .get_current_exit() + .cloned() + }; - // code that connects to the current exit server - trace!("About to setup exit tunnel!"); - if let Some(exit) = exit_server { - trace!("We have selected an exit!"); - if let Some(general_details) = exit.info.general_details() { - trace!("We have details for the selected exit!"); - - let signed_up_for_exit = exit.info.our_details().is_some(); - let exit_has_changed = - !(self.last_exit.is_some() && self.last_exit.clone().unwrap() == exit); - let correct_default_route = correct_default_route(KI.get_default_route()); - - match (signed_up_for_exit, exit_has_changed, correct_default_route) { - (true, true, _) => { - info!("Exit change, setting up exit tunnel"); - linux_setup_exit_tunnel( - &exit, - &general_details.clone(), - exit.info.our_details().unwrap(), - ) - .expect("failure setting up exit tunnel"); - self.nat_setup = true; - self.last_exit = Some(exit.clone()); - } - (true, false, false) => { - info!("DHCP overwrite setup exit tunnel again"); - linux_setup_exit_tunnel( - &exit, - &general_details.clone(), - exit.info.our_details().unwrap(), - ) - .expect("failure setting up exit tunnel"); - self.nat_setup = true; - } - _ => {} + // code that connects to the current exit server + info!("About to setup exit tunnel!"); + if let Some(exit) = exit_server { + trace!("We have selected an exit!"); + if let Some(general_details) = exit.info.general_details() { + trace!("We have details for the selected exit!"); + + let signed_up_for_exit = exit.info.our_details().is_some(); + + let mut writer = &mut *EXIT_MANAGER.write().unwrap(); + + let exit_has_changed = + !(writer.last_exit.is_some() && writer.last_exit.clone().unwrap() == exit); + let correct_default_route = correct_default_route(KI.get_default_route()); + + match (signed_up_for_exit, exit_has_changed, correct_default_route) { + (true, true, _) => { + trace!("Exit change, setting up exit tunnel"); + linux_setup_exit_tunnel( + &exit, + &general_details.clone(), + exit.info.our_details().unwrap(), + ) + .expect("failure setting up exit tunnel"); + writer.nat_setup = true; + writer.last_exit = Some(exit.clone()); } - - // Adds and removes the nat rules in low balance situations - // this prevents the free tier from being confusing (partially working) - // when deployments are not interested in having a sufficiently fast one - let low_balance = low_balance(); - let nat_setup = self.nat_setup; - trace!( - "client can use free tier {} low balance {}", - client_can_use_free_tier, - low_balance - ); - match (low_balance, client_can_use_free_tier, nat_setup) { - // remove when we have a low balance, do not have a free tier - // and have a nat setup. - (true, false, true) => { - trace!("removing exit tunnel!"); - remove_nat(); - self.nat_setup = false; - } - // restore when our balance is not low and our nat is not setup - // regardless of the free tier value - (false, _, false) => { - trace!("restoring exit tunnel!"); - restore_nat(); - self.nat_setup = true; - } - // restore if the nat is not setup and the free tier is enabled - // this only happens when settings change under the hood - (true, true, false) => { - trace!("restoring exit tunnel!"); - restore_nat(); - self.nat_setup = true; - } - _ => {} - } - - // run billing at all times when an exit is setup - if signed_up_for_exit { - let exit_price = general_details.exit_price; - let exit_internal_addr = general_details.server_internal_ip; - let exit_port = exit.registration_port; - let exit_id = exit.id; - let babel_port = settings::get_rita_client().network.babel_port; - trace!("We are signed up for the selected exit!"); - - Arbiter::spawn( - open_babel_stream_legacy(babel_port) - .from_err() - .and_then(move |stream| { - start_connection_legacy(stream).and_then(move |stream| { - parse_routes_legacy(stream).and_then(move |routes| { - TrafficWatcher::from_registry().do_send(QueryExitDebts { - exit_id, - exit_price, - routes: routes.1, - exit_internal_addr, - exit_port, - }); - Ok(()) - }) - }) - }) - .timeout(CLIENT_LOOP_TIMEOUT) - .then(|ret| { - if let Err(e) = ret { - error!("Failed to watch client traffic with {:?}", e) - } - Ok(()) - }), - ); + (true, false, false) => { + trace!("DHCP overwrite setup exit tunnel again"); + linux_setup_exit_tunnel( + &exit, + &general_details.clone(), + exit.info.our_details().unwrap(), + ) + .expect("failure setting up exit tunnel"); + writer.nat_setup = true; } + _ => {} } - } - // code that manages requesting details to exits - let servers = { settings::get_rita_client().exit_client.exits }; - - let mut futs: Vec>> = Vec::new(); - - for (k, s) in servers { - match s.info { - ExitState::Denied { .. } | ExitState::Disabled | ExitState::GotInfo { .. } => {} - ExitState::New { .. } => { - futs.push(Box::new(exit_general_details_request(k.clone()).then( - move |res| { - match res { - Ok(_) => { - trace!("exit details request to {} was successful", k); - } - Err(e) => { - trace!("exit details request to {} failed with {:?}", k, e); - } - }; - Ok(()) - }, - ))); + // Adds and removes the nat rules in low balance situations + // this prevents the free tier from being confusing (partially working) + // when deployments are not interested in having a sufficiently fast one + let low_balance = low_balance(); + let nat_setup = writer.nat_setup; + trace!( + "client can use free tier {} low balance {}", + client_can_use_free_tier, + low_balance + ); + match (low_balance, client_can_use_free_tier, nat_setup) { + // remove when we have a low balance, do not have a free tier + // and have a nat setup. + (true, false, true) => { + trace!("removing exit tunnel!"); + remove_nat(); + writer.nat_setup = false; } - ExitState::Registered { .. } => { - futs.push(Box::new(exit_status_request(k.clone()).then(move |res| { - match res { - Ok(_) => { - trace!("exit status request to {} was successful", k); - } - Err(e) => { - trace!("exit status request to {} failed with {:?}", k, e); - } - }; - Ok(()) - }))); + // restore when our balance is not low and our nat is not setup + // regardless of the free tier value + (false, _, false) => { + trace!("restoring exit tunnel!"); + restore_nat(); + writer.nat_setup = true; } - state => { - trace!("Waiting on exit state {:?} for {}", state, k); + // restore if the nat is not setup and the free tier is enabled + // this only happens when settings change under the hood + (true, true, false) => { + trace!("restoring exit tunnel!"); + restore_nat(); + writer.nat_setup = true; } + _ => {} + } + + // run billing at all times when an exit is setup + if signed_up_for_exit { + let exit_price = general_details.exit_price; + let exit_internal_addr = general_details.server_internal_ip; + let exit_port = exit.registration_port; + let exit_id = exit.id; + let babel_port = settings::get_rita_client().network.babel_port; + info!("We are signed up for the selected exit!"); + + let mut stream = match open_babel_stream(babel_port, CLIENT_LOOP_TIMEOUT) { + Ok(a) => a, + Err(_) => { + error!("open babel stream error in exit manager tick"); + return; + } + }; + let routes = match parse_routes(&mut stream) { + Ok(a) => a, + Err(_) => { + error!("Parse routes error in exit manager tick"); + return; + } + }; + + query_exit_debts(QueryExitDebts { + exit_id, + exit_price, + routes, + exit_internal_addr, + exit_port, + }) + .await; } } + } + + // code that manages requesting details to exits + let servers = { settings::get_rita_client().exit_client.exits }; + + for (k, s) in servers { + match s.info { + ExitState::Denied { .. } | ExitState::Disabled | ExitState::GotInfo { .. } => {} - Box::new(join_all(futs).and_then(|_| Ok(()))) as ResponseFuture<(), Error> + ExitState::New { .. } => { + match exit_general_details_request(k.clone()).await { + Ok(_) => { + trace!("exit details request to {} was successful", k); + } + Err(e) => { + trace!("exit details request to {} failed with {:?}", k, e); + } + }; + } + + ExitState::Registered { .. } => { + match exit_status_request(k.clone()).await { + Ok(_) => { + trace!("exit status request to {} was successful", k); + } + Err(e) => { + trace!("exit status request to {} failed with {:?}", k, e); + } + }; + } + + state => { + trace!("Waiting on exit state {:?} for {}", state, k); + } + } } } diff --git a/rita_client/src/light_client_manager/mod.rs b/rita_client/src/light_client_manager/mod.rs index 492b0ed82..9d3b9b18e 100644 --- a/rita_client/src/light_client_manager/mod.rs +++ b/rita_client/src/light_client_manager/mod.rs @@ -5,7 +5,7 @@ //! far as I can tell due to the restrictive nature of how and when Android allows ipv6 routing. use crate::traffic_watcher::GetExitDestPrice; -use crate::traffic_watcher::TrafficWatcher; +use crate::traffic_watcher::TrafficWatcherActor; use rita_common::debt_keeper::traffic_update; use rita_common::debt_keeper::Traffic; use rita_common::peer_listener::Peer; @@ -75,7 +75,7 @@ pub fn light_client_hello_response( ) -> Box> { let their_id = *req.0; let a = LightClientManager::from_registry().send(GetAddress(their_id)); - let b = TrafficWatcher::from_registry().send(GetExitDestPrice); + let b = TrafficWatcherActor::from_registry().send(GetExitDestPrice); Box::new( a.join(b) diff --git a/rita_client/src/rita_loop/mod.rs b/rita_client/src/rita_loop/mod.rs index 9ce1566e4..170cd5ece 100644 --- a/rita_client/src/rita_loop/mod.rs +++ b/rita_client/src/rita_loop/mod.rs @@ -4,7 +4,7 @@ //! This loop manages exit signup based on the settings configuration state and deploys an exit vpn //! tunnel if the signup was successful on the selected exit. -use crate::exit_manager::ExitManager; +use crate::exit_manager::exit_manager_tick; use crate::heartbeat::send_udp_heartbeat; use crate::light_client_manager::light_client_hello_response; use crate::light_client_manager::LightClientManager; @@ -13,22 +13,26 @@ use crate::operator_fee_manager::OperatorFeeManager; use crate::operator_fee_manager::Tick as OperatorTick; use crate::operator_update::{OperatorUpdate, Update}; use crate::traffic_watcher::GetExitDestPrice; -use crate::traffic_watcher::TrafficWatcher; +use crate::traffic_watcher::TrafficWatcherActor; use rita_common::tunnel_manager::GetNeighbors; use rita_common::tunnel_manager::GetTunnels; use rita_common::tunnel_manager::TunnelManager; use actix::{ Actor, ActorContext, Addr, Arbiter, AsyncContext, Context, Handler, Message, Supervised, - SystemService, + System, SystemService, }; use actix_web::http::Method; use actix_web::{server, App}; + +use actix_async::System as AsyncSystem; +use std::thread; +use std::time::{Duration, Instant}; + use althea_types::ExitState; use failure::Error; use futures01::future::Future; use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::{Duration, Instant}; lazy_static! { /// see the comment on check_for_gateway_client_billing_corner_case() @@ -116,11 +120,9 @@ impl Handler for RitaLoop { let start = Instant::now(); trace!("Client Tick!"); - ExitManager::from_registry().do_send(Tick {}); - Arbiter::spawn(check_for_gateway_client_billing_corner_case()); - let dest_price = TrafficWatcher::from_registry().send(GetExitDestPrice); + let dest_price = TrafficWatcherActor::from_registry().send(GetExitDestPrice); let tunnels = TunnelManager::from_registry().send(GetTunnels); Arbiter::spawn(dest_price.join(tunnels).then(move |res| { // unwrap top level actix error, ok to crash if this fails @@ -155,9 +157,51 @@ impl Handler for RitaLoop { } } +/// Rita loop thread spawning function, there are currently two rita loops, one that +/// runs as a thread with async/await support and one that runs as a actor using old futures +/// slowly things will be migrated into this new sync loop as we move to async/await +pub fn start_rita_loop() { + let mut last_restart = Instant::now(); + // this is a reference to the non-async actix system since this can bring down the whole process + let system = System::current(); + + // outer thread is a watchdog inner thread is the runner + thread::spawn(move || { + // this will always be an error, so it's really just a loop statement + // with some fancy destructuring + + while let Err(e) = { + thread::spawn(move || loop { + let start = Instant::now(); + trace!("Client tick!"); + + let runner = AsyncSystem::new(); + runner.block_on(async move { + exit_manager_tick().await; + }); + + // sleep until it has been CLIENT_LOOP_SPEED seconds from start, whenever that may be + // if it has been more than CLIENT_LOOP_SPEED seconds from start, go right ahead + let client_loop_speed = Duration::from_secs(CLIENT_LOOP_SPEED); + if start.elapsed() < client_loop_speed { + thread::sleep(client_loop_speed - start.elapsed()); + } + }) + .join() + } { + error!("Rita client loop thread paniced! Respawning {:?}", e); + if Instant::now() - last_restart < Duration::from_secs(60) { + error!("Restarting too quickly, leaving it to auto rescue!"); + system.stop_with_code(121) + } + last_restart = Instant::now(); + } + }); +} + pub fn check_rita_client_actors() { assert!(crate::rita_loop::RitaLoop::from_registry().connected()); - assert!(crate::exit_manager::ExitManager::from_registry().connected()); + crate::rita_loop::start_rita_loop(); } /// There is a complicated corner case where the gateway is a client and a relay to diff --git a/rita_client/src/traffic_watcher/mod.rs b/rita_client/src/traffic_watcher/mod.rs index 75158b96a..399f93e08 100644 --- a/rita_client/src/traffic_watcher/mod.rs +++ b/rita_client/src/traffic_watcher/mod.rs @@ -27,25 +27,26 @@ use rita_common::debt_keeper::{ traffic_replace, traffic_update, wgkey_insensitive_traffic_update, Traffic, }; -use actix::{Actor, Arbiter, Context, Handler, Message, Supervised, SystemService}; -use actix_web::client; -use actix_web::client::Connection; -use actix_web::HttpMessage; +use actix::{Actor, Context, Handler, Message, Supervised, SystemService}; use althea_types::Identity; use babel_monitor::Route as RouteLegacy; use failure::Error; -use futures01::future::ok as future_ok; -use futures01::future::Future; use num256::Int256; use num_traits::identities::Zero; use rita_common::usage_tracker::update_usage_data; use rita_common::usage_tracker::UpdateUsage; use rita_common::usage_tracker::UsageType; use rita_common::KI; -use std::net::{IpAddr, SocketAddr}; +use std::net::IpAddr; +use std::sync::Arc; +use std::sync::RwLock; use std::time::Duration; use std::time::Instant; -use tokio::net::TcpStream as TokioTcpStream; + +lazy_static! { + pub static ref TRAFFIC_WATCHER: Arc> = + Arc::new(RwLock::new(TrafficWatcher::default())); +} pub struct TrafficWatcher { // last read download @@ -56,18 +57,21 @@ pub struct TrafficWatcher { last_exit_dest_price: u128, } -impl Actor for TrafficWatcher { +/// Dummy Traffic Watcher struct for Actor, used to ensure we dont accidently read out of the +/// original TrafficWatcher struct. +pub struct TrafficWatcherActor {} + +impl Actor for TrafficWatcherActor { type Context = Context; } -impl Supervised for TrafficWatcher {} -impl SystemService for TrafficWatcher { - fn service_started(&mut self, _ctx: &mut Context) { - info!("Client traffic watcher started"); - self.last_read_input = 0; - self.last_read_output = 0; - self.last_exit_dest_price = 0; +impl Supervised for TrafficWatcherActor {} +impl SystemService for TrafficWatcherActor {} +impl Default for TrafficWatcherActor { + fn default() -> TrafficWatcherActor { + TrafficWatcherActor {} } } + impl Default for TrafficWatcher { fn default() -> TrafficWatcher { TrafficWatcher { @@ -104,134 +108,91 @@ impl Message for QueryExitDebts { type Result = Result<(), Error>; } -impl Handler for TrafficWatcher { - type Result = Result<(), Error>; - - fn handle(&mut self, msg: QueryExitDebts, _: &mut Context) -> Self::Result { - trace!("About to query the exit for client debts"); - - // we could exit the function if this fails, but doing so would remove the chance - // that we can get debts from the exit and continue anyways - let local_debt = - match local_traffic_calculation(self, &msg.exit_id, msg.exit_price, msg.routes) { - Ok(val) => Some(Int256::from(val)), - Err(_e) => None, - }; - - let gateway_exit_client = is_gateway_client(); - let start = Instant::now(); - let exit_addr = msg.exit_internal_addr; - let exit_id = msg.exit_id; - let exit_port = msg.exit_port; - // actix client behaves badly if you build a request the default way but don't give it - // a domain name, so in order to do peer to peer requests we use with_connection and our own - // socket specification - let our_id = settings::get_rita_client().get_identity(); - let request = format!("http://{}:{}/client_debt", exit_addr, exit_port); - // it's an ipaddr appended to a u16, there's no real way for this to fail - // unless of course it's an ipv6 address and you don't do the [] - let socket: SocketAddr = format!("{}:{}", exit_addr, exit_port).parse().unwrap(); - - let stream_future = TokioTcpStream::connect(&socket); - - let s = stream_future.then(move |active_stream| match active_stream { - Ok(stream) => Box::new( - client::post(request.clone()) - .with_connection(Connection::from_stream(stream)) - .json(our_id) - .unwrap() - .send() - .timeout(Duration::from_secs(5)) - .then(move |response| match response { - Ok(response) => Box::new(response.json().then(move |debt_value| { - match debt_value { - Ok(debt) => { - info!( - "Successfully got debt from the exit {:?} Rita Client TrafficWatcher completed in {}s {}ms", - debt, - start.elapsed().as_secs(), - start.elapsed().subsec_millis() - ); - let we_are_not_a_gateway = !gateway_exit_client; - let we_owe_exit = debt >= Int256::zero(); - match (we_are_not_a_gateway, we_owe_exit) { - (true, true) => { - traffic_replace( - Traffic { - from: exit_id, - amount: debt, - } - ) - }, - // the exit should never tell us it owes us, that doesn't make sense outside of the gateway - // client corner case - (true, false) => warn!("We're probably a gateway but haven't detected it yet"), - (false, _) => { - info!("We are a gateway!, Acting accordingly"); - if let Some(val) = local_debt { - wgkey_insensitive_traffic_update( - Traffic { - from: exit_id, - amount: val, - } - - ) - } - }, - } - } - Err(e) => { - error!("Failed deserializing exit debts update with {:?}", e); - if let Some(val) = local_debt { - traffic_update( - vec![Traffic { - from: exit_id, - amount: val, - }] - - ) - } - } - } - Ok(()) as Result<(), ()> - })), - Err(e) => { - error!("Exit debts request to {} failed with {:?}", request, e); - if let Some(val) = local_debt { - traffic_update( - vec![Traffic { - from: exit_id, - amount: val, - }] - - ) - } - Box::new(future_ok(())) as Box> - } - }), - ), - - Err(e) => { - error!( - "Failed to open stream to exit for debts update! with {:?}", - e - ); - if let Some(val) = local_debt { - traffic_update( - vec![Traffic { - from: exit_id, - amount: val, - }] - - ) - - +pub async fn query_exit_debts(msg: QueryExitDebts) { + trace!("About to query the exit for client debts"); + + let writer = &mut *TRAFFIC_WATCHER.write().unwrap(); + + // we could exit the function if this fails, but doing so would remove the chance + // that we can get debts from the exit and continue anyways + let local_debt = + match local_traffic_calculation(writer, &msg.exit_id, msg.exit_price, msg.routes) { + Ok(val) => Some(Int256::from(val)), + Err(_e) => None, + }; + let gateway_exit_client = is_gateway_client(); + let start = Instant::now(); + let exit_addr = msg.exit_internal_addr; + let exit_id = msg.exit_id; + let exit_port = msg.exit_port; + // actix client behaves badly if you build a request the default way but don't give it + // a domain name, so in order to do peer to peer requests we use with_connection and our own + // socket specification + let our_id = settings::get_rita_client().get_identity(); + let request = format!("http://{}:{}/client_debt", exit_addr, exit_port); + // it's an ipaddr appended to a u16, there's no real way for this to fail + // unless of course it's an ipv6 address and you don't do the [] + + let client = awc::Client::default(); + let response = client + .post(request.clone()) + .timeout(Duration::from_secs(5)) + .send_json(&our_id) + .await; + let mut response = match response { + Ok(a) => a, + Err(e) => { + error!("Exit debts request to {} failed with {:?}", request, e); + if let Some(val) = local_debt { + traffic_update(vec![Traffic { + from: exit_id, + amount: val, + }]) + } + return; + } + }; + let response = response.json().await; + match response { + Ok(debt) => { + info!( + "Successfully got debt from the exit {:?} Rita Client TrafficWatcher completed in {}s {}ms", + debt, + start.elapsed().as_secs(), + start.elapsed().subsec_millis() + ); + let we_are_not_a_gateway = !gateway_exit_client; + let we_owe_exit = debt >= Int256::zero(); + match (we_are_not_a_gateway, we_owe_exit) { + (true, true) => traffic_replace(Traffic { + from: exit_id, + amount: debt, + }), + // the exit should never tell us it owes us, that doesn't make sense outside of the gateway + // client corner case + (true, false) => { + warn!("We're probably a gateway but haven't detected it yet") + } + (false, _) => { + info!("We are a gateway!, Acting accordingly"); + if let Some(val) = local_debt { + wgkey_insensitive_traffic_update(Traffic { + from: exit_id, + amount: val, + }) + } } - Box::new(future_ok(())) as Box> } - }); - Arbiter::spawn(s); - Ok(()) + } + Err(e) => { + error!("Failed deserializing exit debts update with {:?}", e); + if let Some(val) = local_debt { + traffic_update(vec![Traffic { + from: exit_id, + amount: val, + }]) + } + } } } @@ -368,10 +329,10 @@ impl Message for GetExitDestPrice { type Result = Result; } -impl Handler for TrafficWatcher { +impl Handler for TrafficWatcherActor { type Result = Result; fn handle(&mut self, _msg: GetExitDestPrice, _: &mut Context) -> Self::Result { - Ok(self.last_exit_dest_price) + Ok(TRAFFIC_WATCHER.read().unwrap().last_exit_dest_price) } }