Skip to content

Commit

Permalink
Discard shards when buffers are not available
Browse files Browse the repository at this point in the history
  • Loading branch information
zmerp committed Aug 17, 2023
1 parent 20be76d commit 3e88963
Showing 1 changed file with 29 additions and 15 deletions.
44 changes: 29 additions & 15 deletions alvr/sockets/src/stream_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

// Performance analysis:
// We want to minimize the transmission time for various sizes of packets.
// The current code locks the write socker *per shard* and not *per packet*. This leds to the best
// The current code locks the write socket *per shard* and not *per packet*. This leds to the best
// performance outcome given that the possible packets can be either very small (one shard) or very
// large (hundreds/thousands of shards, for video). if we don't allow interleaving shards, a very
// small packet will need to wait a long time before getting received if there was an ongoing
Expand Down Expand Up @@ -373,6 +373,7 @@ struct RecvState {
shard_index: usize,
packet_cursor: usize, // counts also the prefix bytes
overwritten_data_backup: Option<[u8; SHARD_PREFIX_SIZE]>,
should_discard: bool,
}

struct InProgressPacket {
Expand All @@ -386,6 +387,7 @@ struct StreamRecvComponents {
used_buffer_receiver: mpsc::Receiver<Vec<u8>>,
packet_queue: mpsc::Sender<ReconstructedPacket>,
in_progress_packets: HashMap<u32, InProgressPacket>,
discarded_shards_sink: InProgressPacket,
}

// Note: used buffers don't *have* to be split by stream ID, but doing so improves memory usage
Expand All @@ -411,10 +413,8 @@ impl StreamSocket {
}

// max_concurrent_buffers: number of buffers allocated by this call which will be reused to
// receive packets for this stream ID. Receive threads MUST return the buffers with
// return_buffer(), otherwise all buffers will be exausted and this stream will hang. If all
// buffers are dropped or packets are not read fast enough by this StreamReceiver, all streams
// will hang.
// receive packets for this stream ID. If packets are not read fast enough, the shards received
// for this particular stream will be discarded
pub fn subscribe_to_stream<T>(
&mut self,
stream_id: u16,
Expand All @@ -434,6 +434,11 @@ impl StreamSocket {
used_buffer_receiver,
packet_queue: packet_sender,
in_progress_packets: HashMap::new(),
discarded_shards_sink: InProgressPacket {
buffer: vec![],
buffer_length: 0,
received_shard_indices: HashSet::new(),
},
},
);

Expand Down Expand Up @@ -472,6 +477,7 @@ impl StreamSocket {
shard_index,
packet_cursor: 0,
overwritten_data_backup: None,
should_discard: false,
})
};

Expand All @@ -483,17 +489,14 @@ impl StreamSocket {
return alvr_common::try_again();
};

let in_progress_packet = if let Some(packet) = components
let in_progress_packet = if shard_recv_state_mut.should_discard {
&mut components.discarded_shards_sink
} else if let Some(packet) = components
.in_progress_packets
.get_mut(&shard_recv_state_mut.packet_index)
{
packet
} else {
let buffer = components
.used_buffer_receiver
.try_recv()
.handle_try_again()?;

} else if let Ok(buffer) = components.used_buffer_receiver.try_recv() {
// NB: Can't use entry pattern because we want to allow bailing out on the line above
components.in_progress_packets.insert(
shard_recv_state_mut.packet_index,
Expand All @@ -510,6 +513,15 @@ impl StreamSocket {
.in_progress_packets
.get_mut(&shard_recv_state_mut.packet_index)
.unwrap()
} else {
// In case a stream got stuck and is not returning back empty buffers, discard the
// current shard
shard_recv_state_mut.should_discard = true;
shard_recv_state_mut.packet_cursor = 0; // reset cursor from old shards
// always write at the start of the packet so the buffer doesn't grow much
shard_recv_state_mut.shard_index = 0;

&mut components.discarded_shards_sink
};

let max_shard_data_size = self.max_packet_size - SHARD_PREFIX_SIZE;
Expand Down Expand Up @@ -557,9 +569,11 @@ impl StreamSocket {
.copy_from_slice(&shard_recv_state_mut.overwritten_data_backup.take().unwrap());
}

in_progress_packet
.received_shard_indices
.insert(shard_recv_state_mut.shard_index);
if !shard_recv_state_mut.should_discard {
in_progress_packet
.received_shard_indices
.insert(shard_recv_state_mut.shard_index);
}

// Check if packet is complete and send
if in_progress_packet.received_shard_indices.len() == shard_recv_state_mut.shards_count {
Expand Down

0 comments on commit 3e88963

Please sign in to comment.