Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test for columnar reachability #579

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions timely/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ timely_communication = { path = "../communication", version = "0.12", default-fe
timely_container = { path = "../container", version = "0.12" }
crossbeam-channel = "0.5.0"
smallvec = { version = "1.13.2", features = ["serde", "const_generics"] }
columnar = { git = "https://github.com/frankmcsherry/columnar" }

[dev-dependencies]
# timely_sort="0.1.6"
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/event_driven.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn main() {

println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);

for round in 0 .. {
for round in 0 .. 10 {
let dataflow = round % dataflows;
if record {
inputs[dataflow].send(());
Expand Down
119 changes: 85 additions & 34 deletions timely/src/progress/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@
use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::cmp::Reverse;

use columnar::{Len, Index};
use columnar::Vecs;

use crate::progress::Timestamp;
use crate::progress::{Source, Target};
use crate::progress::ChangeBatch;
Expand All @@ -84,6 +87,49 @@ use crate::progress::frontier::{Antichain, MutableAntichain};
use crate::progress::timestamp::PathSummary;


use antichains::Antichains;

/// A stand-in for `Vec<Antichain<T>>`.
mod antichains {

use columnar::{Len, Index, Push};
use columnar::Vecs;

use crate::progress::Antichain;

#[derive(Clone, Debug)]
pub struct Antichains<T> (Vecs<Vec<T>>);

impl<T> Default for Antichains<T> {
fn default() -> Self {
Self (Default::default())
}
}

impl<T> Len for Antichains<T> {
#[inline(always)] fn len(&self) -> usize { self.0.len() }
}

impl<T> Push<Antichain<T>> for Antichains<T> {
#[inline(always)]
fn push(&mut self, item: Antichain<T>) {
columnar::Push::extend(&mut self.0.values, item);
self.0.bounds.push(self.0.values.len());
}
}

impl<'a, T> Index for &'a Antichains<T> {
type Ref = <&'a Vecs<Vec<T>> as Index>::Ref;

#[inline(always)]
fn get(&self, index: usize) -> Self::Ref {
(&self.0).get(index)
}
}
}



/// A topology builder, which can summarize reachability along paths.
///
/// A `Builder` takes descriptions of the nodes and edges in a graph, and compiles
Expand Down Expand Up @@ -132,43 +178,43 @@ pub struct Builder<T: Timestamp> {
/// Indexed by operator index, then input port, then output port. This is the
/// same format returned by `get_internal_summary`, as if we simply appended
/// all of the summaries for the hosted nodes.
pub nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
nodes: Vecs<Vecs<Antichains<T::Summary>>>,
/// Direct connections from sources to targets.
///
/// Edges do not affect timestamps, so we only need to know the connectivity.
/// Indexed by operator index then output port.
pub edges: Vec<Vec<Vec<Target>>>,
edges: Vec<Vec<Vec<Target>>>,
/// Numbers of inputs and outputs for each node.
pub shape: Vec<(usize, usize)>,
shape: Vec<(usize, usize)>,
}

impl<T: Timestamp> Builder<T> {

/// Create a new empty topology builder.
pub fn new() -> Self {
Builder {
nodes: Vec::new(),
nodes: Default::default(),
edges: Vec::new(),
shape: Vec::new(),
}
}

/// Add links internal to operators.
///
/// This method overwrites any existing summary, instead of anything more sophisticated.
/// Nodes must be added in strictly increasing order of `index`.
pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec<Vec<Antichain<T::Summary>>>) {

// Assert that all summaries exist.
debug_assert_eq!(inputs, summary.len());
for x in summary.iter() { debug_assert_eq!(outputs, x.len()); }

while self.nodes.len() <= index {
self.nodes.push(Vec::new());
self.edges.push(Vec::new());
self.shape.push((0, 0));
}
assert_eq!(self.nodes.len(), index);

use columnar::Push;
self.nodes.push(summary);
self.edges.push(Vec::new());
self.shape.push((0, 0));

self.nodes[index] = summary;
if self.edges[index].len() != outputs {
self.edges[index] = vec![Vec::new(); outputs];
}
Expand Down Expand Up @@ -270,7 +316,7 @@ impl<T: Timestamp> Builder<T> {

// Load edges as default summaries.
for (index, ports) in self.edges.iter().enumerate() {
for (output, targets) in ports.iter().enumerate() {
for (output, targets) in (*ports).iter().enumerate() {
let source = Location::new_source(index, output);
in_degree.entry(source).or_insert(0);
for &target in targets.iter() {
Expand All @@ -281,13 +327,13 @@ impl<T: Timestamp> Builder<T> {
}

// Load default intra-node summaries.
for (index, summary) in self.nodes.iter().enumerate() {
for (input, outputs) in summary.iter().enumerate() {
for (index, summary) in (&self.nodes).into_iter().enumerate() {
for (input, outputs) in summary.into_iter().enumerate() {
let target = Location::new_target(index, input);
in_degree.entry(target).or_insert(0);
for (output, summaries) in outputs.iter().enumerate() {
for (output, summaries) in outputs.into_iter().enumerate() {
let source = Location::new_source(index, output);
for summary in summaries.elements().iter() {
for summary in summaries.into_iter() {
if summary == &Default::default() {
*in_degree.entry(source).or_insert(0) += 1;
}
Expand Down Expand Up @@ -322,9 +368,9 @@ impl<T: Timestamp> Builder<T> {
}
},
Port::Target(port) => {
for (output, summaries) in self.nodes[node][port].iter().enumerate() {
for (output, summaries) in (&self.nodes).get(node).get(port).into_iter().enumerate() {
let source = Location::new_source(node, output);
for summary in summaries.elements().iter() {
for summary in summaries.into_iter() {
if summary == &Default::default() {
*in_degree.get_mut(&source).unwrap() -= 1;
if in_degree[&source] == 0 {
Expand Down Expand Up @@ -361,12 +407,12 @@ pub struct Tracker<T:Timestamp> {
/// Indexed by operator index, then input port, then output port. This is the
/// same format returned by `get_internal_summary`, as if we simply appended
/// all of the summaries for the hosted nodes.
nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
nodes: Vecs<Vecs<Antichains<T::Summary>>>,
/// Direct connections from sources to targets.
///
/// Edges do not affect timestamps, so we only need to know the connectivity.
/// Indexed by operator index then output port.
edges: Vec<Vec<Vec<Target>>>,
edges: Vecs<Vecs<Vec<Target>>>,

// TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`).
// It seems we should be able to flatten most of these so that there are a few allocations
Expand Down Expand Up @@ -544,10 +590,16 @@ impl<T:Timestamp> Tracker<T> {
let scope_outputs = builder.shape[0].0;
let output_changes = vec![ChangeBatch::new(); scope_outputs];

use columnar::Push;
let mut edges: Vecs<Vecs<Vec<Target>>> = Default::default();
for edge in builder.edges {
edges.push(edge);
}

let tracker =
Tracker {
nodes: builder.nodes,
edges: builder.edges,
edges,
per_operator,
target_changes: ChangeBatch::new(),
source_changes: ChangeBatch::new(),
Expand Down Expand Up @@ -663,10 +715,10 @@ impl<T:Timestamp> Tracker<T> {
.update_iter(Some((time, diff)));

for (time, diff) in changes {
let nodes = &self.nodes[location.node][port_index];
for (output_port, summaries) in nodes.iter().enumerate() {
let nodes = &(&self.nodes).get(location.node).get(port_index);
for (output_port, summaries) in nodes.into_iter().enumerate() {
let source = Location { node: location.node, port: Port::Source(output_port) };
for summary in summaries.elements().iter() {
for summary in summaries.into_iter() {
if let Some(new_time) = summary.results_in(&time) {
self.worklist.push(Reverse((new_time, source, diff)));
}
Expand All @@ -686,7 +738,7 @@ impl<T:Timestamp> Tracker<T> {
.update_iter(Some((time, diff)));

for (time, diff) in changes {
for new_target in self.edges[location.node][port_index].iter() {
for new_target in (&self.edges).get(location.node).get(port_index).into_iter() {
self.worklist.push(Reverse((
time.clone(),
Location::from(*new_target),
Expand Down Expand Up @@ -738,14 +790,14 @@ impl<T:Timestamp> Tracker<T> {
/// Graph locations may be missing from the output, in which case they have no
/// paths to scope outputs.
fn summarize_outputs<T: Timestamp>(
nodes: &Vec<Vec<Vec<Antichain<T::Summary>>>>,
nodes: &Vecs<Vecs<Antichains<T::Summary>>>,
edges: &Vec<Vec<Vec<Target>>>,
) -> HashMap<Location, Vec<Antichain<T::Summary>>>
{
// A reverse edge map, to allow us to walk back up the dataflow graph.
let mut reverse = HashMap::new();
for (node, outputs) in edges.iter().enumerate() {
for (output, targets) in outputs.iter().enumerate() {
for (node, outputs) in columnar::Index::into_iter(edges).enumerate() {
for (output, targets) in columnar::Index::into_iter(outputs).enumerate() {
for target in targets.iter() {
reverse.insert(
Location::from(*target),
Expand All @@ -759,10 +811,9 @@ fn summarize_outputs<T: Timestamp>(
let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new();

let outputs =
edges
.iter()
.flat_map(|x| x.iter())
.flat_map(|x| x.iter())
columnar::Index::into_iter(edges)
.flat_map(|x| columnar::Index::into_iter(x))
.flat_map(|x| columnar::Index::into_iter(x))
.filter(|target| target.node == 0);

// The scope may have no outputs, in which case we can do no work.
Expand All @@ -780,7 +831,7 @@ fn summarize_outputs<T: Timestamp>(
Port::Source(output_port) => {

// Consider each input port of the associated operator.
for (input_port, summaries) in nodes[location.node].iter().enumerate() {
for (input_port, summaries) in nodes.get(location.node).into_iter().enumerate() {

// Determine the current path summaries from the input port.
let location = Location { node: location.node, port: Port::Target(input_port) };
Expand All @@ -792,7 +843,7 @@ fn summarize_outputs<T: Timestamp>(
while antichains.len() <= output { antichains.push(Antichain::new()); }

// Combine each operator-internal summary to the output with `summary`.
for operator_summary in summaries[output_port].elements().iter() {
for operator_summary in summaries.get(output_port).into_iter() {
if let Some(combined) = operator_summary.followed_by(&summary) {
if antichains[output].insert(combined.clone()) {
worklist.push_back((location, output, combined));
Expand Down