
count_total panic with advanced trace #526

Closed
nooberfsh opened this issue Oct 18, 2024 · 10 comments · Fixed by #530 · May be fixed by #527

Comments

@nooberfsh
Contributor

nooberfsh commented Oct 18, 2024

    // Imports assumed to make this snippet self-contained (paths as in recent
    // timely / differential-dataflow releases; they may vary across versions):
    use timely::dataflow::ProbeHandle;
    use timely::dataflow::operators::Probe;
    use timely::progress::frontier::AntichainRef;
    use differential_dataflow::input::Input;
    use differential_dataflow::operators::CountTotal;
    use differential_dataflow::operators::arrange::ArrangeBySelf;
    use differential_dataflow::trace::TraceReader;

    timely::execute_directly(move |worker| {
        let mut probe = ProbeHandle::new();
        let (mut input, mut trace) = worker.dataflow::<u32, _, _>(|scope| {
            let (handle, input) = scope.new_collection();
            let arrange = input.arrange_by_self();
            arrange.stream.probe_with(&mut probe);
            (handle, arrange.trace)
        });

        // ingest some batches
        for _ in 0..10 {
            input.insert(10);
            input.advance_to(input.time() + 1);
            input.flush();
            worker.step_while(|| probe.less_than(input.time()));
        }

        // advance the trace
        trace.set_logical_compaction(AntichainRef::new(&[2]));
        trace.set_physical_compaction(AntichainRef::new(&[2]));

        worker.dataflow::<u32, _, _>(|scope| {
            let arrange = trace.import(scope);
            arrange
                .count_total() // <-- panic
                .inspect(|d| println!("{:?}", d));
        });
    })

The above code panics:

thread 'main' panicked at /home/tom/hdx/apps/cargo/git/checkouts/differential-dataflow-d065d23d797aa027/b5046c8/src/trace/implementations/spine_fueled.rs:150:9:
assertion failed: PartialOrder::less_equal(&self.physical_frontier.borrow(), &upper)
note: run with RUST_BACKTRACE=1 environment variable to display a backtrace

It seems count_total only works for traces with the default compaction frontier. Is this intentional?
Full code is here

@frankmcsherry
Member

Almost certainly not intended. Will investigate today; thanks very much for the report!

@antiguru
Member

threshold_total has the same issue. I wonder if the following diff is a correct fix:

diff --git a/src/operators/count.rs b/src/operators/count.rs
index e44a59a8..23572301 100644
--- a/src/operators/count.rs
+++ b/src/operators/count.rs
@@ -69,7 +69,8 @@ where
 
         self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {
 
-            // tracks the upper limit of known-complete timestamps.
+            // tracks the lower and upper limits of known-complete timestamps.
+            let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
             let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
 
             move |input, output| {
@@ -80,7 +81,8 @@ where
                     let mut session = output.session(&capability);
                     for batch in buffer.drain(..) {
                         let mut batch_cursor = batch.cursor();
-                        let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
+                        trace.advance_upper(&mut lower_limit);
+                        let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();
                         upper_limit.clone_from(batch.upper());
 
                         while let Some(key) = batch_cursor.get_key(&batch) {
diff --git a/src/operators/threshold.rs b/src/operators/threshold.rs
index 3d8a405a..027193fb 100644
--- a/src/operators/threshold.rs
+++ b/src/operators/threshold.rs
@@ -117,7 +117,8 @@ where
 
         self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {
 
-            // tracks the upper limit of known-complete timestamps.
+            // tracks the lower and upper limits of known-complete timestamps.
+            let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
             let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
 
             move |input, output| {
@@ -126,9 +127,9 @@ where
                     batches.swap(&mut buffer);
                     let mut session = output.session(&capability);
                     for batch in buffer.drain(..) {
-
+                        trace.advance_upper(&mut lower_limit);
                         let mut batch_cursor = batch.cursor();
-                        let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
+                        let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();
 
                         upper_limit.clone_from(batch.upper());

antiguru added a commit to antiguru/differential-dataflow that referenced this issue Oct 18, 2024
Fixes a bug where we'd obtain a trace's cursor based on a batch's lower
frontier. The trace can be compacted past the batch's lower frontier,
which violates `cursor_through`'s assumptions. Instead, we use the trace's
upper to obtain the cursor.

Fixes TimelyDataflow#526.

Signed-off-by: Moritz Hoffmann <[email protected]>
@nooberfsh
Contributor Author

@antiguru After the patch, the demo program no longer panics, but it produces invalid data:

((10, 1), 0, 1)
((10, 1), 1, 1)
((10, 1), 2, 1)
((10, 1), 3, 1)
((10, 1), 4, 1)

...

You can see the historical counts were never extracted from the trace; the correct output should be:

((10, 1), 0, 1)
((10, 1), 1, -1)
((10, 2), 1, 1)
((10, 2), 2, -1)
((10, 3), 2, 1)
((10, 3), 3, -1)
((10, 4), 3, 1)
((10, 4), 4, -1)
((10, 5), 4, 1)
...

I think the main problem is that count_total handles batches one by one: the upper_limit is bumped as each new batch begins, and that is what causes the demo program to panic. One way to work around this is to drain all available batches first, handle the count logic key by key, and only then bump the upper_limit; this is what the reduce operator does.

Another way is to handle all the data already in the trace up front and then do the computation batch by batch; the join operator does something similar. I have implemented it here

@frankmcsherry
Member

I'm going to peek at this now, but one thing that .. probably isn't clear even to me .. is that importing an advanced trace is some amount of undefined. The operators will react to the data they are presented, but there is no guarantee that a trace that has been allowed to compact will have done so. I recommend using import_frontier or potentially import_frontier_core, which will pull out the logical compaction frontier, and bring all updates up to at least that frontier.
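
For concreteness, a minimal sketch of that swap in the reporter's second dataflow (assuming the import_frontier(scope, name) signature; the name string here is arbitrary):

    worker.dataflow::<u32, _, _>(|scope| {
        // Unlike `import`, this advances all updates to the trace's logical
        // compaction frontier before presenting them downstream.
        let arrange = trace.import_frontier(scope, "AdvancedImport");
        arrange
            .count_total()
            .inspect(|d| println!("{:?}", d));
    });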

The doccomments for import lightly suggest that there is a problem here, but it's definitely a footgun.

    /// The current behavior is that the introduced collection accumulates updates to some times less or equal
    /// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to*
    /// the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
    /// the historical collection may move through configurations that did not actually occur, even if eventually
    /// arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
    /// the intermediate computation could do something that the original computation did not, like diverge.
    ///
    /// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which
    /// means the computation will run as if starting from exactly this frontier. It is not currently clear whose
    /// responsibility this should be (the trace/batch should only reveal these times, or an operator should know
    /// to advance times before using them).

The panic occurs even if using import_frontier, so there's still a bug, but I think the "correct answer" is closer to what happens when you use import_frontier and count() (no _total).

((10, 3), 2, 1)
((10, 3), 3, -1)
((10, 4), 3, 1)
((10, 4), 4, -1)
((10, 5), 4, 1)
((10, 5), 5, -1)
((10, 6), 5, 1)
((10, 6), 6, -1)
((10, 7), 6, 1)
((10, 7), 7, -1)
((10, 8), 7, 1)
((10, 8), 8, -1)
((10, 9), 8, 1)
((10, 9), 9, -1)
((10, 10), 9, 1)

It may be as simple as what @antiguru has, though I need to page in this logic. T.T

@frankmcsherry
Member

It seems that the @antiguru fix is not exactly right, in that it produces

((10, 1), 2, 1)
((10, 1), 2, 1)
((10, 1), 2, 1)
((10, 1), 3, 1)
((10, 1), 4, 1)
((10, 1), 5, 1)
((10, 1), 6, 1)
((10, 1), 7, 1)
((10, 1), 8, 1)
((10, 1), 9, 1)

which hasn't counted anything (ideally we'd see something other than (10, 1)). Next step: diving into count.rs!

@frankmcsherry
Member

Quick diagnosis: reduce.rs doesn't have this problem, because when scheduled it drains all available batches and treats them as one. This means that on its initial scheduling, it grabs a cursor at [0], and by the next scheduling it uses the last upper of batches it saw previously. This avoids using an intermediate compacted frontier, which is what is causing the panic.
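
To illustrate the bookkeeping (a hypothetical, standalone sketch of that pattern, not the actual reduce.rs code): remember the upper of the batches already processed, and take the next trace cursor through that frontier rather than through each new batch's lower, which compaction may already have passed.

    use timely::progress::{Antichain, Timestamp};

    fn main() {
        // Start from the minimum timestamp, as count.rs/threshold.rs do.
        let mut upper_limit: Antichain<u32> = Antichain::from_elem(Timestamp::minimum());

        // Pretend these are the uppers of batch groups drained in successive schedulings.
        for batch_upper in [Antichain::from_elem(2u32), Antichain::from_elem(5), Antichain::from_elem(9)] {
            // Take this round's cursor through the *previously* recorded upper ...
            println!("cursor_through({:?})", upper_limit.elements());
            // ... and only then advance the recorded upper to this group's upper.
            upper_limit.clone_from(&batch_upper);
        }
    }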

Little bit of a rewrite to fix this up, but .. makes sense.

@frankmcsherry
Member

I've got a fix put together, but probably worth getting eyes on it when @antiguru has free cycles (e.g. not the weekend). Sorry about the crash! These methods are definitely less well attended to than others, but .. they shouldn't be crashing! :D

@frankmcsherry
Member

frankmcsherry commented Oct 26, 2024

One caveat is that count_total and threshold_total are unable to consolidate their output at the moment. They .. could. But at the moment they do not, and as a consequence, with import_frontier they'll emit various self-canceling updates. This is a consequence of how import_frontier works: the act of bringing updates to the logical compaction frontier may introduce cancelation, but the act of importing cannot modify the batches, only wrap them with logic that presents them at different times. The cancelation is more complicated for that operator to perform (though, there's another PR around that means to do this). This is .. mostly cosmetic, but I'll take a look into that PR as well!
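
If those self-canceling updates are a nuisance downstream, one user-side workaround (a sketch against the reporter's demo, assuming the collection's consolidate() operator; not something count_total does itself) is to consolidate after the count, which merges updates at identical data and time and drops anything that accumulates to zero:

    worker.dataflow::<u32, _, _>(|scope| {
        let arrange = trace.import_frontier(scope, "AdvancedImport");
        arrange
            .count_total()
            .consolidate() // cancels matching +1/-1 pairs before inspect
            .inspect(|x| println!("{:?}", x));
    });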

@nooberfsh
Contributor Author

Thanks for pointing out the existence of import_frontier; it makes much more sense to use import_frontier than import to bring an advanced trace into another scope.

I found #530 has a small problem:

        timely::execute_directly(move |worker| {
            let mut probe = ProbeHandle::new();
            let (mut input, mut trace) = worker.dataflow::<u32, _, _>(|scope| {
                let (handle, input) = scope.new_collection();
                let arrange = input.arrange_by_self();
                arrange.stream.probe_with(&mut probe);
                (handle, arrange.trace)
            });

            // introduce empty batch
            input.advance_to(input.time() + 1);
            input.flush();
            worker.step_while(|| probe.less_than(input.time()));

            // ingest some batches
            for _ in 0..10 {
                input.insert(10);
                input.advance_to(input.time() + 1);
                input.flush();
                worker.step_while(|| probe.less_than(input.time()));
            }

            // advance the trace
            trace.set_physical_compaction(AntichainRef::new(&[2]));
            trace.set_logical_compaction(AntichainRef::new(&[2]));

            worker.dataflow::<u32, _, _>(|scope| {
                let arrange = trace.import(scope);
                arrange
                    .count_total()
                    .inspect(|x| println!("{:?}", x));
            });
        });

If I introduce some empty batches before ingesting any data, the program still panics.
Removing trace.advance_upper(&mut lower_limit); makes it work as expected.

@frankmcsherry
Member

frankmcsherry commented Oct 27, 2024

Ah, right, that line shouldn't be there. The lower_limit gets advanced up above, where we copy in the previous upper_limit. Hrm. Feels bad just deleting that and announcing "tada", but it might be the right answer. =/

PR updated to reflect this. It seems like that same bug didn't make its way into threshold.rs, which maybe speaks to it being more of a glitch.
