Skip to content

Commit

Permalink
amux: added a few options to configure channel sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
eycorsican committed Sep 27, 2023
1 parent 7718eae commit f7788cd
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
12 changes: 12 additions & 0 deletions leaf/src/option/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,18 @@ lazy_static! {
get_env_var_or("QUIC_ACCEPT_CHANNEL_SIZE", 1024)
};

pub static ref AMUX_ACCEPT_CHANNEL_SIZE: usize = {
get_env_var_or("AMUX_ACCEPT_CHANNEL_SIZE", 1024)
};

pub static ref AMUX_STREAM_CHANNEL_SIZE: usize = {
get_env_var_or("AMUX_STREAM_CHANNEL_SIZE", 16)
};

pub static ref AMUX_FRAME_CHANNEL_SIZE: usize = {
get_env_var_or("AMUX_FRAME_CHANNEL_SIZE", 32)
};

/// Buffer size for UDP datagrams receiving/sending, in KB.
pub static ref DATAGRAM_BUFFER_SIZE: usize = {
get_env_var_or("DATAGRAM_BUFFER_SIZE", 2)
Expand Down
14 changes: 8 additions & 6 deletions leaf/src/proxy/amux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ impl MuxStream {
stream_end: Arc<AtomicBool>,
) -> (Self, Sender<Vec<u8>>) {
trace!("new mux stream {} (session {})", stream_id, session_id);
let (stream_read_tx, stream_read_rx) = mpsc::channel::<Vec<u8>>(1);
let (stream_read_tx, stream_read_rx) =
mpsc::channel::<Vec<u8>>(*crate::option::AMUX_STREAM_CHANNEL_SIZE);
(
MuxStream {
session_id,
Expand Down Expand Up @@ -364,8 +365,6 @@ impl<S: AsyncWrite + Unpin> Sink<MuxFrame> for MuxConnection<S> {
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let me = &mut *self;

// ready!(Pin::new(&mut me.inner.write_all(&me.write_buf)).poll(cx))?;

while !me.write_buf.is_empty() {
let n = ready!(Pin::new(&mut me.inner).poll_write(cx, &me.write_buf))?;
if n == 0 {
Expand Down Expand Up @@ -528,7 +527,8 @@ impl MuxSession {
S: 'static + AsyncRead + AsyncWrite + Unpin + Send,
{
let (frame_sink, frame_stream) = MuxConnection::new(conn).split();
let (frame_write_tx, frame_write_rx) = mpsc::channel::<MuxFrame>(1);
let (frame_write_tx, frame_write_rx) =
mpsc::channel::<MuxFrame>(*crate::option::AMUX_FRAME_CHANNEL_SIZE);
let (recv_end, send_end) = (Arc::new(Mutex::new(false)), Arc::new(Mutex::new(false)));
let streams: Streams = Arc::new(Mutex::new(HashMap::new()));
let recv_bytes_counter = Arc::new(AtomicUsize::new(0));
Expand Down Expand Up @@ -569,9 +569,11 @@ impl MuxSession {
S: 'static + AsyncRead + AsyncWrite + Unpin + Send,
{
let (frame_sink, frame_stream) = MuxConnection::new(conn).split();
let (frame_write_tx, frame_write_rx) = mpsc::channel::<MuxFrame>(1);
let (frame_write_tx, frame_write_rx) =
mpsc::channel::<MuxFrame>(*crate::option::AMUX_FRAME_CHANNEL_SIZE);
let streams: Streams = Arc::new(Mutex::new(HashMap::new()));
let (stream_accept_tx, stream_accept_rx) = mpsc::channel(1);
let (stream_accept_tx, stream_accept_rx) =
mpsc::channel(*crate::option::AMUX_ACCEPT_CHANNEL_SIZE);
let session_id = random_u16();
let recv_handle = Self::run_frame_receive_loop(
streams.clone(),
Expand Down

0 comments on commit f7788cd

Please sign in to comment.