Skip to content

Commit

Permalink
feat: Add Parquet Sink to new streaming engine (#20690)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Jan 15, 2025
1 parent c3a7460 commit ab8060f
Show file tree
Hide file tree
Showing 11 changed files with 389 additions and 9 deletions.
36 changes: 35 additions & 1 deletion crates/polars-io/src/parquet/write/batched_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Mutex;
use arrow::record_batch::RecordBatch;
use polars_core::prelude::*;
use polars_core::POOL;
use polars_parquet::read::ParquetError;
use polars_parquet::read::{fallible_streaming_iterator, ParquetError};
use polars_parquet::write::{
array_to_columns, CompressedPage, Compressor, DynIter, DynStreamingIterator, Encoding,
FallibleStreamingIterator, FileWriter, Page, ParquetType, RowGroupIterColumns,
Expand All @@ -15,14 +15,32 @@ use rayon::prelude::*;
pub struct BatchedWriter<W: Write> {
// A mutex so that streaming engine can get concurrent read access to
// compress pages.
//
// @TODO: Remove mutex when old streaming engine is removed
pub(super) writer: Mutex<FileWriter<W>>,
// @TODO: Remove when old streaming engine is removed
pub(super) parquet_schema: SchemaDescriptor,
pub(super) encodings: Vec<Vec<Encoding>>,
pub(super) options: WriteOptions,
pub(super) parallel: bool,
}

impl<W: Write> BatchedWriter<W> {
/// Creates a [`BatchedWriter`] around an already-constructed, mutex-guarded
/// [`FileWriter`].
///
/// The `parquet_schema` field is filled with an empty placeholder descriptor;
/// the authoritative schema lives inside the wrapped writer (see
/// `parquet_schema(&mut self)`).
pub fn new(
    writer: Mutex<FileWriter<W>>,
    encodings: Vec<Vec<Encoding>>,
    options: WriteOptions,
    parallel: bool,
) -> Self {
    // Placeholder only — kept for compatibility with the old streaming engine.
    let empty_schema = SchemaDescriptor::new(PlSmallStr::EMPTY, vec![]);
    Self {
        writer,
        parquet_schema: empty_schema,
        encodings,
        options,
        parallel,
    }
}

pub fn encode_and_compress<'a>(
&'a self,
df: &'a DataFrame,
Expand Down Expand Up @@ -63,6 +81,22 @@ impl<W: Write> BatchedWriter<W> {
Ok(())
}

/// The parquet schema of the file being written, as reported by the inner
/// [`FileWriter`].
///
/// Takes `&mut self` so the internal mutex can be bypassed via `get_mut`
/// (no locking needed with exclusive access).
pub fn parquet_schema(&mut self) -> &SchemaDescriptor {
    self.writer.get_mut().unwrap().parquet_schema()
}

/// Writes one row group of pre-compressed pages (one `Vec<CompressedPage>`
/// per column) to the underlying [`FileWriter`].
pub fn write_row_group(&mut self, rg: &[Vec<CompressedPage>]) -> PolarsResult<()> {
    // Wrap each column's pages in the streaming-iterator adapters expected
    // by `FileWriter::write`.
    let columns = rg.iter().map(|col_pages| {
        let pages = col_pages.iter().map(PolarsResult::Ok);
        PolarsResult::Ok(DynStreamingIterator::new(
            fallible_streaming_iterator::convert(pages),
        ))
    });
    self.writer.get_mut().unwrap().write(DynIter::new(columns))?;
    Ok(())
}

/// Shared access to the inner [`FileWriter`]; the mutex allows the
/// streaming engine to compress pages concurrently (see struct docs).
pub fn get_writer(&self) -> &Mutex<FileWriter<W>> {
    &self.writer
}
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ mod writer;
pub use batched_writer::BatchedWriter;
pub use options::{BrotliLevel, GzipLevel, ParquetCompression, ParquetWriteOptions, ZstdLevel};
pub use polars_parquet::write::{RowGroupIterColumns, StatisticsOptions};
pub use writer::ParquetWriter;
pub use writer::{get_encodings, ParquetWriter};
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ where
}
}

fn get_encodings(schema: &ArrowSchema) -> Vec<Vec<Encoding>> {
pub fn get_encodings(schema: &ArrowSchema) -> Vec<Vec<Encoding>> {
schema
.iter_values()
.map(|f| transverse(&f.dtype, encoding_map))
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub(crate) fn finish_reader<R: ArrowReader>(
Ok(df)
}

pub(crate) fn schema_to_arrow_checked(
pub fn schema_to_arrow_checked(
schema: &Schema,
compat_level: CompatLevel,
_file_name: &str,
Expand Down
26 changes: 21 additions & 5 deletions crates/polars-parquet/src/arrow/write/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,15 @@ impl<W: Write> FileWriter<W> {
/// Returns a new [`FileWriter`].
/// # Error
/// If it is unable to derive a parquet schema from [`ArrowSchema`].
pub fn try_new(writer: W, schema: ArrowSchema, options: WriteOptions) -> PolarsResult<Self> {
let parquet_schema = to_parquet_schema(&schema)?;

pub fn new_with_parquet_schema(
writer: W,
schema: ArrowSchema,
parquet_schema: SchemaDescriptor,
options: WriteOptions,
) -> Self {
let created_by = Some("Polars".to_string());

Ok(Self {
Self {
writer: crate::parquet::write::FileWriter::new(
writer,
parquet_schema,
Expand All @@ -67,7 +70,20 @@ impl<W: Write> FileWriter<W> {
),
schema,
options,
})
}
}

/// Returns a new [`FileWriter`], deriving the parquet schema from `schema`.
///
/// # Error
/// If it is unable to derive a parquet schema from [`ArrowSchema`].
pub fn try_new(writer: W, schema: ArrowSchema, options: WriteOptions) -> PolarsResult<Self> {
    let parquet_schema = to_parquet_schema(&schema)?;
    Ok(Self::new_with_parquet_schema(writer, schema, parquet_schema, options))
}

/// Writes a row group to the file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ impl SchemaDescriptor {
&self.fields
}

/// The schema's leaf (primitive) column descriptors.
pub fn leaves(&self) -> &[ColumnDescriptor] {
    &self.leaves
}

pub(crate) fn into_thrift(self) -> Vec<SchemaElement> {
ParquetType::GroupType {
field_info: FieldInfo {
Expand Down
21 changes: 21 additions & 0 deletions crates/polars-parquet/src/parquet/write/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,24 @@ impl<I: Iterator<Item = ParquetResult<Page>>> FallibleStreamingIterator for Comp
self.current.as_ref()
}
}

/// Adapter so a [`Compressor`] can also be consumed as a plain [`Iterator`]
/// yielding owned [`CompressedPage`]s (in addition to its
/// `FallibleStreamingIterator` impl above).
impl<I: Iterator<Item = ParquetResult<Page>>> Iterator for Compressor<I> {
    type Item = ParquetResult<CompressedPage>;

    fn next(&mut self) -> Option<Self::Item> {
        // Recycle an allocation for the output: prefer the buffer of the
        // page currently held by the streaming-iterator side (`current`),
        // otherwise fall back to the compressor's spare `buffer`.
        let mut compressed_buffer = if let Some(page) = self.current.as_mut() {
            std::mem::take(page.buffer_mut())
        } else {
            std::mem::take(&mut self.buffer)
        };
        // Discard stale contents but keep the allocation for reuse.
        compressed_buffer.clear();

        // End of the underlying page iterator ends this iterator too.
        let page = self.iter.next()?;
        let page = match page {
            Ok(page) => page,
            Err(err) => return Some(Err(err)),
        };

        Some(compress(page, compressed_buffer, self.compression))
    }
}
2 changes: 2 additions & 0 deletions crates/polars-stream/src/nodes/io_sinks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
#[cfg(feature = "ipc")]
pub mod ipc;
#[cfg(feature = "parquet")]
pub mod parquet;
Loading

0 comments on commit ab8060f

Please sign in to comment.