Skip to content

Commit

Permalink
Rename Receiver to Consumer and cleanup DisruptorWrapper.
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholassm committed Jul 28, 2023
1 parent e80db58 commit 1c751a2
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 17 deletions.
8 changes: 4 additions & 4 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use crate::DisruptorWrapper;
use crate::producer::ProducerBarrier;
use crate::wait_strategies::WaitStrategy;

pub(crate) struct Receiver {
pub(crate) struct Consumer {
join_handle: Option<JoinHandle<()>>
}

impl Receiver {
pub(crate) fn new<E, F, W, P>(wrapper: DisruptorWrapper<E, P>, mut process: F, wait_strategy: W) -> Receiver where
impl Consumer {
pub(crate) fn new<E, F, W, P>(wrapper: DisruptorWrapper<E, P>, mut process: F, wait_strategy: W) -> Consumer where
F: Send + FnMut(&E, i64, bool) + 'static,
E: 'static,
W: WaitStrategy + 'static,
Expand Down Expand Up @@ -43,7 +43,7 @@ impl Receiver {
}
});

Receiver { join_handle: Some(join_handle) }
Consumer { join_handle: Some(join_handle) }
}

pub(crate) fn join(&mut self) {
Expand Down
18 changes: 8 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ mod consumer;
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use crossbeam_utils::CachePadded;
use crate::consumer::Receiver;
use crate::consumer::Consumer;
use crate::producer::{ProducerBarrier, Producer, SingleProducerBarrier};
use crate::wait_strategies::WaitStrategy;

Expand Down Expand Up @@ -162,8 +162,8 @@ impl<E, P, W> Builder<E, P, W> where
)
);

let wrapper = DisruptorWrapper { disruptor };
let receiver = Receiver::new(wrapper, self.processor, self.wait_strategy);
let wrapper = DisruptorWrapper(disruptor);
let receiver = Consumer::new(wrapper, self.processor, self.wait_strategy);
Producer::new(disruptor, receiver, self.ring_buffer_size - 1)
}
}
Expand All @@ -172,16 +172,14 @@ struct Slot<E> {
event: UnsafeCell<E>
}

struct DisruptorWrapper<T, P: ProducerBarrier> {
disruptor: *mut Disruptor<T, P>
}
// Needed for providing a `Disruptor` reference to the Consumer thread.
struct DisruptorWrapper<T, P: ProducerBarrier> (*mut Disruptor<T, P>);

unsafe impl<E, P: ProducerBarrier> Send for DisruptorWrapper<E, P> {}

impl<T, P: ProducerBarrier> DisruptorWrapper<T, P> {
#[inline]
fn unwrap(&self) -> &Disruptor<T, P> {
unsafe { &*self.disruptor }
impl<E, P: ProducerBarrier> DisruptorWrapper<E, P> {
fn unwrap(&self) -> &Disruptor<E, P> {
unsafe { &*self.0 }
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crossbeam_utils::CachePadded;
use std::sync::atomic::{AtomicI64, Ordering};
use crate::consumer::Receiver;
use crate::consumer::Consumer;
use crate::Disruptor;

pub(crate) trait ProducerBarrier {
Expand Down Expand Up @@ -36,7 +36,7 @@ impl ProducerBarrier for SingleProducerBarrier {
/// Producer for publishing to the Disruptor from a single thread.
pub struct Producer<E> {
disruptor: *mut Disruptor<E, SingleProducerBarrier>,
receiver: Receiver,
receiver: Consumer,
available_publisher_sequence: i64,
}

Expand All @@ -52,7 +52,7 @@ pub struct RingBufferFull;
impl<E> Producer<E> {
pub(crate) fn new(
disruptor: *mut Disruptor<E, SingleProducerBarrier>,
receiver: Receiver,
receiver: Consumer,
available_publisher_sequence: i64
) -> Self {
Producer {
Expand Down

0 comments on commit 1c751a2

Please sign in to comment.