Skip to content

Commit

Permalink
Split structure in order to implement transparent mode later
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgeajimenezl committed Jun 29, 2023
1 parent 7209c3b commit a4a3f4c
Show file tree
Hide file tree
Showing 9 changed files with 414 additions and 220 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ config = { version = "0.13.3", default-features = false, features = ["ini"] }
thiserror = "1.0.40"
http = "0.2.9"
wildmatch = "2.1.1"
async-trait = "0.1.68"
167 changes: 7 additions & 160 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,19 @@
use super::http::{DigestState, HttpHandler};
use super::http::DigestState;
use super::proxy::{Credentials, Proxy};
use crate::acl::{Acl, Rule};
use crate::error::Error;

use tokio::net::{TcpListener, TcpStream};
use tokio::{self, signal, sync::oneshot};

use http_body_util::{combinators::BoxBody, BodyExt, Empty};
use hyper::{
body::{Bytes, Incoming},
header::{PROXY_AUTHENTICATE, PROXY_AUTHORIZATION},
server::conn::http1,
service::service_fn,
Request, Response, StatusCode,
};
use crate::transport::http::HttpServer;
use crate::transport::Server;

use config::Config;
use log::{debug, error, info, trace, warn};
use log::info;

use std::{
convert::Infallible,
net::SocketAddr,
str::FromStr,
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
},
sync::{Arc, Mutex},
};

macro_rules! box_body {
($t:expr) => {
$t.map(|f| f.boxed())
};
}

#[inline]
pub(crate) fn empty() -> BoxBody<Bytes, hyper::Error> {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}

pub async fn redirect_http(
req: Request<Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Error> {
debug!("Request forwarded directly to original destination");

let host = req.uri().host().ok_or("Uri has no host")?;
let port = req.uri().port_u16().unwrap_or(80);

let address = format!("{port}:{host}");

// Open a TCP connection to the remote host
let stream = match TcpStream::connect(address).await {
Ok(v) => v,
Err(e) => {
warn!("Unable to connect to {}: {e}", req.uri());
return Ok(Response::builder()
.status(StatusCode::BAD_GATEWAY)
.body(empty())
.unwrap());
}
};

let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?;
tokio::spawn(async move {
if let Err(err) = conn.await {
println!("Connection failed: {:?}", err);
}
});

Ok(box_body!(sender.send_request(box_body!(req)).await?))
}

#[derive(Clone, Debug)]
pub enum OperationMode {
Transparent,
Expand All @@ -98,7 +39,7 @@ pub struct AppContext {
pub mode: OperationMode,
pub acl: Acl,

digest_state: Arc<Mutex<DigestState>>,
pub digest_state: Arc<Mutex<DigestState>>,
}

pub struct App {
Expand Down Expand Up @@ -182,107 +123,13 @@ impl App {
})
}

async fn handle_connection(
context: AppContext,
id: u32,
mut req: Request<Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
debug!("[#{id}] Requested: {}", req.uri());
trace!("[#{id}] Request struct: {req:?}");

if let Some(host) = req.uri().host() {
if context.acl.match_hostname(host) == Rule::Deny {
debug!("[#{id}] Avoided try to connect with {host}");
return Ok(redirect_http(req).await.unwrap_or_else(|e| {
error!("Error forwarding request to destination: {e}");

Response::builder()
.status(StatusCode::BAD_GATEWAY)
.body(empty())
.unwrap()
}));
}
}

// Remove proxy headers
if matches!(context.mode, OperationMode::Proxy) {
let headers = req.headers_mut();
headers.remove(PROXY_AUTHENTICATE);
headers.remove(PROXY_AUTHORIZATION);
}

// Forward the request
let client = HttpHandler::new(id, context.proxies, Arc::clone(&context.digest_state));
if let Err(e) = client.request(req).await {
error!("Error forwarding request to destination: {e}");
return Ok(Response::builder()
.status(StatusCode::BAD_GATEWAY)
.body(empty())
.unwrap());
}

debug!("[#{id}] Connection processed successful");
Ok(Response::builder().body(empty()).unwrap())
}

async fn serve_http(context: AppContext) -> Result<(), Error> {
let addr = context.addr;
let count = Arc::new(AtomicU32::new(0));

let tcp_listener = TcpListener::bind(addr).await?;
info!("Proxy listening at http://{addr}. Press Ctrl+C to stop it",);

let (tx, mut rx) = oneshot::channel();

// Main loop
tokio::spawn(async move {
loop {
tokio::select! {
conn = tcp_listener.accept() => {
let (stream, remote_addr) = match conn {
Ok(v) => v,
Err(e) => {
error!("Unable to accept incomming TCP connection: {e}");
return;
}
};

// Get connections count
let id = count.fetch_add(1, Ordering::SeqCst);
debug!("[#{id}] Incoming connection: <{remote_addr}>");

let context = context.clone();
let proxy =
service_fn(move |req| App::handle_connection(context.clone(), id, req));

tokio::spawn(async move {
if let Err(e) = http1::Builder::new()
.keep_alive(true)
.preserve_header_case(true)
.serve_connection(stream, proxy)
.with_upgrades()
.await {
error!("Server error: {e}");
}
});
}
_ = (&mut rx) => { break; }
}
}
});

signal::ctrl_c().await?;
let _ = tx.send(());
Ok(())
}

pub async fn run(self) -> Result<(), String> {
// Separate to avoid add more logic
if matches!(self.context.mode, OperationMode::Transparent) {
todo!("Wait a little more :(");
}

App::serve_http(self.context)
HttpServer::serve(self.context)
.await
.map_err(|e| format!("Server Error: {}", e))?;

Expand Down
116 changes: 116 additions & 0 deletions src/core.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use async_trait::async_trait;
use hyper::{body::Incoming, Request};
use std::{
marker::PhantomData,
net::{IpAddr, SocketAddr},
};
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
};

use crate::error::Error;

#[derive(Hash, Clone, Eq, PartialEq, Debug)]
pub(crate) enum MaybeNamedHost {
Address(IpAddr),
Hostname(String),
}

impl From<IpAddr> for MaybeNamedHost {
fn from(value: IpAddr) -> Self {
MaybeNamedHost::Address(value)
}
}

impl From<&str> for MaybeNamedHost {
fn from(value: &str) -> Self {
MaybeNamedHost::Hostname(value.to_string())
}
}

impl std::fmt::Display for MaybeNamedHost {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MaybeNamedHost::Address(addr) => addr.fmt(f),
MaybeNamedHost::Hostname(name) => name.fmt(f),
}
}
}

#[derive(Hash, Clone, Eq, PartialEq, Debug)]
pub struct MaybeNamedSock {
pub(crate) host: MaybeNamedHost,
pub(crate) port: u16,
}

impl TryFrom<MaybeNamedSock> for SocketAddr {
type Error = Error;
fn try_from(value: MaybeNamedSock) -> Result<Self, Self::Error> {
let ip = match value.host {
MaybeNamedHost::Address(addr) => addr,
MaybeNamedHost::Hostname(e) => {
return Err(e.into());
}
};
Ok(SocketAddr::new(ip, value.port))
}
}

impl From<SocketAddr> for MaybeNamedSock {
fn from(addr: SocketAddr) -> Self {
Self {
host: MaybeNamedHost::Address(addr.ip()),
port: addr.port(),
}
}
}

impl std::fmt::Display for MaybeNamedSock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let MaybeNamedHost::Address(IpAddr::V6(addr)) = self.host {
write!(f, "[{}]:{}", addr, self.port)
} else {
write!(f, "{}:{}", self.host, self.port)
}
}
}

#[async_trait]
pub trait ToStream<S: AsyncRead + AsyncWrite + Send + Unpin + 'static> {
async fn into_stream(self) -> Result<S, Error>;
}

pub struct ProxyRequest<T, S>
where
T: ToStream<S>,
S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
pub destination: MaybeNamedSock,
pub inner: T,
pub _phanton: PhantomData<S>,
}

impl<T, S> ProxyRequest<T, S>
where
T: ToStream<S>,
S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
pub async fn into_stream(self) -> Result<S, Error> {
self.inner.into_stream().await
}
}

#[async_trait]
impl ToStream<hyper::upgrade::Upgraded> for Request<Incoming> {
async fn into_stream(self) -> Result<hyper::upgrade::Upgraded, Error> {
Ok(hyper::upgrade::on(self).await?)
}
}

#[async_trait]
impl ToStream<TcpStream> for TcpStream {
async fn into_stream(self) -> Result<TcpStream, Error> {
Ok(self)
}
}
Loading

0 comments on commit a4a3f4c

Please sign in to comment.