Skip to content

Commit

Permalink
feat: send add tx requests through network channel
Browse files Browse the repository at this point in the history
  • Loading branch information
MohammadNassar1 committed May 20, 2024
1 parent 597efc4 commit 0cb907c
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 72 deletions.
2 changes: 2 additions & 0 deletions crates/gateway/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use tokio::task::JoinError;
pub enum GatewayError {
#[error("Internal server error: {0}")]
InternalServerError(#[from] JoinError),
#[error("Error sending message: {0}")]
MessageSendError(String),
#[error(transparent)]
StatelessTransactionValidatorError(#[from] StatelessTransactionValidatorError),
}
Expand Down
45 changes: 32 additions & 13 deletions crates/gateway/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ use std::sync::Arc;
use axum::extract::State;
use axum::routing::{get, post};
use axum::{Json, Router};
use mempool_infra::network_component::CommunicationInterface;
use starknet_api::external_transaction::ExternalTransaction;
use starknet_mempool_types::mempool_types::GatewayNetworkComponent;
use starknet_mempool_types::mempool_types::{
GatewayNetworkComponent, GatewayToMempoolMessage, MempoolInput,
};

use crate::config::{GatewayNetworkConfig, StatelessTransactionValidatorConfig};
use crate::errors::GatewayError;
Expand Down Expand Up @@ -54,7 +57,7 @@ impl Gateway {

Router::new()
.route("/is_alive", get(is_alive))
.route("/add_tx", post(async_add_tx))
.route("/add_tx", post(add_tx))
.with_state(app_state)
// TODO: when we need to configure the router, like adding banned ips, add it here via
// `with_state`.
Expand All @@ -65,27 +68,43 @@ async fn is_alive() -> GatewayResult<String> {
unimplemented!("Future handling should be implemented here.");
}

async fn async_add_tx(
State(gateway_state): State<AppState>,
async fn add_tx(
State(app_state): State<AppState>,
Json(tx): Json<ExternalTransaction>,
) -> GatewayResult<String> {
tokio::task::spawn_blocking(move || add_tx(gateway_state, tx)).await?
let (response, mempool_input) = tokio::task::spawn_blocking(move || {
process_tx(app_state.stateless_transaction_validator, tx)
})
.await??;

let message = GatewayToMempoolMessage::AddTransaction(mempool_input);
app_state.network_component.send(message).await.map_err(|e| {
GatewayError::MessageSendError(format!("Failed to send transaction message: {}", e))
})?;
Ok(response)
}

fn add_tx(gateway_state: AppState, tx: ExternalTransaction) -> GatewayResult<String> {
fn process_tx(
stateless_transaction_validator: StatelessTransactionValidator,
tx: ExternalTransaction,
) -> GatewayResult<(String, MempoolInput)> {
// TODO(Arni, 1/5/2024): Preform congestion control.

// Perform stateless validations.
gateway_state.stateless_transaction_validator.validate(&tx)?;
stateless_transaction_validator.validate(&tx)?;

// TODO(Yael, 1/5/2024): Preform state related validations.
// TODO(Arni, 1/5/2024): Move transaction to mempool.

// TODO(Arni, 1/5/2024): Produce response.
// Send response.
Ok(match tx {
ExternalTransaction::Declare(_) => "DECLARE".into(),
ExternalTransaction::DeployAccount(_) => "DEPLOY_ACCOUNT".into(),
ExternalTransaction::Invoke(_) => "INVOKE".into(),
})

Ok((
match tx {
ExternalTransaction::Declare(_) => "DECLARE".into(),
ExternalTransaction::DeployAccount(_) => "DEPLOY_ACCOUNT".into(),
ExternalTransaction::Invoke(_) => "INVOKE".into(),
},
// TODO(Yael, 15/5/2024): Add tx converter.
MempoolInput::default(),
))
}
33 changes: 15 additions & 18 deletions crates/gateway/src/gateway_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,19 @@ use axum::extract::State;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use pretty_assertions::assert_str_eq;
use rstest::{fixture, rstest};
use rstest::rstest;
use starknet_api::external_transaction::ExternalTransaction;
use starknet_mempool_types::mempool_types::{
GatewayNetworkComponent, GatewayToMempoolMessage, MempoolToGatewayMessage,
};
use tokio::sync::mpsc::channel;

use crate::config::StatelessTransactionValidatorConfig;
use crate::gateway::{async_add_tx, AppState};
use crate::gateway::{add_tx, AppState};
use crate::stateless_transaction_validator::StatelessTransactionValidator;

const TEST_FILES_FOLDER: &str = "./tests/fixtures";

#[fixture]
pub fn network_component() -> GatewayNetworkComponent {
let (tx_gateway_to_mempool, _rx_gateway_to_mempool) = channel::<GatewayToMempoolMessage>(1);
let (_, rx_mempool_to_gateway) = channel::<MempoolToGatewayMessage>(1);

GatewayNetworkComponent::new(tx_gateway_to_mempool, rx_mempool_to_gateway)
}

// TODO(Ayelet): Replace the use of the JSON files with generated instances, then serialize these
// into JSON for testing.
#[rstest]
Expand All @@ -38,11 +30,16 @@ pub fn network_component() -> GatewayNetworkComponent {
)]
#[case::invoke(&Path::new(TEST_FILES_FOLDER).join("invoke_v3.json"), "INVOKE")]
#[tokio::test]
async fn test_add_tx(
#[case] json_file_path: &Path,
#[case] expected_response: &str,
network_component: GatewayNetworkComponent,
) {
async fn test_add_tx(#[case] json_file_path: &Path, #[case] expected_response: &str) {
// The `_rx_gateway_to_mempool` is retained to keep the channel open, as dropping it would
// prevent the sender from transmitting messages.
let (tx_gateway_to_mempool, _rx_gateway_to_mempool) = channel::<GatewayToMempoolMessage>(1);
let (_, rx_mempool_to_gateway) = channel::<MempoolToGatewayMessage>(1);

// TODO: Add fixture.
let network_component =
Arc::new(GatewayNetworkComponent::new(tx_gateway_to_mempool, rx_mempool_to_gateway));

let json_file = File::open(json_file_path).unwrap();
let tx: ExternalTransaction = serde_json::from_reader(json_file).unwrap();

Expand All @@ -54,15 +51,15 @@ async fn test_add_tx(
..Default::default()
},
},
network_component: Arc::new(network_component),
network_component,
};

// Negative flow.
const TOO_SMALL_SIGNATURE_LENGTH: usize = 0;
app_state.stateless_transaction_validator.config.max_signature_length =
TOO_SMALL_SIGNATURE_LENGTH;

let response = async_add_tx(State(app_state.clone()), tx.clone().into()).await.into_response();
let response = add_tx(State(app_state.clone()), tx.clone().into()).await.into_response();

let status_code = response.status();
assert_eq!(status_code, StatusCode::INTERNAL_SERVER_ERROR);
Expand All @@ -74,7 +71,7 @@ async fn test_add_tx(
// Positive flow.
app_state.stateless_transaction_validator.config.max_signature_length = 2;

let response = async_add_tx(State(app_state), tx.into()).await.into_response();
let response = add_tx(State(app_state), tx.into()).await.into_response();

let status_code = response.status();
assert_eq!(status_code, StatusCode::OK);
Expand Down
14 changes: 7 additions & 7 deletions crates/gateway/tests/end_to_end_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use mempool_infra::network_component::CommunicationInterface;
use starknet_api::core::{ContractAddress, Nonce};
use starknet_api::transaction::TransactionHash;
use starknet_mempool_types::mempool_types::{
Account, AccountState, GatewayNetworkComponent, GatewayToMempoolMessage,
MempoolNetworkComponent, MempoolToGatewayMessage, ThinTransaction,
Account, AccountState, GatewayNetworkComponent, GatewayToMempoolMessage, MempoolInput,
MempoolNetworkComponent, MempoolToGatewayMessage,
};
use tokio::sync::mpsc::channel;
use tokio::task;
Expand All @@ -22,10 +22,10 @@ async fn test_send_and_receive() {
let mut mempool_network =
MempoolNetworkComponent::new(tx_mempool_to_gateway, rx_gateway_to_mempool);

let tx = ThinTransaction::default();
let account = create_default_account();
let tx_hash = TransactionHash::default();
let mempool_input = MempoolInput::default();
task::spawn(async move {
let gateway_to_mempool = GatewayToMempoolMessage::AddTransaction(tx, account);
let gateway_to_mempool = GatewayToMempoolMessage::AddTransaction(mempool_input);
gateway_network.send(gateway_to_mempool).await.unwrap();
})
.await
Expand All @@ -35,8 +35,8 @@ async fn test_send_and_receive() {
task::spawn(async move { mempool_network.recv().await }).await.unwrap().unwrap();

match mempool_message {
GatewayToMempoolMessage::AddTransaction(tx, _) => {
assert_eq!(tx.tx_hash, TransactionHash::default())
GatewayToMempoolMessage::AddTransaction(mempool_input) => {
assert_eq!(mempool_input.tx.tx_hash, tx_hash);
}
}
}
57 changes: 27 additions & 30 deletions crates/gateway/tests/routing_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::path::Path;
use axum::body::{Body, Bytes, HttpBody};
use axum::http::{Request, StatusCode};
use pretty_assertions::assert_str_eq;
use rstest::{fixture, rstest};
use rstest::rstest;
use starknet_gateway::config::{GatewayNetworkConfig, StatelessTransactionValidatorConfig};
use starknet_gateway::gateway::Gateway;
use starknet_mempool_types::mempool_types::{
Expand All @@ -16,26 +16,6 @@ use tower::ServiceExt;

const TEST_FILES_FOLDER: &str = "./tests/fixtures";

#[fixture]
pub fn gateway() -> Gateway {
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
let port = 3000;
let network_config: GatewayNetworkConfig = GatewayNetworkConfig { ip, port };

let stateless_transaction_validator_config = StatelessTransactionValidatorConfig {
max_calldata_length: 1000,
max_signature_length: 2,
..Default::default()
};

let (tx_gateway_to_mempool, _rx_gateway_to_mempool) = channel::<GatewayToMempoolMessage>(1);
let (_, rx_mempool_to_gateway) = channel::<MempoolToGatewayMessage>(1);
let network_component =
GatewayNetworkComponent::new(tx_gateway_to_mempool, rx_mempool_to_gateway);

Gateway { network_config, stateless_transaction_validator_config, network_component }
}

// TODO(Ayelet): Replace the use of the JSON files with generated instances, then serialize these
// into JSON for testing.
#[rstest]
Expand All @@ -46,18 +26,14 @@ pub fn gateway() -> Gateway {
)]
#[case::invoke(&Path::new(TEST_FILES_FOLDER).join("invoke_v3.json"), "INVOKE")]
#[tokio::test]
async fn test_routes(
#[case] json_file_path: &Path,
#[case] expected_response: &str,
gateway: Gateway,
) {
async fn test_routes(#[case] json_file_path: &Path, #[case] expected_response: &str) {
let tx_json = fs::read_to_string(json_file_path).unwrap();
let request = Request::post("/add_tx")
.header("content-type", "application/json")
.body(Body::from(tx_json))
.unwrap();

let response = check_request(request, StatusCode::OK, gateway).await;
let response = check_request(request, StatusCode::OK).await;

assert_str_eq!(expected_response, String::from_utf8_lossy(&response));
}
Expand All @@ -66,13 +42,34 @@ async fn test_routes(
#[tokio::test]
#[should_panic]
// FIXME: Currently is_alive is not implemented, fix this once it is implemented.
async fn test_is_alive(gateway: Gateway) {
async fn test_is_alive() {
let request = Request::get("/is_alive").body(Body::empty()).unwrap();
// Status code doesn't matter, this panics ATM.
check_request(request, StatusCode::default(), gateway).await;
check_request(request, StatusCode::default()).await;
}

async fn check_request(request: Request<Body>, status_code: StatusCode, gateway: Gateway) -> Bytes {
async fn check_request(request: Request<Body>, status_code: StatusCode) -> Bytes {
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
let port = 3000;
let network_config: GatewayNetworkConfig = GatewayNetworkConfig { ip, port };

let stateless_transaction_validator_config = StatelessTransactionValidatorConfig {
max_calldata_length: 1000,
max_signature_length: 2,
..Default::default()
};

// The `_rx_gateway_to_mempool` is retained to keep the channel open, as dropping it would
// prevent the sender from transmitting messages.
let (tx_gateway_to_mempool, _rx_gateway_to_mempool) = channel::<GatewayToMempoolMessage>(1);
let (_, rx_mempool_to_gateway) = channel::<MempoolToGatewayMessage>(1);
let network_component =
GatewayNetworkComponent::new(tx_gateway_to_mempool, rx_mempool_to_gateway);

// TODO: Add fixture.
let gateway =
Gateway { network_config, stateless_transaction_validator_config, network_component };

let response = gateway.app().oneshot(request).await.unwrap();
assert_eq!(response.status(), status_code);

Expand Down
5 changes: 3 additions & 2 deletions crates/mempool/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl Mempool {

/// Adds a new transaction to the mempool.
/// TODO: support fee escalation and transactions with future nonces.
/// TODO: change input type to `MempoolInput`.
pub fn add_tx(&mut self, tx: ThinTransaction, account: Account) -> MempoolResult<()> {
match self.state.entry(account.address) {
Occupied(_) => Err(MempoolError::DuplicateTransaction { tx_hash: tx.tx_hash }),
Expand Down Expand Up @@ -114,8 +115,8 @@ impl Mempool {

fn process_network_message(&mut self, message: GatewayToMempoolMessage) -> Result<()> {
match message {
GatewayToMempoolMessage::AddTransaction(tx, account_state) => {
self.add_tx(tx, account_state)?;
GatewayToMempoolMessage::AddTransaction(mempool_input) => {
self.add_tx(mempool_input.tx, mempool_input.account)?;
Ok(())
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/mempool_types/src/mempool_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ pub struct Account {
pub state: AccountState,
}

#[derive(Debug)]
#[derive(Debug, Default)]
pub struct MempoolInput {
pub tx: ThinTransaction,
pub account: Account,
}

#[derive(Debug)]
pub enum GatewayToMempoolMessage {
AddTransaction(ThinTransaction, Account),
AddTransaction(MempoolInput),
}

pub type MempoolToGatewayMessage = ();
Expand Down

0 comments on commit 0cb907c

Please sign in to comment.