From d3920f3060fc3745b8a50170dafb2beaa898adc2 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Tue, 22 Oct 2024 18:46:49 -0400 Subject: [PATCH] Minor: Cleaned physical-plan Comments (#13055) * Fixed documentations * small fixes --- datafusion/physical-plan/src/analyze.rs | 8 ++--- .../physical-plan/src/coalesce_batches.rs | 2 +- .../physical-plan/src/coalesce_partitions.rs | 4 +-- datafusion/physical-plan/src/common.rs | 14 ++++---- datafusion/physical-plan/src/display.rs | 1 + datafusion/physical-plan/src/empty.rs | 2 +- datafusion/physical-plan/src/explain.rs | 4 +-- datafusion/physical-plan/src/filter.rs | 18 +++++----- datafusion/physical-plan/src/insert.rs | 2 +- datafusion/physical-plan/src/limit.rs | 34 +++++++++---------- datafusion/physical-plan/src/memory.rs | 4 +-- .../physical-plan/src/placeholder_row.rs | 6 ++-- datafusion/physical-plan/src/projection.rs | 10 +++--- datafusion/physical-plan/src/stream.rs | 30 ++++++++-------- datafusion/physical-plan/src/streaming.rs | 6 ++-- datafusion/physical-plan/src/test.rs | 6 ++-- datafusion/physical-plan/src/unnest.rs | 28 +++++++-------- datafusion/physical-plan/src/values.rs | 6 ++-- datafusion/physical-plan/src/work_table.rs | 12 +++---- 19 files changed, 99 insertions(+), 98 deletions(-) diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 287446328f8d..c8b329fabdaa 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -40,9 +40,9 @@ use futures::StreamExt; /// discards the results, and then prints out an annotated plan with metrics #[derive(Debug, Clone)] pub struct AnalyzeExec { - /// control how much extra to print + /// Control how much extra to print verbose: bool, - /// if statistics should be displayed + /// If statistics should be displayed show_statistics: bool, /// The input plan (the plan being analyzed) pub(crate) input: Arc, @@ -69,12 +69,12 @@ impl AnalyzeExec { } } - /// access to verbose + /// Access to verbose pub fn verbose(&self) -> bool { self.verbose } - /// access to show_statistics + /// Access to show_statistics pub fn show_statistics(&self) -> bool { self.show_statistics } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index e1a2f32d8a38..61fb3599f013 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -52,7 +52,7 @@ use futures::stream::{Stream, StreamExt}; pub struct CoalesceBatchesExec { /// The input plan input: Arc, - /// Minimum number of rows for coalesces batches + /// Minimum number of rows for coalescing batches target_batch_size: usize, /// Maximum number of rows to fetch, `None` means fetching all rows fetch: Option, diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 2ab6e3de1add..f9d4ec6a1a34 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -236,10 +236,10 @@ mod tests { let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 2)); let refs = blocking_exec.refs(); - let coaelesce_partitions_exec = + let coalesce_partitions_exec = Arc::new(CoalescePartitionsExec::new(blocking_exec)); - let fut = collect(coaelesce_partitions_exec, task_ctx); + let fut = collect(coalesce_partitions_exec, task_ctx); let mut fut = fut.boxed(); assert_is_pending(&mut fut); diff --git a/datafusion/physical-plan/src/common.rs 
b/datafusion/physical-plan/src/common.rs index 5abdf367c571..844208999d25 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -109,7 +109,7 @@ pub(crate) fn spawn_buffered( builder.spawn(async move { while let Some(item) = input.next().await { if sender.send(item).await.is_err() { - // receiver dropped when query is shutdown early (e.g., limit) or error, + // Receiver dropped when query is shutdown early (e.g., limit) or error, // no need to return propagate the send error. return Ok(()); } @@ -182,15 +182,15 @@ pub fn compute_record_batch_statistics( /// Write in Arrow IPC format. pub struct IPCWriter { - /// path + /// Path pub path: PathBuf, - /// inner writer + /// Inner writer pub writer: FileWriter, - /// batches written + /// Batches written pub num_batches: usize, - /// rows written + /// Rows written pub num_rows: usize, - /// bytes written + /// Bytes written pub num_bytes: usize, } @@ -315,7 +315,7 @@ mod tests { ], )?; - // just select f32,f64 + // Just select f32,f64 let select_projection = Some(vec![0, 1]); let byte_size = batch .project(&select_projection.clone().unwrap()) diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 0d2653c5c775..4e936fb37a12 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -231,6 +231,7 @@ impl<'a> DisplayableExecutionPlan<'a> { } } +/// Enum representing the different levels of metrics to display #[derive(Debug, Clone, Copy)] enum ShowMetrics { /// Do not show any metrics diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 4bacea48c347..f6e0abb94fa8 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -173,7 +173,7 @@ mod tests { let empty = EmptyExec::new(Arc::clone(&schema)); assert_eq!(empty.schema(), schema); - // we should have no results + // We should have no results let iter = empty.execute(0, task_ctx)?; let batches = common::collect(iter).await?; assert!(batches.is_empty()); diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs index 56dc35e8819d..96f55a1446b0 100644 --- a/datafusion/physical-plan/src/explain.rs +++ b/datafusion/physical-plan/src/explain.rs @@ -67,7 +67,7 @@ impl ExplainExec { &self.stringified_plans } - /// access to verbose + /// Access to verbose pub fn verbose(&self) -> bool { self.verbose } @@ -112,7 +112,7 @@ impl ExecutionPlan for ExplainExec { } fn children(&self) -> Vec<&Arc> { - // this is a leaf node and has no children + // This is a leaf node and has no children vec![] } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index c39a91e251b7..30b0af19f43b 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -115,7 +115,7 @@ impl FilterExec { /// Return new instance of [FilterExec] with the given projection. 
pub fn with_projection(&self, projection: Option>) -> Result { - // check if the projection is valid + // Check if the projection is valid can_project(&self.schema(), projection.as_ref())?; let projection = match projection { @@ -157,7 +157,7 @@ impl FilterExec { self.default_selectivity } - /// projection + /// Projection pub fn projection(&self) -> Option<&Vec> { self.projection.as_ref() } @@ -255,9 +255,9 @@ impl FilterExec { let expr = Arc::new(column) as _; ConstExpr::new(expr).with_across_partitions(true) }); - // this is for statistics + // This is for statistics eq_properties = eq_properties.with_constants(constants); - // this is for logical constant (for example: a = '1', then a could be marked as a constant) + // This is for logical constant (for example: a = '1', then a could be marked as a constant) // to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0) eq_properties = eq_properties.with_constants(Self::extend_constants(input, predicate)); @@ -331,7 +331,7 @@ impl ExecutionPlan for FilterExec { } fn maintains_input_order(&self) -> Vec { - // tell optimizer this operator doesn't reorder its input + // Tell optimizer this operator doesn't reorder its input vec![true] } @@ -425,7 +425,7 @@ struct FilterExecStream { predicate: Arc, /// The input partition to filter. input: SendableRecordBatchStream, - /// runtime metrics recording + /// Runtime metrics recording baseline_metrics: BaselineMetrics, /// The projection indices of the columns in the input schema projection: Option>, @@ -449,7 +449,7 @@ fn filter_and_project( .and_then(|v| v.into_array(batch.num_rows())) .and_then(|array| { Ok(match (as_boolean_array(&array), projection) { - // apply filter array to record batch + // Apply filter array to record batch (Ok(filter_array), None) => filter_record_batch(batch, filter_array)?, (Ok(filter_array), Some(projection)) => { let projected_columns = projection @@ -490,7 +490,7 @@ impl Stream for FilterExecStream { &self.schema, )?; timer.done(); - // skip entirely filtered batches + // Skip entirely filtered batches if filtered_batch.num_rows() == 0 { continue; } @@ -507,7 +507,7 @@ impl Stream for FilterExecStream { } fn size_hint(&self) -> (usize, Option) { - // same number of record batches + // Same number of record batches self.input.size_hint() } } diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 5dc27bc239d2..dda45ebebb0c 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -271,7 +271,7 @@ fn make_count_batch(count: u64) -> RecordBatch { } fn make_count_schema() -> SchemaRef { - // define a schema. + // Define a schema. 
Arc::new(Schema::new(vec![Field::new( "count", DataType::UInt64, diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index a42e2da60587..eda75b37fe66 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -398,7 +398,7 @@ impl LimitStream { if batch.num_rows() > 0 { break poll; } else { - // continue to poll input stream + // Continue to poll input stream } } Poll::Ready(Some(Err(_e))) => break poll, @@ -408,12 +408,12 @@ impl LimitStream { } } - /// fetches from the batch + /// Fetches from the batch fn stream_limit(&mut self, batch: RecordBatch) -> Option { // records time on drop let _timer = self.baseline_metrics.elapsed_compute().timer(); if self.fetch == 0 { - self.input = None; // clear input so it can be dropped early + self.input = None; // Clear input so it can be dropped early None } else if batch.num_rows() < self.fetch { // @@ -422,7 +422,7 @@ impl LimitStream { } else if batch.num_rows() >= self.fetch { let batch_rows = self.fetch; self.fetch = 0; - self.input = None; // clear input so it can be dropped early + self.input = None; // Clear input so it can be dropped early // It is guaranteed that batch_rows is <= batch.num_rows Some(batch.slice(0, batch_rows)) @@ -453,7 +453,7 @@ impl Stream for LimitStream { other => other, }) } - // input has been cleared + // Input has been cleared None => Poll::Ready(None), }; @@ -489,17 +489,17 @@ mod tests { let num_partitions = 4; let csv = test::scan_partitioned(num_partitions); - // input should have 4 partitions + // Input should have 4 partitions assert_eq!(csv.output_partitioning().partition_count(), num_partitions); let limit = GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), 0, Some(7)); - // the result should contain 4 batches (one per input partition) + // The result should be collected from the single output partition let iter = limit.execute(0, task_ctx)?; let batches = common::collect(iter).await?; - // there should be a total of 100 rows + // There should be a total of 7 rows (fetch = 7) let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum(); assert_eq!(row_count, 7); @@ -520,7 +520,7 @@ mod tests { let index = input.index(); assert_eq!(index.value(), 0); - // limit of six needs to consume the entire first record batch + // Limit of six needs to consume the entire first record batch // (5 rows) and 1 row from the second (1 row) let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0); let limit_stream = @@ -550,7 +550,7 @@ mod tests { let index = input.index(); assert_eq!(index.value(), 0); - // limit of six needs to consume the entire first record batch + // Limit of six needs to consume the entire first record batch // (6 rows) and stop immediately let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0); let limit_stream = @@ -580,7 +580,7 @@ mod tests { let index = input.index(); assert_eq!(index.value(), 0); - // limit of six needs to consume the entire first record batch + // Limit of six needs to consume the entire first record batch // (6 rows) and stop immediately let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0); let limit_stream = @@ -598,7 +598,7 @@ mod tests { Ok(()) } - // test cases for "skip" + // Test cases for "skip" async fn skip_and_fetch(skip: usize, fetch: Option) -> Result { let task_ctx = Arc::new(TaskContext::default()); @@ -611,7 +611,7 @@ mod tests { let offset =
GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), skip, fetch); - // the result should contain 4 batches (one per input partition) + // The result should be collected from the single output partition let iter = offset.execute(0, task_ctx)?; let batches = common::collect(iter).await?; Ok(batches.iter().map(|batch| batch.num_rows()).sum()) } @@ -633,7 +633,7 @@ mod tests { #[tokio::test] async fn skip_3_fetch_none() -> Result<()> { - // there are total of 400 rows, we skipped 3 rows (offset = 3) + // There are a total of 400 rows, we skipped 3 rows (offset = 3) let row_count = skip_and_fetch(3, None).await?; assert_eq!(row_count, 397); Ok(()) } #[tokio::test] async fn skip_3_fetch_10_stats() -> Result<()> { - // there are total of 100 rows, we skipped 3 rows (offset = 3) + // There are a total of 400 rows, we skipped 3 rows (offset = 3) let row_count = skip_and_fetch(3, Some(10)).await?; assert_eq!(row_count, 10); Ok(()) } @@ -656,7 +656,7 @@ mod tests { #[tokio::test] async fn skip_400_fetch_1() -> Result<()> { - // there are a total of 400 rows + // There are a total of 400 rows let row_count = skip_and_fetch(400, Some(1)).await?; assert_eq!(row_count, 0); Ok(()) } #[tokio::test] async fn skip_401_fetch_none() -> Result<()> { - // there are total of 400 rows, we skipped 401 rows (offset = 3) + // There are a total of 400 rows, we skipped 401 rows (offset = 401) let row_count = skip_and_fetch(401, None).await?; assert_eq!(row_count, 0); Ok(()) diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 456f0ef2dcc8..52a8631d5a63 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -119,7 +119,7 @@ impl ExecutionPlan for MemoryExec { } fn children(&self) -> Vec<&Arc> { - // this is a leaf node and has no children + // This is a leaf node and has no children vec![] } @@ -179,7 +179,7 @@ impl MemoryExec { }) } - /// set `show_sizes` to determine whether to display partition sizes + /// Set `show_sizes` to determine whether to display partition sizes pub fn with_show_sizes(mut self, show_sizes: bool) -> Self { self.show_sizes = show_sizes; self diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index 272211d5056e..5d8ca7e76935 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -208,7 +208,7 @@ mod tests { let schema = test::aggr_test_schema(); let placeholder = PlaceholderRowExec::new(schema); - // ask for the wrong partition + // Ask for the wrong partition assert!(placeholder.execute(1, Arc::clone(&task_ctx)).is_err()); assert!(placeholder.execute(20, task_ctx).is_err()); Ok(()) } @@ -223,7 +223,7 @@ mod tests { let iter = placeholder.execute(0, task_ctx)?; let batches = common::collect(iter).await?; - // should have one item + // Should have one item assert_eq!(batches.len(), 1); Ok(()) } @@ -240,7 +240,7 @@ mod tests { let iter = placeholder.execute(n, Arc::clone(&task_ctx))?; let batches = common::collect(iter).await?; - // should have one item + // Should have one item assert_eq!(batches.len(), 1); } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index a28328fb5d43..936cf742a792 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -90,7 +90,7 @@ impl ProjectionExec { input_schema.metadata().clone(), )); - //
construct a map from the input expressions to the output expression of the Projection + // Construct a map from the input expressions to the output expression of the Projection let projection_mapping = ProjectionMapping::try_new(&expr, &input_schema)?; let cache = Self::compute_properties(&input, &projection_mapping, Arc::clone(&schema))?; @@ -183,7 +183,7 @@ impl ExecutionPlan for ProjectionExec { } fn maintains_input_order(&self) -> Vec { - // tell optimizer this operator doesn't reorder its input + // Tell optimizer this operator doesn't reorder its input vec![true] } @@ -240,7 +240,7 @@ impl ExecutionPlan for ProjectionExec { } } -/// If e is a direct column reference, returns the field level +/// If 'e' is a direct column reference, returns the field level /// metadata for that field, if any. Otherwise returns None pub(crate) fn get_field_metadata( e: &Arc, @@ -294,7 +294,7 @@ fn stats_projection( impl ProjectionStream { fn batch_project(&self, batch: &RecordBatch) -> Result { - // records time on drop + // Records time on drop let _timer = self.baseline_metrics.elapsed_compute().timer(); let arrays = self .expr @@ -340,7 +340,7 @@ impl Stream for ProjectionStream { } fn size_hint(&self) -> (usize, Option) { - // same number of record batches + // Same number of record batches self.input.size_hint() } } diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs index faeb4799f5af..9220646653e6 100644 --- a/datafusion/physical-plan/src/stream.rs +++ b/datafusion/physical-plan/src/stream.rs @@ -56,7 +56,7 @@ pub(crate) struct ReceiverStreamBuilder { } impl ReceiverStreamBuilder { - /// create new channels with the specified buffer size + /// Create new channels with the specified buffer size pub fn new(capacity: usize) -> Self { let (tx, rx) = tokio::sync::mpsc::channel(capacity); @@ -83,10 +83,10 @@ impl ReceiverStreamBuilder { } /// Spawn a blocking task that will be aborted if this builder (or the stream - /// built from it) are dropped + /// built from it) are dropped. /// - /// this is often used to spawn tasks that write to the sender - /// retrieved from `Self::tx` + /// This is often used to spawn tasks that write to the sender + /// retrieved from `Self::tx`. 
pub fn spawn_blocking(&mut self, f: F) where F: FnOnce() -> Result<()>, @@ -103,7 +103,7 @@ impl ReceiverStreamBuilder { mut join_set, } = self; - // don't need tx + // Doesn't need tx drop(tx); // future that checks the result of the join set, and propagates panic if seen @@ -112,7 +112,7 @@ impl ReceiverStreamBuilder { match result { Ok(task_result) => { match task_result { - // nothing to report + // Nothing to report Ok(_) => continue, // This means a blocking task error Err(error) => return Some(Err(error)), @@ -215,7 +215,7 @@ pub struct RecordBatchReceiverStreamBuilder { } impl RecordBatchReceiverStreamBuilder { - /// create new channels with the specified buffer size + /// Create new channels with the specified buffer size pub fn new(schema: SchemaRef, capacity: usize) -> Self { Self { schema, @@ -256,7 +256,7 @@ impl RecordBatchReceiverStreamBuilder { self.inner.spawn_blocking(f) } - /// runs the `partition` of the `input` ExecutionPlan on the + /// Runs the `partition` of the `input` ExecutionPlan on the /// tokio threadpool and writes its outputs to this stream /// /// If the input partition produces an error, the error will be @@ -299,7 +299,7 @@ impl RecordBatchReceiverStreamBuilder { return Ok(()); } - // stop after the first error is encontered (don't + // Stop after the first error is encountered (Don't // drive all streams to completion) if is_err { debug!( @@ -483,13 +483,13 @@ mod test { async fn record_batch_receiver_stream_propagates_panics_early_shutdown() { let schema = schema(); - // make 2 partitions, second partition panics before the first + // Make 2 partitions, second partition panics before the first let num_partitions = 2; let input = PanicExec::new(Arc::clone(&schema), num_partitions) .with_partition_panic(0, 10) .with_partition_panic(1, 3); // partition 1 should panic first (after 3 ) - // ensure that the panic results in an early shutdown (that + // Ensure that the panic results in an early shutdown (that // everything stops after the first panic). 
// Since the stream reads every other batch: (0,1,0,1,0,panic) @@ -512,10 +512,10 @@ mod test { builder.run_input(Arc::new(input), 0, Arc::clone(&task_ctx)); let stream = builder.build(); - // input should still be present + // Input should still be present assert!(std::sync::Weak::strong_count(&refs) > 0); - // drop the stream, ensure the refs go to zero + // Drop the stream, ensure the refs go to zero drop(stream); assert_strong_count_converges_to_zero(refs).await; } @@ -539,7 +539,7 @@ mod test { builder.run_input(Arc::new(error_stream), 0, Arc::clone(&task_ctx)); let mut stream = builder.build(); - // get the first result, which should be an error + // Get the first result, which should be an error let first_batch = stream.next().await.unwrap(); let first_err = first_batch.unwrap_err(); assert_eq!(first_err.strip_backtrace(), "Execution error: Test1"); @@ -570,7 +570,7 @@ mod test { } let mut stream = builder.build(); - // drain the stream until it is complete, panic'ing on error + // Drain the stream until it is complete, panic'ing on error let mut num_batches = 0; while let Some(next) = stream.next().await { next.unwrap(); diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index b02e4fb5738d..0f7c75c2c90b 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -295,7 +295,7 @@ mod test { #[tokio::test] async fn test_no_limit() { let exec = TestBuilder::new() - // make 2 batches, each with 100 rows + // Make 2 batches, each with 100 rows .with_batches(vec![make_partition(100), make_partition(100)]) .build(); @@ -306,9 +306,9 @@ mod test { #[tokio::test] async fn test_limit() { let exec = TestBuilder::new() - // make 2 batches, each with 100 rows + // Make 2 batches, each with 100 rows .with_batches(vec![make_partition(100), make_partition(100)]) - // limit to only the first 75 rows back + // Limit to only the first 75 rows back .with_limit(Some(75)) .build(); diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index 4da43b313403..90ec9b106850 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -65,7 +65,7 @@ pub fn aggr_test_schema() -> SchemaRef { Arc::new(schema) } -/// returns record batch with 3 columns of i32 in memory +/// Returns record batch with 3 columns of i32 in memory pub fn build_table_i32( a: (&str, &Vec), b: (&str, &Vec), @@ -88,7 +88,7 @@ pub fn build_table_i32( .unwrap() } -/// returns memory table scan wrapped around record batch with 3 columns of i32 +/// Returns memory table scan wrapped around record batch with 3 columns of i32 pub fn build_table_scan_i32( a: (&str, &Vec), b: (&str, &Vec), @@ -125,7 +125,7 @@ pub fn mem_exec(partitions: usize) -> MemoryExec { MemoryExec::try_new(&data, schema, projection).unwrap() } -// construct a stream partition for test purposes +// Construct a stream partition for test purposes #[derive(Debug)] pub struct TestPartitionStream { pub schema: SchemaRef, diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 2311541816f3..40ec3830ea0c 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -62,9 +62,9 @@ pub struct UnnestExec { input: Arc, /// The schema once the unnest is applied schema: SchemaRef, - /// indices of the list-typed columns in the input schema + /// Indices of the list-typed columns in the input schema list_column_indices: Vec, - /// indices of the 
struct-typed columns in the input schema + /// Indices of the struct-typed columns in the input schema struct_column_indices: Vec, /// Options options: UnnestOptions, @@ -115,12 +115,12 @@ impl UnnestExec { &self.input } - /// indices of the list-typed columns in the input schema + /// Indices of the list-typed columns in the input schema pub fn list_column_indices(&self) -> &[ListUnnest] { &self.list_column_indices } - /// indices of the struct-typed columns in the input schema + /// Indices of the struct-typed columns in the input schema pub fn struct_column_indices(&self) -> &[usize] { &self.struct_column_indices } @@ -203,7 +203,7 @@ impl ExecutionPlan for UnnestExec { #[derive(Clone, Debug)] struct UnnestMetrics { - /// total time for column unnesting + /// Total time for column unnesting elapsed_compute: metrics::Time, /// Number of batches consumed input_batches: metrics::Count, @@ -411,7 +411,7 @@ fn list_unnest_at_level( level_to_unnest: usize, options: &UnnestOptions, ) -> Result<(Vec, usize)> { - // extract unnestable columns at this level + // Extract unnestable columns at this level let (arrs_to_unnest, list_unnest_specs): (Vec>, Vec<_>) = list_type_unnests .iter() @@ -422,7 +422,7 @@ fn list_unnest_at_level( *unnesting, )); } - // this means the unnesting on this item has started at higher level + // This means the unnesting on this item has started at higher level // and need to continue until depth reaches 1 if level_to_unnest < unnesting.depth { return Some(( @@ -434,7 +434,7 @@ fn list_unnest_at_level( }) .unzip(); - // filter out so that list_arrays only contain column with the highest depth + // Filter out so that list_arrays only contain column with the highest depth // at the same time, during iteration remove this depth so next time we don't have to unnest them again let longest_length = find_longest_length(&arrs_to_unnest, options)?; let unnested_length = longest_length.as_primitive::(); @@ -456,7 +456,7 @@ fn list_unnest_at_level( // Create the take indices array for other columns let take_indices = create_take_indicies(unnested_length, total_length); - // dimension of arrays in batch is untouch, but the values are repeated + // Dimension of arrays in batch is untouched, but the values are repeated // as the side effect of unnesting let ret = repeat_arrs_from_indices(batch, &take_indices)?; unnested_temp_arrays @@ -548,8 +548,8 @@ fn build_batch( // This arr always has the same column count with the input batch let mut flatten_arrs = vec![]; - // original batch has the same columns - // all unnesting results are written to temp_batch + // Original batch has the same columns + // All unnesting results are written to temp_batch for depth in (1..=max_recursion).rev() { let input = match depth == max_recursion { true => batch.columns(), @@ -593,11 +593,11 @@ fn build_batch( .map(|(order, unnest_def)| (*unnest_def, order)) .collect(); - // one original column may be unnested multiple times into separate columns + // One original column may be unnested multiple times into separate columns let mut multi_unnested_per_original_index = unnested_array_map .into_iter() .map( - // each item in unnested_columns is the result of unnesting the same input column + // Each item in unnested_columns is the result of unnesting the same input column // we need to sort them to conform with the original expression order // e.g unnest(unnest(col)) must goes before unnest(col) |(original_index, mut unnested_columns)| { @@ -636,7 +636,7 @@ fn build_batch( .into_iter() .enumerate() 
.flat_map(|(col_idx, arr)| { - // convert original column into its unnested version(s) + // Convert original column into its unnested version(s) // Plural because one column can be unnested with different recursion level // and into separate output columns match multi_unnested_per_original_index.remove(&col_idx) { diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index ab5b45463b0c..991146d245a7 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -47,7 +47,7 @@ pub struct ValuesExec { } impl ValuesExec { - /// create a new values exec from data as expr + /// Create a new values exec from data as expr pub fn try_new( schema: SchemaRef, data: Vec>>, @@ -57,7 +57,7 @@ impl ValuesExec { } let n_row = data.len(); let n_col = schema.fields().len(); - // we have this single row batch as a placeholder to satisfy evaluation argument + // We have this single row batch as a placeholder to satisfy evaluation argument // and generate a single output row let batch = RecordBatch::try_new_with_options( Arc::new(Schema::empty()), @@ -126,7 +126,7 @@ impl ValuesExec { }) } - /// provides the data + /// Provides the data pub fn data(&self) -> Vec { self.data.clone() } diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index ba95640a87c7..61d444171cc7 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -225,31 +225,31 @@ mod tests { #[test] fn test_work_table() { let work_table = WorkTable::new(); - // can't take from empty work_table + // Can't take from empty work_table assert!(work_table.take().is_err()); let pool = Arc::new(UnboundedMemoryPool::default()) as _; let mut reservation = MemoryConsumer::new("test_work_table").register(&pool); - // update batch to work_table + // Update batch to work_table let array: ArrayRef = Arc::new((0..5).collect::()); let batch = RecordBatch::try_from_iter(vec![("col", array)]).unwrap(); reservation.try_grow(100).unwrap(); work_table.update(ReservedBatches::new(vec![batch.clone()], reservation)); - // take from work_table + // Take from work_table let reserved_batches = work_table.take().unwrap(); assert_eq!(reserved_batches.batches, vec![batch.clone()]); - // consume the batch by the MemoryStream + // Consume the batch by the MemoryStream let memory_stream = MemoryStream::try_new(reserved_batches.batches, batch.schema(), None) .unwrap() .with_reservation(reserved_batches.reservation); - // should still be reserved + // Should still be reserved assert_eq!(pool.reserved(), 100); - // the reservation should be freed after drop the memory_stream + // The reservation should be freed after drop the memory_stream drop(memory_stream); assert_eq!(pool.reserved(), 0); }