diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index d0aa14cae..23b24108e 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -78,13 +78,16 @@ where // to observe the frontier and to drive scheduling. input2.for_each(|_, _| { }); - if let Some(ref mut trace) = propose_trace { + // Frontier for receiving the upper bound of `arrangement`. + let mut input2_trace_frontier = Antichain::new(); + if let Some(ref mut trace) = propose_trace { + trace.read_upper(&mut input2_trace_frontier); for (capability, prefixes) in stash.iter_mut() { // defer requests at incomplete times. // NOTE: not all updates may be at complete times, but if this test fails then none of them are. - if !input2.frontier.less_equal(capability.time()) { + if !input2_trace_frontier.less_equal(capability.time()) { let mut session = output.session(capability); @@ -98,7 +101,7 @@ where let (mut cursor, storage) = trace.cursor(); for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() { - if !input2.frontier.less_equal(time) { + if !input2_trace_frontier.less_equal(time) { logic2(prefix, &mut key1); cursor.seek_key(&storage, &key1); if cursor.get_key(&storage) == Some(&key1) {