diff --git a/crates/polars-io/src/parquet/write/batched_writer.rs b/crates/polars-io/src/parquet/write/batched_writer.rs
index 86b95bc36f85..4cef9041986d 100644
--- a/crates/polars-io/src/parquet/write/batched_writer.rs
+++ b/crates/polars-io/src/parquet/write/batched_writer.rs
@@ -4,7 +4,7 @@ use std::sync::Mutex;
 use arrow::record_batch::RecordBatch;
 use polars_core::prelude::*;
 use polars_core::POOL;
-use polars_parquet::read::ParquetError;
+use polars_parquet::read::{fallible_streaming_iterator, ParquetError};
 use polars_parquet::write::{
     array_to_columns, CompressedPage, Compressor, DynIter, DynStreamingIterator, Encoding,
     FallibleStreamingIterator, FileWriter, Page, ParquetType, RowGroupIterColumns,
@@ -15,7 +15,10 @@ use rayon::prelude::*;
 pub struct BatchedWriter<W: Write> {
     // A mutex so that streaming engine can get concurrent read access to
     // compress pages.
+    //
+    // @TODO: Remove mutex when old streaming engine is removed
     pub(super) writer: Mutex<FileWriter<W>>,
+    // @TODO: Remove when old streaming engine is removed
     pub(super) parquet_schema: SchemaDescriptor,
     pub(super) encodings: Vec<Vec<Encoding>>,
     pub(super) options: WriteOptions,
@@ -23,6 +26,21 @@
 }
 
 impl<W: Write> BatchedWriter<W> {
+    pub fn new(
+        writer: Mutex<FileWriter<W>>,
+        encodings: Vec<Vec<Encoding>>,
+        options: WriteOptions,
+        parallel: bool,
+    ) -> Self {
+        Self {
+            writer,
+            parquet_schema: SchemaDescriptor::new(PlSmallStr::EMPTY, vec![]),
+            encodings,
+            options,
+            parallel,
+        }
+    }
+
     pub fn encode_and_compress<'a>(
         &'a self,
         df: &'a DataFrame,
@@ -63,6 +81,22 @@ impl<W: Write> BatchedWriter<W> {
         Ok(())
     }
 
+    pub fn parquet_schema(&mut self) -> &SchemaDescriptor {
+        let writer = self.writer.get_mut().unwrap();
+        writer.parquet_schema()
+    }
+
+    pub fn write_row_group(&mut self, rg: &[Vec<CompressedPage>]) -> PolarsResult<()> {
+        let writer = self.writer.get_mut().unwrap();
+        let rg = DynIter::new(rg.iter().map(|col_pages| {
+            Ok(DynStreamingIterator::new(
+                fallible_streaming_iterator::convert(col_pages.iter().map(PolarsResult::Ok)),
+            ))
+        }));
+        writer.write(rg)?;
+        Ok(())
+    }
+
     pub fn get_writer(&self) -> &Mutex<FileWriter<W>> {
         &self.writer
     }
diff --git a/crates/polars-io/src/parquet/write/mod.rs b/crates/polars-io/src/parquet/write/mod.rs
index 705cf2a96d6a..ffbbbfc7a5c0 100644
--- a/crates/polars-io/src/parquet/write/mod.rs
+++ b/crates/polars-io/src/parquet/write/mod.rs
@@ -7,4 +7,4 @@ mod writer;
 pub use batched_writer::BatchedWriter;
 pub use options::{BrotliLevel, GzipLevel, ParquetCompression, ParquetWriteOptions, ZstdLevel};
 pub use polars_parquet::write::{RowGroupIterColumns, StatisticsOptions};
-pub use writer::ParquetWriter;
+pub use writer::{get_encodings, ParquetWriter};
diff --git a/crates/polars-io/src/parquet/write/writer.rs b/crates/polars-io/src/parquet/write/writer.rs
index 5d7d2ddafdb6..34e2dc0bf3aa 100644
--- a/crates/polars-io/src/parquet/write/writer.rs
+++ b/crates/polars-io/src/parquet/write/writer.rs
@@ -130,7 +130,7 @@ where
     }
 }
 
-fn get_encodings(schema: &ArrowSchema) -> Vec<Vec<Encoding>> {
+pub fn get_encodings(schema: &ArrowSchema) -> Vec<Vec<Encoding>> {
     schema
         .iter_values()
         .map(|f| transverse(&f.dtype, encoding_map))
diff --git a/crates/polars-io/src/shared.rs b/crates/polars-io/src/shared.rs
index fe29f12feb83..bfed52e02615 100644
--- a/crates/polars-io/src/shared.rs
+++ b/crates/polars-io/src/shared.rs
@@ -122,7 +122,7 @@ pub(crate) fn finish_reader(
     Ok(df)
 }
 
-pub(crate) fn schema_to_arrow_checked(
+pub fn schema_to_arrow_checked(
     schema: &Schema,
     compat_level: CompatLevel,
     _file_name: &str,
diff --git a/crates/polars-parquet/src/arrow/write/file.rs b/crates/polars-parquet/src/arrow/write/file.rs
index 0fd32deb5b07..4139dcdea93c 100644
--- a/crates/polars-parquet/src/arrow/write/file.rs
+++ b/crates/polars-parquet/src/arrow/write/file.rs
@@ -50,12 +50,15 @@ impl<W: Write> FileWriter<W> {
     /// Returns a new [`FileWriter`].
     /// # Error
     /// If it is unable to derive a parquet schema from [`ArrowSchema`].
-    pub fn try_new(writer: W, schema: ArrowSchema, options: WriteOptions) -> PolarsResult<Self> {
-        let parquet_schema = to_parquet_schema(&schema)?;
-
+    pub fn new_with_parquet_schema(
+        writer: W,
+        schema: ArrowSchema,
+        parquet_schema: SchemaDescriptor,
+        options: WriteOptions,
+    ) -> Self {
         let created_by = Some("Polars".to_string());
 
-        Ok(Self {
+        Self {
             writer: crate::parquet::write::FileWriter::new(
                 writer,
                 parquet_schema,
@@ -67,7 +70,20 @@
             ),
             schema,
             options,
-        })
+        }
+    }
+
+    /// Returns a new [`FileWriter`].
+    /// # Error
+    /// If it is unable to derive a parquet schema from [`ArrowSchema`].
+    pub fn try_new(writer: W, schema: ArrowSchema, options: WriteOptions) -> PolarsResult<Self> {
+        let parquet_schema = to_parquet_schema(&schema)?;
+        Ok(Self::new_with_parquet_schema(
+            writer,
+            schema,
+            parquet_schema,
+            options,
+        ))
     }
 
     /// Writes a row group to the file.
diff --git a/crates/polars-parquet/src/parquet/metadata/schema_descriptor.rs b/crates/polars-parquet/src/parquet/metadata/schema_descriptor.rs
index c40fcdd1309b..2306638cdce9 100644
--- a/crates/polars-parquet/src/parquet/metadata/schema_descriptor.rs
+++ b/crates/polars-parquet/src/parquet/metadata/schema_descriptor.rs
@@ -57,6 +57,11 @@ impl SchemaDescriptor {
         &self.fields
     }
 
+    /// The schema's leaves.
+    pub fn leaves(&self) -> &[ColumnDescriptor] {
+        &self.leaves
+    }
+
     pub(crate) fn into_thrift(self) -> Vec<ParquetType> {
         ParquetType::GroupType {
             field_info: FieldInfo {
diff --git a/crates/polars-parquet/src/parquet/write/compression.rs b/crates/polars-parquet/src/parquet/write/compression.rs
index 04d01a6e34bc..cfd721a6f07d 100644
--- a/crates/polars-parquet/src/parquet/write/compression.rs
+++ b/crates/polars-parquet/src/parquet/write/compression.rs
@@ -163,3 +163,24 @@ impl<I: Iterator<Item = ParquetResult<Page>>> FallibleStreamingIterator for Comp
         self.current.as_ref()
     }
 }
+
+impl<I: Iterator<Item = ParquetResult<Page>>> Iterator for Compressor<I> {
+    type Item = ParquetResult<CompressedPage>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut compressed_buffer = if let Some(page) = self.current.as_mut() {
+            std::mem::take(page.buffer_mut())
+        } else {
+            std::mem::take(&mut self.buffer)
+        };
+        compressed_buffer.clear();
+
+        let page = self.iter.next()?;
+        let page = match page {
+            Ok(page) => page,
+            Err(err) => return Some(Err(err)),
+        };
+
+        Some(compress(page, compressed_buffer, self.compression))
+    }
+}
diff --git a/crates/polars-stream/src/nodes/io_sinks/mod.rs b/crates/polars-stream/src/nodes/io_sinks/mod.rs
index cc1682199a2a..65eb98c04035 100644
--- a/crates/polars-stream/src/nodes/io_sinks/mod.rs
+++ b/crates/polars-stream/src/nodes/io_sinks/mod.rs
@@ -1,2 +1,4 @@
 #[cfg(feature = "ipc")]
 pub mod ipc;
+#[cfg(feature = "parquet")]
+pub mod parquet;
diff --git a/crates/polars-stream/src/nodes/io_sinks/parquet.rs b/crates/polars-stream/src/nodes/io_sinks/parquet.rs
new file mode 100644
index 000000000000..16ac920f430c
--- /dev/null
+++ b/crates/polars-stream/src/nodes/io_sinks/parquet.rs
@@ -0,0 +1,284 @@
+use std::cmp::Reverse;
+use std::io::BufWriter;
+use std::path::{Path, PathBuf};
+use std::sync::Mutex;
+
+use polars_core::frame::DataFrame;
+use polars_core::prelude::{ArrowSchema, CompatLevel};
+use polars_core::schema::SchemaRef;
+use polars_error::PolarsResult;
+use polars_expr::state::ExecutionState;
+use polars_io::parquet::write::BatchedWriter;
+use polars_io::prelude::{get_encodings, ParquetWriteOptions};
+use polars_io::schema_to_arrow_checked;
+use polars_parquet::parquet::error::ParquetResult;
+use polars_parquet::read::ParquetError;
+use polars_parquet::write::{
+    array_to_columns, to_parquet_schema, CompressedPage, Compressor, Encoding, FileWriter,
+    SchemaDescriptor, Version, WriteOptions,
+};
+use polars_utils::priority::Priority;
+
+use crate::async_primitives::distributor_channel::distributor_channel;
+use crate::async_primitives::linearizer::Linearizer;
+use crate::nodes::{ComputeNode, JoinHandle, PortState, TaskPriority, TaskScope};
+use crate::pipe::{RecvPort, SendPort};
+use crate::{DEFAULT_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_LINEARIZER_BUFFER_SIZE};
+
+pub struct ParquetSinkNode {
+    path: PathBuf,
+
+    input_schema: SchemaRef,
+    write_options: ParquetWriteOptions,
+
+    parquet_schema: SchemaDescriptor,
+    arrow_schema: ArrowSchema,
+    encodings: Vec<Vec<Encoding>>,
+
+    num_encoders: usize,
+}
+
+impl ParquetSinkNode {
+    pub fn new(
+        input_schema: SchemaRef,
+        path: &Path,
+        write_options: &ParquetWriteOptions,
+    ) -> PolarsResult<Self> {
+        let schema = schema_to_arrow_checked(&input_schema, CompatLevel::newest(), "parquet")?;
+        let parquet_schema = to_parquet_schema(&schema)?;
+        let encodings: Vec<Vec<Encoding>> = get_encodings(&schema);
+
+        Ok(Self {
+            path: path.to_path_buf(),
+
+            input_schema,
+            write_options: *write_options,
+
+            parquet_schema,
+            arrow_schema: schema,
+            encodings,
+
+            num_encoders: 1,
+        })
+    }
+}
+
+// 512 ^ 2
+const DEFAULT_ROW_GROUP_SIZE: usize = 1 << 18;
+
+impl ComputeNode for ParquetSinkNode {
+    fn name(&self) -> &str {
+        "parquet_sink"
+    }
+
+    fn initialize(&mut self, num_pipelines: usize) {
+        self.num_encoders = num_pipelines;
+    }
+
+    fn update_state(&mut self, recv: &mut [PortState], send: &mut [PortState]) -> PolarsResult<()> {
+        assert!(send.is_empty());
+        assert!(recv.len() == 1);
+
+        // We are always ready to receive, unless the sender is done, in which
+        // case we are also done.
+        if recv[0] != PortState::Done {
+            recv[0] = PortState::Ready;
+        }
+
+        Ok(())
+    }
+
+    fn spawn<'env, 's>(
+        &'env mut self,
+        scope: &'s TaskScope<'s, 'env>,
+        recv_ports: &mut [Option<RecvPort<'_>>],
+        send_ports: &mut [Option<SendPort<'_>>],
+        _state: &'s ExecutionState,
+        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
+    ) {
+        assert!(recv_ports.len() == 1);
+        assert!(send_ports.is_empty());
+
+        // .. -> Buffer task
+        let mut receiver = recv_ports[0].take().unwrap().serial();
+        // Buffer task -> Encode tasks
+        let (mut distribute, distribute_channels) =
+            distributor_channel(self.num_encoders, DEFAULT_DISTRIBUTOR_BUFFER_SIZE);
+        // Encode tasks -> IO task
+        let (mut linearizer, senders) = Linearizer::<
+            Priority<Reverse<(usize, usize)>, Vec<Vec<CompressedPage>>>,
+        >::new(
+            self.num_encoders, DEFAULT_LINEARIZER_BUFFER_SIZE
+        );
+
+        let slf = &*self;
+
+        let options = WriteOptions {
+            statistics: slf.write_options.statistics,
+            compression: slf.write_options.compression.into(),
+            version: Version::V1,
+            data_page_size: slf.write_options.data_page_size,
+        };
+
+        // Buffer task.
+        //
+        // This task linearizes and buffers morsels until a given maximum chunk size is reached
+        // and then sends the whole record batch to be encoded and written.
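+        //
+        // Rows are accumulated in `buffer` until at least `row_group_size` rows are available
+        // (or the input is exhausted). The buffer is then split into row groups, and every
+        // column of a row group is sent to an encoder task together with its
+        // `(row_group_index, column_idx)` key so that the results can be re-ordered later.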
+        join_handles.push(scope.spawn_task(TaskPriority::High, async move {
+            let mut buffer = DataFrame::empty_with_schema(slf.input_schema.as_ref());
+            let row_group_size = slf
+                .write_options
+                .row_group_size
+                .unwrap_or(DEFAULT_ROW_GROUP_SIZE)
+                .max(1);
+            let mut stop_requested = false;
+            let mut row_group_index = 0;
+
+            loop {
+                match receiver.recv().await {
+                    Err(_) => stop_requested = true,
+                    Ok(morsel) => {
+                        let df = morsel.into_df();
+
+                        // @NOTE: This also performs schema validation.
+                        buffer.vstack_mut(&df)?;
+                    },
+                }
+
+                while (stop_requested && buffer.height() > 0) || buffer.height() >= row_group_size {
+                    let row_group;
+
+                    (row_group, buffer) =
+                        buffer.split_at(row_group_size.min(buffer.height()) as i64);
+
+                    for (column_idx, column) in row_group.take_columns().into_iter().enumerate() {
+                        distribute
+                            .send((row_group_index, column_idx, column))
+                            .await
+                            .unwrap();
+                    }
+
+                    row_group_index += 1;
+                }
+
+                if stop_requested {
+                    break;
+                }
+            }
+
+            PolarsResult::Ok(())
+        }));
+
+        // Encode task.
+        //
+        // This task encodes the columns into their corresponding Parquet encoding.
+        for (mut receiver, mut sender) in distribute_channels.into_iter().zip(senders) {
+            join_handles.push(scope.spawn_task(TaskPriority::High, async move {
+                while let Ok((rg_idx, col_idx, column)) = receiver.recv().await {
+                    let type_ = &slf.parquet_schema.fields()[col_idx];
+                    let encodings = &slf.encodings[col_idx];
+
+                    let array = column.as_materialized_series().rechunk();
+                    let array = array.to_arrow(0, CompatLevel::newest());
+
+                    // @TODO: This causes all struct fields to be handled on a single thread. It
+                    // would be preferable to split the encoding among multiple threads.
+
+                    // @NOTE: Since one Polars column might contain multiple Parquet columns (when
+                    // it has a struct datatype), we return a Vec<Vec<CompressedPage>>.
+
+                    // Array -> Parquet pages.
+                    let encoded_columns =
+                        array_to_columns(array, type_.clone(), options, encodings)?;
+
+                    // Compress the pages.
+                    let compressed_pages = encoded_columns
+                        .into_iter()
+                        .map(|encoded_pages| {
+                            Compressor::new_from_vec(
+                                encoded_pages.map(|result| {
+                                    result.map_err(|e| {
+                                        ParquetError::FeatureNotSupported(format!(
+                                            "reraised in polars: {e}",
+                                        ))
+                                    })
+                                }),
+                                options.compression,
+                                vec![],
+                            )
+                            .collect::<ParquetResult<Vec<_>>>()
+                        })
+                        .collect::<ParquetResult<Vec<_>>>()?;
+
+                    sender
+                        .insert(Priority(Reverse((rg_idx, col_idx)), compressed_pages))
+                        .await
+                        .unwrap();
+                }
+
+                PolarsResult::Ok(())
+            }));
+        }
+
+        // IO task.
+        //
+        // Task that does the actual writing to the target file.
+        let io_runtime = polars_io::pl_async::get_runtime();
+
+        let path = slf.path.clone();
+        let input_schema = slf.input_schema.clone();
+        let arrow_schema = slf.arrow_schema.clone();
+        let parquet_schema = slf.parquet_schema.clone();
+        let encodings = slf.encodings.clone();
+
+        let io_task = io_runtime.spawn(async move {
+            use tokio::fs::OpenOptions;
+
+            let file = OpenOptions::new()
+                .write(true)
+                .create(true)
+                .truncate(true)
+                .open(path.as_path())
+                .await
+                .map_err(|err| polars_utils::_limit_path_len_io_err(path.as_path(), err))?;
+            let writer = BufWriter::new(file.into_std().await);
+            let mut writer = BatchedWriter::new(
+                Mutex::new(FileWriter::new_with_parquet_schema(
+                    writer,
+                    arrow_schema,
+                    parquet_schema,
+                    options,
+                )),
+                encodings,
+                options,
+                false,
+            );
+
+            let num_parquet_columns = writer.parquet_schema().leaves().len();
+            let mut current_row_group = Vec::with_capacity(num_parquet_columns);
+
+            // Linearize from all the Encoder tasks.
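+            //
+            // The encoders tag each payload with `Reverse((row_group_index, column_idx))`, so
+            // the IO task can assume that compressed columns arrive in row-group order and, per
+            // row group, in column order. A row group is complete once `num_parquet_columns`
+            // leaf columns have been collected.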
+            while let Some(Priority(Reverse((_, col_idx)), compressed_pages)) =
+                linearizer.get().await
+            {
+                assert!(col_idx < input_schema.len());
+                current_row_group.extend(compressed_pages);
+
+                // Only write the row group to the file once its last column has arrived.
+                if current_row_group.len() < num_parquet_columns {
+                    continue;
+                }
+
+                assert_eq!(current_row_group.len(), num_parquet_columns);
+                writer.write_row_group(&current_row_group)?;
+                current_row_group.clear();
+            }
+
+            writer.finish()?;
+
+            PolarsResult::Ok(())
+        });
+        join_handles
+            .push(scope.spawn_task(TaskPriority::Low, async move { io_task.await.unwrap() }));
+    }
+}
diff --git a/crates/polars-stream/src/physical_plan/lower_ir.rs b/crates/polars-stream/src/physical_plan/lower_ir.rs
index 1c79abd29670..66dae935db46 100644
--- a/crates/polars-stream/src/physical_plan/lower_ir.rs
+++ b/crates/polars-stream/src/physical_plan/lower_ir.rs
@@ -227,6 +227,15 @@ pub fn lower_ir(
                         input: phys_input,
                     }
                 },
+                #[cfg(feature = "parquet")]
+                FileType::Parquet(_) => {
+                    let phys_input = lower_ir!(*input)?;
+                    PhysNodeKind::FileSink {
+                        path,
+                        file_type,
+                        input: phys_input,
+                    }
+                },
                 _ => todo!(),
             }
         },
diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs
index c9182a3eb964..02d66968c0b7 100644
--- a/crates/polars-stream/src/physical_plan/to_graph.rs
+++ b/crates/polars-stream/src/physical_plan/to_graph.rs
@@ -221,6 +221,15 @@ fn to_graph_rec<'a>(
                     nodes::io_sinks::ipc::IpcSinkNode::new(input_schema, path, ipc_writer_options)?,
                     [(input_key, input.port)],
                 ),
+                #[cfg(feature = "parquet")]
+                FileType::Parquet(parquet_writer_options) => ctx.graph.add_node(
+                    nodes::io_sinks::parquet::ParquetSinkNode::new(
+                        input_schema,
+                        path,
+                        parquet_writer_options,
+                    )?,
+                    [(input_key, input.port)],
+                ),
                 _ => todo!(),
             }
         },
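Review note (not part of the patch): the sketch below shows how the APIs added above compose into a single-threaded version of the sink's encode-and-write path, assuming a caller-supplied `DataFrame`, `Schema`, open `File`, and `WriteOptions`. The function name `write_single_row_group` is hypothetical; the error conversion mirrors the `map_err` used by the encode task.

use std::fs::File;
use std::sync::Mutex;

use polars_core::prelude::*;
use polars_io::parquet::write::BatchedWriter;
use polars_io::prelude::get_encodings;
use polars_io::schema_to_arrow_checked;
use polars_parquet::parquet::error::ParquetResult;
use polars_parquet::read::ParquetError;
use polars_parquet::write::{
    array_to_columns, to_parquet_schema, CompressedPage, Compressor, FileWriter, WriteOptions,
};

fn write_single_row_group(
    df: &DataFrame,
    schema: &Schema,
    file: File,
    options: WriteOptions,
) -> PolarsResult<()> {
    // Derive the arrow schema, parquet schema and per-column encodings once.
    let arrow_schema = schema_to_arrow_checked(schema, CompatLevel::newest(), "parquet")?;
    let parquet_schema = to_parquet_schema(&arrow_schema)?;
    let encodings = get_encodings(&arrow_schema);

    // Reuse the already derived parquet schema instead of deriving it again.
    let mut writer = BatchedWriter::new(
        Mutex::new(FileWriter::new_with_parquet_schema(
            file,
            arrow_schema,
            parquet_schema.clone(),
            options,
        )),
        encodings.clone(),
        options,
        false,
    );

    // Encode and compress every column; one Polars column may expand into
    // several parquet leaf columns, hence the nested Vec.
    let mut row_group: Vec<Vec<CompressedPage>> = Vec::new();
    for (col_idx, column) in df.get_columns().iter().enumerate() {
        let array = column
            .as_materialized_series()
            .rechunk()
            .to_arrow(0, CompatLevel::newest());
        let encoded_columns = array_to_columns(
            array,
            parquet_schema.fields()[col_idx].clone(),
            options,
            &encodings[col_idx],
        )?;
        for encoded_pages in encoded_columns {
            // The new `Iterator` impl on `Compressor` lets the compressed pages
            // be collected directly into a Vec.
            let pages = Compressor::new_from_vec(
                encoded_pages.map(|result| {
                    result.map_err(|e| {
                        ParquetError::FeatureNotSupported(format!("reraised in polars: {e}"))
                    })
                }),
                options.compression,
                vec![],
            )
            .collect::<ParquetResult<Vec<_>>>()?;
            row_group.push(pages);
        }
    }

    writer.write_row_group(&row_group)?;
    writer.finish()?;
    Ok(())
}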