Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate activation requests as they occur #502

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 127 additions & 45 deletions timely/src/scheduling/activate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ impl Scheduler for Box<dyn Scheduler> {
/// Allocation-free activation tracker.
#[derive(Debug)]
pub struct Activations {
clean: usize,
/// `(offset, length)`
bounds: Vec<(usize, usize)>,
slices: Vec<usize>,
buffer: Vec<usize>,
/// Current activations that are being served.
current: ActivationsBuffer,
/// Upcoming activations that may soon be served.
pending: ActivationsBuffer,

// Inter-thread activations.
tx: Sender<Vec<usize>>,
Expand All @@ -60,10 +59,8 @@ impl Activations {
pub fn new(timer: Instant) -> Self {
let (tx, rx) = crossbeam_channel::unbounded();
Self {
clean: 0,
bounds: Vec::new(),
slices: Vec::new(),
buffer: Vec::new(),
current: ActivationsBuffer::new(),
pending: ActivationsBuffer::new(),
tx,
rx,
timer,
Expand All @@ -73,8 +70,7 @@ impl Activations {

/// Activates the task addressed by `path`.
pub fn activate(&mut self, path: &[usize]) {
self.bounds.push((self.slices.len(), path.len()));
self.slices.extend(path);
self.pending.activate(path);
}

/// Schedules a future activation for the task addressed by `path`.
Expand All @@ -89,6 +85,20 @@ impl Activations {
}
}

/// Reactivates the task addressed by `path`, ideally within `delay`.
///
/// The task may be activated before `delay`, in which case the task should reactivate itself if
/// it requires further reactivation, as this specific `delay` may no longer be in effect.
pub fn activate_within(&mut self, path: &[usize], delay: Duration) {
if delay == Duration::new(0, 0) {
self.activate(path)
}
else {
let moment = self.timer.elapsed() + delay;
self.pending.activate_by(path, moment)
}
}

/// Discards the current active set and presents the next active set.
pub fn advance(&mut self) {

Expand All @@ -104,37 +114,135 @@ impl Activations {
self.activate(&path[..]);
}

self.bounds.drain(.. self.clean);
self.current.clear();
self.pending.compact();
self.pending.extract_through(&mut self.current, now);

}

/// Maps a function across activated paths.
pub fn map_active(&self, logic: impl Fn(&[usize])) {
self.current.map_active(logic)
}

/// Sets as active any symbols that follow `path`.
pub fn for_extensions(&self, path: &[usize], action: impl FnMut(usize)) {
self.current.for_extensions(path, action)
}

/// Constructs a thread-safe `SyncActivations` handle to this activator.
pub fn sync(&self) -> SyncActivations {
SyncActivations {
tx: self.tx.clone(),
thread: std::thread::current(),
}
}

/// Time until next scheduled event.
///
/// This method should be used before putting a worker thread to sleep, as it
/// indicates the amount of time before the thread should be unparked for the
/// next scheduled activation.
pub fn empty_for(&self) -> Option<Duration> {
if !self.current.is_empty() || !self.pending.is_empty() {
Some(Duration::new(0,0))
}
else {
self.queue.peek().map(|Reverse((t,_a))| {
let elapsed = self.timer.elapsed();
if t < &elapsed { Duration::new(0,0) }
else { *t - elapsed }
})
}
}
}

/// Manages delayed activations for path-named tasks.
#[derive(Debug)]
pub struct ActivationsBuffer {
/// `(offset, length)`, and an elapsed timer duration.
/// The zero duration can be used to indicate "immediately".
bounds: Vec<(usize, usize, Duration)>,
/// Elements of path slices.
slices: Vec<usize>,
/// A spare buffer to copy into.
buffer: Vec<usize>,
}

impl ActivationsBuffer {
/// Creates a new activation tracker.
pub fn new() -> Self {
Self {
bounds: Vec::new(),
slices: Vec::new(),
buffer: Vec::new(),
}
}

fn clear(&mut self) {
self.bounds.clear();
self.slices.clear();
self.buffer.clear();
}

fn is_empty(&self) -> bool {
self.bounds.is_empty()
}

/// Activates the task addressed by `path`.
pub fn activate(&mut self, path: &[usize]) {
self.activate_by(path, Duration::new(0, 0));
}

/// Activates the task addressed by `path`.
pub fn activate_by(&mut self, path: &[usize], duration: Duration) {
self.bounds.push((self.slices.len(), path.len(), duration));
self.slices.extend(path);
}

/// Orders activations by their path, and retains only each's most immediate duration.
pub fn compact(&mut self) {

{ // Scoped, to allow borrow to drop.
let slices = &self.slices[..];
self.bounds.sort_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
// Sort slices by path, and within each path by duration.
self.bounds.sort_by_key(|x| (&slices[x.0 .. (x.0 + x.1)], x.2));
// Deduplicate by path, retaining the least duration.
self.bounds.dedup_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
}

// Compact the slices.
self.buffer.clear();
for (offset, length) in self.bounds.iter_mut() {
for (offset, length, _duration) in self.bounds.iter_mut() {
self.buffer.extend(&self.slices[*offset .. (*offset + *length)]);
*offset = self.buffer.len() - *length;
}
::std::mem::swap(&mut self.buffer, &mut self.slices);
}

self.clean = self.bounds.len();
/// Extracts all activations less or equal to `threshold` into `other`.
pub fn extract_through(&mut self, other: &mut Self, threshold: Duration) {
for (offset, length, duration) in self.bounds.iter_mut() {
if *duration <= threshold {
other.activate_by(&self.slices[*offset .. (*offset + *length)], *duration);
}
}
self.bounds.retain(|(_off, _len, duration)| *duration > threshold);
// Could `self.compact()` here, but it will happen as part of future work.
}

/// Maps a function across activated paths.
pub fn map_active(&self, logic: impl Fn(&[usize])) {
for (offset, length) in self.bounds.iter() {
for (offset, length, _duration) in self.bounds.iter() {
logic(&self.slices[*offset .. (*offset + *length)]);
}
}

/// Sets as active any symbols that follow `path`.
pub fn for_extensions(&self, path: &[usize], mut action: impl FnMut(usize)) {

let position =
self.bounds[..self.clean]
self.bounds
.binary_search_by_key(&path, |x| &self.slices[x.0 .. (x.0 + x.1)]);
let position = match position {
Ok(x) => x,
Expand All @@ -146,7 +254,7 @@ impl Activations {
.iter()
.cloned()
.skip(position)
.map(|(offset, length)| &self.slices[offset .. (offset + length)])
.map(|(offset, length, _)| &self.slices[offset .. (offset + length)])
.take_while(|x| x.starts_with(path))
.for_each(|x| {
// push non-empty, non-duplicate extensions.
Expand All @@ -158,32 +266,6 @@ impl Activations {
}
});
}

/// Constructs a thread-safe `SyncActivations` handle to this activator.
pub fn sync(&self) -> SyncActivations {
SyncActivations {
tx: self.tx.clone(),
thread: std::thread::current(),
}
}

/// Time until next scheduled event.
///
/// This method should be used before putting a worker thread to sleep, as it
/// indicates the amount of time before the thread should be unparked for the
/// next scheduled activation.
pub fn empty_for(&self) -> Option<Duration> {
if !self.bounds.is_empty() {
Some(Duration::new(0,0))
}
else {
self.queue.peek().map(|Reverse((t,_a))| {
let elapsed = self.timer.elapsed();
if t < &elapsed { Duration::new(0,0) }
else { *t - elapsed }
})
}
}
}

/// A thread-safe handle to an `Activations`.
Expand Down
Loading