refactor: refine writer interface to support directory hierarchy #893

Draft · wants to merge 1 commit into base: main
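At a high level, the refactor moves output-file creation out of the writer builders: `FileWriterBuilder::build` now receives an `OutputFile`, and the base writer builders take an `OutputFileGenerator` and implement `SinglePartitionWriterBuilder`, whose `build` accepts the partition value. The sketch below shows how the refined interface might be wired together; it is only a sketch, the `write_one_partition` helper is hypothetical, and how an `OutputFileGenerator` is constructed is not shown in this diff.

```rust
use std::sync::Arc;

use arrow_array::RecordBatch;
use parquet::file::properties::WriterProperties;

use iceberg::spec::{Schema, Struct};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::output_file_generator::OutputFileGenerator;
use iceberg::writer::{IcebergWriter, SinglePartitionWriterBuilder};

/// Hypothetical helper: write one batch for one partition with the new interface.
async fn write_one_partition(
    outfile_generator: OutputFileGenerator, // how this is obtained is not shown in the diff
    schema: Arc<Schema>,
    partition: Option<Struct>,
    batch: RecordBatch,
) -> iceberg::Result<()> {
    // The Parquet builder no longer owns FileIO or location/file-name generators.
    let parquet_builder =
        ParquetWriterBuilder::new(WriterProperties::builder().build(), schema);

    // The data-file builder resolves output paths per partition via the generator.
    let data_file_builder = DataFileWriterBuilder::new(parquet_builder, outfile_generator);

    // `build` now takes the partition value and creates the OutputFile internally.
    let mut writer = data_file_builder.build(partition).await?;
    writer.write(batch).await?;
    let _data_files = writer.close().await?;
    Ok(())
}
```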
22 changes: 13 additions & 9 deletions crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -22,34 +22,38 @@ use itertools::Itertools;

use crate::spec::{DataContentType, DataFile, Struct};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
use crate::writer::output_file_generator::OutputFileGenerator;
use crate::writer::{CurrentFileStatus, IcebergWriter, SinglePartitionWriterBuilder};
use crate::Result;

/// Builder for `DataFileWriter`.
#[derive(Clone, Debug)]
pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
inner: B,
partition_value: Option<Struct>,
outfile_genenerator: OutputFileGenerator,
}

impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
/// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
pub fn new(inner: B, partition_value: Option<Struct>) -> Self {
pub fn new(inner: B, outfile_genenerator: OutputFileGenerator) -> Self {
Self {
inner,
partition_value,
outfile_genenerator,
}
}
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
impl<B: FileWriterBuilder> SinglePartitionWriterBuilder for DataFileWriterBuilder<B> {
type R = DataFileWriter<B>;

async fn build(self) -> Result<Self::R> {
async fn build(self, partition: Option<Struct>) -> Result<Self::R> {
let output_file = self
.outfile_genenerator
.create_output_file(&partition)?;
Ok(DataFileWriter {
inner_writer: Some(self.inner.clone().build().await?),
partition_value: self.partition_value.unwrap_or(Struct::empty()),
inner_writer: Some(self.inner.clone().build(output_file).await?),
partition_value: partition.unwrap_or(Struct::empty()),
})
}
}
@@ -114,7 +118,7 @@ mod test {
use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
use crate::writer::file_writer::ParquetWriterBuilder;
use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch};
use crate::writer::{IcebergWriter, SinglePartitionWriterBuilder, RecordBatch};
use crate::Result;

#[tokio::test]
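The rename from `IcebergWriterBuilder` to `SinglePartitionWriterBuilder` reads as: each built writer serves exactly one partition, and a higher-level fan-out layer would call `build` once per distinct partition value. That fan-out layer is not part of this PR; the sketch below only illustrates the calling pattern, and the `FanoutWriters` type and explicit `Clone` bound are assumptions.

```rust
use iceberg::spec::Struct;
use iceberg::writer::SinglePartitionWriterBuilder;
use iceberg::Result;

/// Hypothetical fan-out helper (not part of this PR): one writer per partition.
struct FanoutWriters<B: SinglePartitionWriterBuilder + Clone> {
    builder: B,
    writers: Vec<(Struct, B::R)>,
}

impl<B: SinglePartitionWriterBuilder + Clone> FanoutWriters<B> {
    /// Return the writer for `partition`, building it on first use.
    async fn writer_for(&mut self, partition: Struct) -> Result<&mut B::R> {
        if let Some(idx) = self.writers.iter().position(|(p, _)| p == &partition) {
            return Ok(&mut self.writers[idx].1);
        }
        // First batch for this partition: build a dedicated single-partition writer,
        // which also resolves this partition's output file via the OutputFileGenerator.
        let writer = self.builder.clone().build(Some(partition.clone())).await?;
        self.writers.push((partition, writer));
        Ok(&mut self.writers.last_mut().unwrap().1)
    }
}
```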
39 changes: 23 additions & 16 deletions crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -28,20 +28,30 @@ use crate::arrow::record_batch_projector::RecordBatchProjector;
use crate::arrow::schema_to_arrow_schema;
use crate::spec::{DataFile, SchemaRef, Struct};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::writer::output_file_generator::OutputFileGenerator;
use crate::writer::{IcebergWriter, SinglePartitionWriterBuilder};
use crate::{Error, ErrorKind, Result};

/// Builder for `EqualityDeleteWriter`.
#[derive(Clone, Debug)]
pub struct EqualityDeleteFileWriterBuilder<B: FileWriterBuilder> {
inner: B,
config: EqualityDeleteWriterConfig,
outfile_genenerator: OutputFileGenerator,
}

impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
/// Create a new `EqualityDeleteFileWriterBuilder` using a `FileWriterBuilder`.
pub fn new(inner: B, config: EqualityDeleteWriterConfig) -> Self {
Self { inner, config }
pub fn new(
inner: B,
config: EqualityDeleteWriterConfig,
outfile_genenerator: OutputFileGenerator,
) -> Self {
Self {
inner,
config,
outfile_genenerator,
}
}
}

@@ -52,16 +62,11 @@ pub struct EqualityDeleteWriterConfig {
equality_ids: Vec<i32>,
// Projector used to project the data chunk into specific fields.
projector: RecordBatchProjector,
partition_value: Struct,
}

impl EqualityDeleteWriterConfig {
/// Create a new `DataFileWriterConfig` with equality ids.
pub fn new(
equality_ids: Vec<i32>,
original_schema: SchemaRef,
partition_value: Option<Struct>,
) -> Result<Self> {
pub fn new(equality_ids: Vec<i32>, original_schema: SchemaRef) -> Result<Self> {
let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?);
let projector = RecordBatchProjector::new(
original_arrow_schema,
@@ -97,7 +102,6 @@ impl EqualityDeleteWriterConfig {
Ok(Self {
equality_ids,
projector,
partition_value: partition_value.unwrap_or(Struct::empty()),
})
}

@@ -108,15 +112,18 @@ impl EqualityDeleteWriterConfig {
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder for EqualityDeleteFileWriterBuilder<B> {
impl<B: FileWriterBuilder> SinglePartitionWriterBuilder for EqualityDeleteFileWriterBuilder<B> {
type R = EqualityDeleteFileWriter<B>;

async fn build(self) -> Result<Self::R> {
async fn build(self, partition: Option<Struct>) -> Result<Self::R> {
let output_file = self
.outfile_genenerator
.create_output_file(&partition)?;
Ok(EqualityDeleteFileWriter {
inner_writer: Some(self.inner.clone().build().await?),
inner_writer: Some(self.inner.clone().build(output_file).await?),
projector: self.config.projector,
equality_ids: self.config.equality_ids,
partition_value: self.config.partition_value,
partition_value: partition.unwrap_or(Struct::empty()),
})
}
}
@@ -192,7 +199,7 @@ mod test {
use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
use crate::writer::file_writer::ParquetWriterBuilder;
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::writer::{IcebergWriter, SinglePartitionWriterBuilder};

async fn check_parquet_data_file_with_equality_delete_write(
file_io: &FileIO,
Expand Down Expand Up @@ -385,7 +392,7 @@ mod test {

let equality_ids = vec![0_i32, 8];
let equality_config =
EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap();
EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema)).unwrap();
let delete_schema =
arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap();
let projector = equality_config.projector.clone();
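A consequence of this change: `EqualityDeleteWriterConfig` no longer carries the partition value, which is supplied when the writer is built instead. Below is a minimal sketch of the new construction flow, assuming a placeholder one-column schema and a hypothetical `build_equality_delete_writer` helper; obtaining the `OutputFileGenerator` is outside this diff.

```rust
use std::sync::Arc;

use parquet::file::properties::WriterProperties;

use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::output_file_generator::OutputFileGenerator;
use iceberg::writer::SinglePartitionWriterBuilder;

async fn build_equality_delete_writer(
    outfile_generator: OutputFileGenerator,
) -> iceberg::Result<()> {
    // Placeholder schema with a single required column (field id 1).
    let schema = Arc::new(
        Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()?,
    );

    // The partition value is no longer part of the config...
    let config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?;
    let parquet_builder =
        ParquetWriterBuilder::new(WriterProperties::builder().build(), schema);

    // ...it is passed when the single-partition writer is built (None = unpartitioned).
    let _writer = EqualityDeleteFileWriterBuilder::new(parquet_builder, config, outfile_generator)
        .build(None)
        .await?;
    Ok(())
}
```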
6 changes: 2 additions & 4 deletions crates/iceberg/src/writer/file_writer/mod.rs
@@ -21,23 +21,21 @@ use arrow_array::RecordBatch;
use futures::Future;

use super::CurrentFileStatus;
use crate::spec::DataFileBuilder;
use crate::{io::OutputFile, spec::DataFileBuilder};
use crate::Result;

mod parquet_writer;
pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
mod track_writer;

pub mod location_generator;

type DefaultOutput = Vec<DataFileBuilder>;

/// File writer builder trait.
pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
/// The associated file writer type.
type R: FileWriter<O>;
/// Build file writer.
fn build(self) -> impl Future<Output = Result<Self::R>> + Send;
fn build(self, output_file: OutputFile) -> impl Future<Output = Result<Self::R>> + Send;
}

/// File writer focused on writing record batches to different physical file formats (such as Parquet, ORC).
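This trait change is what enables the directory hierarchy in the PR title: because the caller now supplies the `OutputFile`, a generator can place each partition's files under its own directory. The `OutputFileGenerator` implementation is not part of this diff, so the following is only a hypothetical sketch of the idea, assuming a Hive-style `col=value` layout and using the existing `FileIO::new_output` API.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

use iceberg::io::{FileIO, OutputFile};
use iceberg::Result;

/// Hypothetical sketch only; the PR's actual OutputFileGenerator is not shown here.
struct PartitionedOutputFileGenerator {
    file_io: FileIO,
    table_location: String,
    next_file_id: AtomicU64,
}

impl PartitionedOutputFileGenerator {
    /// `partition_dir` is a pre-rendered Hive-style segment such as
    /// "region=eu/day=2024-01-01"; a real generator would derive it from the
    /// `Option<Struct>` partition value passed to `create_output_file`.
    fn create_output_file(&self, partition_dir: Option<&str>) -> Result<OutputFile> {
        let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
        let path = match partition_dir {
            // Partitioned table: each partition value maps to its own subdirectory.
            Some(dir) => format!("{}/data/{}/{:05}.parquet", self.table_location, dir, file_id),
            // Unpartitioned table: files land directly under data/.
            None => format!("{}/data/{:05}.parquet", self.table_location, file_id),
        };
        self.file_io.new_output(path)
    }
}
```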
120 changes: 43 additions & 77 deletions crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -31,13 +31,12 @@ use parquet::file::properties::WriterProperties;
use parquet::file::statistics::{from_thrift, Statistics};
use parquet::format::FileMetaData;

use super::location_generator::{FileNameGenerator, LocationGenerator};
use super::track_writer::TrackWriter;
use super::{FileWriter, FileWriterBuilder};
use crate::arrow::{
get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, DEFAULT_MAP_FIELD_NAME,
};
use crate::io::{FileIO, FileWrite, OutputFile};
use crate::io::{FileWrite, OutputFile};
use crate::spec::{
visit_schema, DataFileBuilder, DataFileFormat, Datum, ListType, MapType, NestedFieldRef,
PrimitiveType, Schema, SchemaRef, SchemaVisitor, StructType, Type,
@@ -47,45 +46,25 @@ use crate::{Error, ErrorKind, Result};

/// ParquetWriterBuilder is used to build a [`ParquetWriter`]
#[derive(Clone, Debug)]
pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
pub struct ParquetWriterBuilder {
props: WriterProperties,
schema: SchemaRef,

file_io: FileIO,
location_generator: T,
file_name_generator: F,
}

impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
impl ParquetWriterBuilder {
/// Create a new `ParquetWriterBuilder`
/// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field.
pub fn new(
props: WriterProperties,
schema: SchemaRef,
file_io: FileIO,
location_generator: T,
file_name_generator: F,
) -> Self {
Self {
props,
schema,
file_io,
location_generator,
file_name_generator,
}
pub fn new(props: WriterProperties, schema: SchemaRef) -> Self {
Self { props, schema }
}
}

impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWriterBuilder<T, F> {
impl FileWriterBuilder for ParquetWriterBuilder {
type R = ParquetWriter;

async fn build(self) -> crate::Result<Self::R> {
async fn build(self, out_file: OutputFile) -> crate::Result<Self::R> {
let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
let written_size = Arc::new(AtomicI64::new(0));
let out_file = self.file_io.new_output(
self.location_generator
.generate_location(&self.file_name_generator.generate_file_name()),
)?;
let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone());
let async_writer = AsyncFileWriter::new(inner_writer);
let writer =
@@ -668,14 +647,14 @@ mod tests {
let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();

// write data
let output_file = file_io
.new_output(format!("{}/{}", loccation_gen.gen(), file_name_gen.gen()))
.unwrap();
let mut pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
Arc::new(to_write.schema().as_ref().try_into().unwrap()),
file_io.clone(),
location_gen,
file_name_gen,
)
.build()
.build(output_file)
.await?;
pw.write(&to_write).await?;
pw.write(&to_write_null).await?;
@@ -864,15 +843,13 @@ mod tests {
.unwrap();

// write data
let mut pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
Arc::new(schema),
file_io.clone(),
location_gen,
file_name_gen,
)
.build()
.await?;
let output_file = file_io
.new_output(format!("{}/{}", loccation_gen.gen(), file_name_gen.gen()))
.unwrap();
let mut pw =
ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
.build(output_file)
.await?;
pw.write(&to_write).await?;
let res = pw.close().await?;
assert_eq!(res.len(), 1);
@@ -1054,15 +1031,13 @@ mod tests {
.unwrap();

// write data
let mut pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
Arc::new(schema),
file_io.clone(),
loccation_gen,
file_name_gen,
)
.build()
.await?;
let output_file = file_io
.new_output(format!("{}/{}", loccation_gen.gen(), file_name_gen.gen()))
.unwrap();
let mut pw =
ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
.build(output_file)
.await?;
pw.write(&to_write).await?;
let res = pw.close().await?;
assert_eq!(res.len(), 1);
@@ -1198,15 +1173,12 @@ mod tests {
.unwrap(),
);
let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
let mut pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
schema.clone(),
file_io.clone(),
loccation_gen.clone(),
file_name_gen.clone(),
)
.build()
.await?;
let output_file = file_io
.new_output(format!("{}/{}", loccation_gen.gen(), file_name_gen.gen()))
.unwrap();
let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
.build(output_file)
.await?;
let col0 = Arc::new(
Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
.with_data_type(DataType::Decimal128(28, 10)),
@@ -1250,15 +1222,12 @@ mod tests {
.unwrap(),
);
let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
let mut pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
schema.clone(),
file_io.clone(),
loccation_gen.clone(),
file_name_gen.clone(),
)
.build()
.await?;
let output_file = file_io
.new_output(format!("{}/{}", loccation_gen.gen(), file_name_gen.gen()))
.unwrap();
let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
.build(output_file)
.await?;
let col0 = Arc::new(
Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
.with_data_type(DataType::Decimal128(28, 10)),
@@ -1305,15 +1274,12 @@ mod tests {
.unwrap(),
);
let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
let mut pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
schema,
file_io.clone(),
loccation_gen,
file_name_gen,
)
.build()
.await?;
let output_file = file_io
.new_output(format!("{}/{}", loccation_gen.gen(), file_name_gen.gen()))
.unwrap();
let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
.build(output_file)
.await?;
let col0 = Arc::new(
Decimal128Array::from(vec![
Some(decimal_max.mantissa()),