Skip to content

Commit

Permalink
Add OwnedStream type
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Mar 5, 2023
1 parent cb41e11 commit 465f715
Show file tree
Hide file tree
Showing 35 changed files with 343 additions and 201 deletions.
11 changes: 6 additions & 5 deletions kafkaesque/src/kafka_source.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use timely::Data;
use timely::dataflow::{Scope, Stream};
use timely::dataflow::Scope;
use timely::dataflow::operators::Capability;
use timely::dataflow::operators::generic::OutputHandle;
use timely::dataflow::channels::pushers::Tee;

use rdkafka::Message;
use rdkafka::consumer::{ConsumerContext, BaseConsumer};
use timely::dataflow::channels::pushers::tee::PushOwned;
use timely::dataflow::stream::OwnedStream;

/// Constructs a stream of data from a Kafka consumer.
///
Expand Down Expand Up @@ -89,14 +90,14 @@ pub fn kafka_source<C, G, D, L>(
name: &str,
consumer: BaseConsumer<C>,
logic: L
) -> Stream<G, D>
) -> OwnedStream<G, Vec<D>>
where
C: ConsumerContext+'static,
G: Scope,
D: Data,
L: Fn(&[u8],
&mut Capability<G::Timestamp>,
&mut OutputHandle<G::Timestamp, D, Tee<G::Timestamp, D>>) -> bool+'static,
&mut OutputHandle<G::Timestamp, D, PushOwned<G::Timestamp, Vec<D>>>) -> bool+'static,
{
use timely::dataflow::operators::generic::source;
source(scope, name, move |capability, info| {
Expand Down Expand Up @@ -135,4 +136,4 @@ where
}

})
}
}
2 changes: 1 addition & 1 deletion timely/examples/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ fn main() {
});
}
)
.concat(&(0..1).map(|x| (x,x)).to_stream(scope))
.concat((0..1).map(|x| (x,x)).to_stream(scope))
.connect_loop(handle);
});
}).unwrap(); // asserts error-free execution;
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/loopdemo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ fn main() {

let step =
stream
.concat(&loop_stream)
.concat(loop_stream)
.map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 })
.filter(|x| x > &1);
step
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/pingpong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn main() {
(0 .. elements)
.filter(move |&x| (x as usize) % peers == index)
.to_stream(scope)
.concat(&cycle)
.concat(cycle)
.exchange(|&x| x)
.map_in_place(|x| *x += 1)
.branch_when(move |t| t < &iterations).1
Expand Down
9 changes: 5 additions & 4 deletions timely/examples/unionfind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use timely::dataflow::*;
use timely::dataflow::operators::{Input, Exchange, Probe};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::stream::{OwnedStream, StreamLike};

fn main() {

Expand Down Expand Up @@ -50,12 +51,12 @@ fn main() {
}).unwrap(); // asserts error-free execution;
}

trait UnionFind {
fn union_find(self) -> Self;
trait UnionFind<G: Scope> {
fn union_find(self) -> OwnedStream<G, Vec<(usize, usize)>>;
}

impl<G: Scope> UnionFind for Stream<G, (usize, usize)> {
fn union_find(self) -> Stream<G, (usize, usize)> {
impl<G: Scope, S: StreamLike<G, Vec<(usize, usize)>>> UnionFind<G> for S {
fn union_find(self) -> OwnedStream<G, Vec<(usize, usize)>> {

self.unary(Pipeline, "UnionFind", |_,_| {

Expand Down
46 changes: 45 additions & 1 deletion timely/src/dataflow/channels/pushers/tee.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! A `Push` implementor with a list of `Box<Push>` to forward pushes to.

use std::cell::RefCell;
use std::fmt::{self, Debug};
use std::fmt::{self, Debug, Formatter};
use std::rc::Rc;

use crate::dataflow::channels::{BundleCore, Message};
Expand All @@ -10,6 +10,39 @@ use crate::communication::Push;
use crate::{Container, Data};

type PushList<T, D> = Rc<RefCell<Vec<Box<dyn Push<BundleCore<T, D>>>>>>;
/// A `Push` implementor backed by a single, shared, optional boxed pusher.
///
/// The wrapped `Rc<RefCell<Option<..>>>` slot holds `None` when created through
/// `new` or `Default`; `set` stores a pusher in it, and clones share the same slot.
pub struct PushOwned<T, D>(Rc<RefCell<Option<Box<dyn Push<BundleCore<T, D>>>>>>);

impl<T, D> PushOwned<T, D> {
    /// Creates two handles that share one, initially empty, pusher slot.
    ///
    /// Both returned values wrap the same `Rc`, so a pusher stored through
    /// `set` on one handle is the pusher seen by the other.
    pub fn new() -> (Self, Self) {
        let slot = Rc::new(RefCell::new(None));
        (Self(Rc::clone(&slot)), Self(slot))
    }

    /// Boxes `pusher` and stores it in the shared slot, overwriting whatever
    /// the slot held before.
    pub fn set<P: Push<BundleCore<T, D>> + 'static>(&self, pusher: P) {
        let boxed: Box<dyn Push<BundleCore<T, D>>> = Box::new(pusher);
        *self.0.borrow_mut() = Some(boxed);
    }
}

impl<T, D> Default for PushOwned<T, D> {
    /// Builds a handle around a fresh slot with no pusher stored in it.
    fn default() -> Self {
        let empty = RefCell::new(None);
        Self(Rc::new(empty))
    }
}

impl<T, D> Debug for PushOwned<T, D> {
    /// Formats the value as a non-exhaustive `PushOwned` struct with no fields;
    /// the wrapped pusher is not printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("PushOwned");
        repr.finish_non_exhaustive()
    }
}

impl<T, D> Clone for PushOwned<T, D> {
fn clone(&self) -> Self {
Self(Rc::clone(&self.0))
}
}

/// Wraps a shared list of `Box<Push>` to forward pushes to. Owned by `Stream`.
pub struct TeeCore<T, D> {
Expand All @@ -20,6 +53,17 @@ pub struct TeeCore<T, D> {
/// [TeeCore] specialized to `Vec`-based container.
pub type Tee<T, D> = TeeCore<T, Vec<D>>;

impl<T: Data, D: Container> Push<BundleCore<T, D>> for PushOwned<T, D> {
    /// Forwards `message` to the stored pusher, if there is one.
    ///
    /// When the slot is empty nothing is called and `message` is left untouched.
    #[inline]
    fn push(&mut self, message: &mut Option<BundleCore<T, D>>) {
        // The `RefCell` borrow is held for the duration of the forwarded call.
        if let Some(downstream) = self.0.borrow_mut().as_mut() {
            downstream.push(message);
        }
    }
}


impl<T: Data, D: Container> Push<BundleCore<T, D>> for TeeCore<T, D> {
#[inline]
fn push(&mut self, message: &mut Option<BundleCore<T, D>>) {
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//! });
//! ```

pub use self::stream::{StreamCore, Stream};
pub use self::stream::{StreamCore, Stream, StreamLike, OwnedStream};
pub use self::scopes::{Scope, ScopeParent};

pub use self::operators::input::HandleCore as InputHandleCore;
Expand Down
5 changes: 3 additions & 2 deletions timely/src/dataflow/operators/aggregation/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{Data, ExchangeData};
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::generic::operator::Operator;
use crate::dataflow::channels::pact::Exchange;
use crate::dataflow::stream::OwnedStream;

/// Generic intra-timestamp aggregation
///
Expand Down Expand Up @@ -64,7 +65,7 @@ pub trait Aggregate<S: Scope, K: ExchangeData+Hash, V: ExchangeData> {
self,
fold: F,
emit: E,
hash: H) -> Stream<S, R> where S::Timestamp: Eq;
hash: H) -> OwnedStream<S, Vec<R>> where S::Timestamp: Eq;
}

impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> Aggregate<S, K, V> for Stream<S, (K, V)> {
Expand All @@ -73,7 +74,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> Aggregate<S, K, V> for
self,
fold: F,
emit: E,
hash: H) -> Stream<S, R> where S::Timestamp: Eq {
hash: H) -> OwnedStream<S, Vec<R>> where S::Timestamp: Eq {

let mut aggregates = HashMap::new();
let mut vector = Vec::new();
Expand Down
5 changes: 3 additions & 2 deletions timely/src/dataflow/operators/aggregation/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{Data, ExchangeData};
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::generic::operator::Operator;
use crate::dataflow::channels::pact::Exchange;
use crate::dataflow::stream::OwnedStream;

/// Generic state-transition machinery: each key has a state, and receives a sequence of events.
/// Events are applied in time-order, but no other promises are made. Each state transition can
Expand Down Expand Up @@ -51,7 +52,7 @@ pub trait StateMachine<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> {
I: IntoIterator<Item=R>, // type of output iterator
F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic
H: Fn(&K)->u64+'static, // "hash" function for keys
>(self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq ;
>(self, fold: F, hash: H) -> OwnedStream<S, Vec<R>> where S::Timestamp : Hash+Eq ;
}

impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> for Stream<S, (K, V)> {
Expand All @@ -61,7 +62,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f
I: IntoIterator<Item=R>, // type of output iterator
F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic
H: Fn(&K)->u64+'static, // "hash" function for keys
>(self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq {
>(self, fold: F, hash: H) -> OwnedStream<S, Vec<R>> where S::Timestamp : Hash+Eq {

let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state)
let mut states = HashMap::new(); // keys -> state
Expand Down
13 changes: 7 additions & 6 deletions timely/src/dataflow/operators/branch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
use crate::dataflow::{Scope, Stream, StreamCore};
use crate::{Container, Data};
use crate::dataflow::stream::{OwnedStream, StreamLike};

/// Extension trait for `Stream`.
pub trait Branch<S: Scope, D: Data> {
Expand Down Expand Up @@ -31,14 +32,14 @@ pub trait Branch<S: Scope, D: Data> {
fn branch(
self,
condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
) -> (Stream<S, D>, Stream<S, D>);
) -> (OwnedStream<S, Vec<D>>, OwnedStream<S, Vec<D>>);
}

impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
fn branch(
self,
condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
) -> (Stream<S, D>, Stream<S, D>) {
) -> (OwnedStream<S, Vec<D>>, OwnedStream<S, Vec<D>>) {
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());

let mut input = builder.new_input(self, Pipeline);
Expand Down Expand Up @@ -71,7 +72,7 @@ impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
}

/// Extension trait for `Stream`.
pub trait BranchWhen<T>: Sized {
pub trait BranchWhen<G: Scope, C: Container>: Sized {
/// Takes one input stream and splits it into two output streams.
/// For each time, the supplied closure is called. If it returns true,
/// the records for that will be sent to the second returned stream, otherwise
Expand All @@ -91,11 +92,11 @@ pub trait BranchWhen<T>: Sized {
/// after_five.inspect(|x| println!("Times 5 and later: {:?}", x));
/// });
/// ```
fn branch_when(self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
fn branch_when(self, condition: impl Fn(&G::Timestamp) -> bool + 'static) -> (OwnedStream<G, C>, OwnedStream<G, C>);
}

impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for StreamCore<S, C> {
fn branch_when(self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
impl<G: Scope, C: Container, S: StreamLike<G, C>> BranchWhen<G, C> for S {
fn branch_when(self, condition: impl Fn(&G::Timestamp) -> bool + 'static) -> (OwnedStream<G, C>, OwnedStream<G, C>) {
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());

let mut input = builder.new_input(self, Pipeline);
Expand Down
9 changes: 5 additions & 4 deletions timely/src/dataflow/operators/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
use crate::ExchangeData;
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::{Map, Exchange};
use crate::dataflow::stream::{OwnedStream, StreamLike};

/// Broadcast records to all workers.
pub trait Broadcast<D: ExchangeData> {
pub trait Broadcast<G: Scope, D: ExchangeData> {
/// Broadcast records to all workers.
///
/// # Examples
Expand All @@ -18,11 +19,11 @@ pub trait Broadcast<D: ExchangeData> {
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn broadcast(self) -> Self;
fn broadcast(self) -> OwnedStream<G, Vec<D>>;
}

impl<G: Scope, D: ExchangeData> Broadcast<D> for Stream<G, D> {
fn broadcast(self) -> Stream<G, D> {
impl<G: Scope, D: ExchangeData, S: StreamLike<G, Vec<D>>> Broadcast<G, D> for S {
fn broadcast(self) -> OwnedStream<G, Vec<D>> {

// NOTE: Simplified implementation due to underlying motion
// in timely dataflow internals. Optimize once they have
Expand Down
13 changes: 7 additions & 6 deletions timely/src/dataflow/operators/capture/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ use crate::dataflow::channels::pullers::Counter as PullCounter;
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;

use crate::Container;
use crate::dataflow::stream::StreamLike;
use crate::progress::ChangeBatch;
use crate::progress::Timestamp;

use super::{EventCore, EventPusherCore};

/// Capture a stream of timestamped data for later replay.
pub trait Capture<T: Timestamp, D: Container>: Sized {
pub trait Capture<G: Scope, D: Container>: Sized {
/// Captures a stream of timestamped data for later replay.
///
/// # Examples
Expand Down Expand Up @@ -103,18 +104,18 @@ pub trait Capture<T: Timestamp, D: Container>: Sized {
///
/// assert_eq!(recv0.extract()[0].1, (0..10).collect::<Vec<_>>());
/// ```
fn capture_into<P: EventPusherCore<T, D>+'static>(self, pusher: P);
fn capture_into<P: EventPusherCore<G::Timestamp, D>+'static>(self, pusher: P);

/// Captures a stream using Rust's MPSC channels.
fn capture(self) -> ::std::sync::mpsc::Receiver<EventCore<T, D>> {
fn capture(self) -> ::std::sync::mpsc::Receiver<EventCore<G::Timestamp, D>> {
let (send, recv) = ::std::sync::mpsc::channel();
self.capture_into(send);
recv
}
}

impl<S: Scope, D: Container> Capture<S::Timestamp, D> for StreamCore<S, D> {
fn capture_into<P: EventPusherCore<S::Timestamp, D>+'static>(self, mut event_pusher: P) {
impl<G: Scope, D: Container, S: StreamLike<G, D>> Capture<G, D> for S {
fn capture_into<P: EventPusherCore<G::Timestamp, D>+'static>(self, mut event_pusher: P) {

let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());
let mut input = PullCounter::new(builder.new_input(self, Pipeline));
Expand All @@ -125,7 +126,7 @@ impl<S: Scope, D: Container> Capture<S::Timestamp, D> for StreamCore<S, D> {

if !started {
// discard initial capability.
progress.frontiers[0].update(S::Timestamp::minimum(), -1);
progress.frontiers[0].update(Timestamp::minimum(), -1);
started = true;
}
if !progress.frontiers[0].is_empty() {
Expand Down
7 changes: 4 additions & 3 deletions timely/src/dataflow/operators/capture/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,26 @@ use crate::progress::Timestamp;
use super::EventCore;
use super::event::EventIteratorCore;
use crate::Container;
use crate::dataflow::stream::OwnedStream;

/// Replay a capture stream into a scope with the same timestamp.
pub trait Replay<T: Timestamp, C> : Sized {
/// Replays `self` into the provided scope, as a `Stream<S, D>`.
fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> StreamCore<S, C> {
fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> OwnedStream<S, C> {
self.replay_core(scope, Some(std::time::Duration::new(0, 0)))
}
/// Replays `self` into the provided scope, as a `Stream<S, D>`.
///
/// The `period` argument allows the specification of a re-activation period, where the operator
/// will re-activate itself every so often. The `None` argument instructs the operator not to
/// re-activate itself.
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> StreamCore<S, C>;
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> OwnedStream<S, C>;
}

impl<T: Timestamp, C: Container, I> Replay<T, C> for I
where I : IntoIterator,
<I as IntoIterator>::Item: EventIteratorCore<T, C>+'static {
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> StreamCore<S, C>{
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> OwnedStream<S, C>{

let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());

Expand Down
Loading

0 comments on commit 465f715

Please sign in to comment.