diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
index 940aa1584..da1c0b565 100644
--- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -22,34 +22,38 @@ use itertools::Itertools;
 use crate::spec::{DataContentType, DataFile, Struct};
 use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
-use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
+use crate::writer::output_file_generator::OutputFileGenerator;
+use crate::writer::{CurrentFileStatus, IcebergWriter, SinglePartitionWriterBuilder};
 use crate::Result;
 
 /// Builder for `DataFileWriter`.
 #[derive(Clone, Debug)]
 pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
     inner: B,
-    partition_value: Option<Struct>,
+    outfile_generator: OutputFileGenerator,
 }
 
 impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
     /// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
-    pub fn new(inner: B, partition_value: Option<Struct>) -> Self {
+    pub fn new(inner: B, outfile_generator: OutputFileGenerator) -> Self {
         Self {
             inner,
-            partition_value,
+            outfile_generator,
         }
     }
 }
 
 #[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
+impl<B: FileWriterBuilder> SinglePartitionWriterBuilder for DataFileWriterBuilder<B> {
     type R = DataFileWriter<B>;
 
-    async fn build(self) -> Result<Self::R> {
+    async fn build(self, partition: Option<Struct>) -> Result<Self::R> {
+        let output_file = self.outfile_generator.create_output_file(&partition)?;
         Ok(DataFileWriter {
-            inner_writer: Some(self.inner.clone().build().await?),
-            partition_value: self.partition_value.unwrap_or(Struct::empty()),
+            inner_writer: Some(self.inner.clone().build(output_file).await?),
+            partition_value: partition.unwrap_or(Struct::empty()),
         })
     }
 }
@@ -114,7 +118,7 @@ mod test {
     use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
     use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
     use crate::writer::file_writer::ParquetWriterBuilder;
-    use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch};
+    use crate::writer::{IcebergWriter, RecordBatch, SinglePartitionWriterBuilder};
     use crate::Result;
 
     #[tokio::test]
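Reviewer note: a minimal sketch of the new wiring, assuming `file_io`, `table_metadata`, and an Iceberg `schema` are already in scope; the helper name and the `"part"` prefix are illustrative, not part of this diff.

```rust
use std::sync::Arc;

use iceberg::io::FileIO;
use iceberg::spec::{DataFileFormat, SchemaRef, TableMetadata};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::output_file_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator, OutputFileGenerator,
};
use iceberg::writer::SinglePartitionWriterBuilder;
use parquet::file::properties::WriterProperties;

// Hypothetical helper, not part of this diff: wires up a DataFileWriter for an
// unpartitioned table using the new OutputFileGenerator plumbing.
async fn build_data_file_writer(
    file_io: FileIO,
    table_metadata: TableMetadata,
    schema: SchemaRef,
) -> iceberg::Result<()> {
    // The generator now owns FileIO plus the location/file-name strategies
    // that ParquetWriterBuilder used to carry itself.
    let outfile_generator = OutputFileGenerator::new(
        file_io,
        Arc::new(DefaultLocationGenerator::new(table_metadata)?),
        Arc::new(DefaultFileNameGenerator::new(
            "part".to_string(),
            None,
            DataFileFormat::Parquet,
        )),
    );
    let parquet_builder = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema);
    // `None` builds a writer for an unpartitioned table; a partitioned table
    // would pass `Some(partition_struct)` instead.
    let _writer = DataFileWriterBuilder::new(parquet_builder, outfile_generator)
        .build(None)
        .await?;
    Ok(())
}
```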
diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
index 069928fa8..6eb93f0cf 100644
--- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -28,7 +28,8 @@ use crate::arrow::record_batch_projector::RecordBatchProjector;
 use crate::arrow::schema_to_arrow_schema;
 use crate::spec::{DataFile, SchemaRef, Struct};
 use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
-use crate::writer::{IcebergWriter, IcebergWriterBuilder};
+use crate::writer::output_file_generator::OutputFileGenerator;
+use crate::writer::{IcebergWriter, SinglePartitionWriterBuilder};
 use crate::{Error, ErrorKind, Result};
 
 /// Builder for `EqualityDeleteWriter`.
@@ -36,12 +37,21 @@ use crate::{Error, ErrorKind, Result};
 pub struct EqualityDeleteFileWriterBuilder<B: FileWriterBuilder> {
     inner: B,
     config: EqualityDeleteWriterConfig,
+    outfile_generator: OutputFileGenerator,
 }
 
 impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
     /// Create a new `EqualityDeleteFileWriterBuilder` using a `FileWriterBuilder`.
-    pub fn new(inner: B, config: EqualityDeleteWriterConfig) -> Self {
-        Self { inner, config }
+    pub fn new(
+        inner: B,
+        config: EqualityDeleteWriterConfig,
+        outfile_generator: OutputFileGenerator,
+    ) -> Self {
+        Self {
+            inner,
+            config,
+            outfile_generator,
+        }
     }
 }
 
@@ -52,16 +62,11 @@ pub struct EqualityDeleteWriterConfig {
     equality_ids: Vec<i32>,
     // Projector used to project the data chunk into specific fields.
     projector: RecordBatchProjector,
-    partition_value: Struct,
 }
 
 impl EqualityDeleteWriterConfig {
     /// Create a new `DataFileWriterConfig` with equality ids.
-    pub fn new(
-        equality_ids: Vec<i32>,
-        original_schema: SchemaRef,
-        partition_value: Option<Struct>,
-    ) -> Result<Self> {
+    pub fn new(equality_ids: Vec<i32>, original_schema: SchemaRef) -> Result<Self> {
         let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?);
         let projector = RecordBatchProjector::new(
             original_arrow_schema,
@@ -97,7 +102,6 @@ impl EqualityDeleteWriterConfig {
         Ok(Self {
             equality_ids,
             projector,
-            partition_value: partition_value.unwrap_or(Struct::empty()),
         })
     }
 
@@ -108,15 +112,18 @@ impl EqualityDeleteWriterConfig {
 }
 
 #[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriterBuilder for EqualityDeleteFileWriterBuilder<B> {
+impl<B: FileWriterBuilder> SinglePartitionWriterBuilder for EqualityDeleteFileWriterBuilder<B> {
     type R = EqualityDeleteFileWriter<B>;
 
-    async fn build(self) -> Result<Self::R> {
+    async fn build(self, partition: Option<Struct>) -> Result<Self::R> {
+        let output_file = self.outfile_generator.create_output_file(&partition)?;
         Ok(EqualityDeleteFileWriter {
-            inner_writer: Some(self.inner.clone().build().await?),
+            inner_writer: Some(self.inner.clone().build(output_file).await?),
             projector: self.config.projector,
             equality_ids: self.config.equality_ids,
-            partition_value: self.config.partition_value,
+            partition_value: partition.unwrap_or(Struct::empty()),
         })
     }
 }
@@ -192,7 +199,7 @@ mod test {
     use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
     use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
     use crate::writer::file_writer::ParquetWriterBuilder;
-    use crate::writer::{IcebergWriter, IcebergWriterBuilder};
+    use crate::writer::{IcebergWriter, SinglePartitionWriterBuilder};
 
     async fn check_parquet_data_file_with_equality_delete_write(
         file_io: &FileIO,
@@ -385,7 +392,7 @@ mod test {
         let equality_ids = vec![0_i32, 8];
         let equality_config =
-            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap();
+            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema)).unwrap();
         let delete_schema =
             arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap();
         let projector = equality_config.projector.clone();
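Reviewer note: the same split applies to equality deletes — the config now describes only what to project, and the partition arrives at build time. A hedged sketch; the equality ids are illustrative and must name fields that exist in the schema.

```rust
use iceberg::spec::SchemaRef;
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::output_file_generator::OutputFileGenerator;
use iceberg::writer::SinglePartitionWriterBuilder;

// Hypothetical helper, not part of this diff.
async fn build_eq_delete_writer(
    parquet_builder: ParquetWriterBuilder,
    schema: SchemaRef,
    outfile_generator: OutputFileGenerator,
) -> iceberg::Result<()> {
    // Config no longer carries a partition value; it only holds the
    // equality ids (field ids 1 and 2 here, purely for illustration).
    let config = EqualityDeleteWriterConfig::new(vec![1, 2], schema)?;
    let _writer =
        EqualityDeleteFileWriterBuilder::new(parquet_builder, config, outfile_generator)
            .build(None)
            .await?;
    Ok(())
}
```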
diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs
index 4a0fffcc1..74093ad48 100644
--- a/crates/iceberg/src/writer/file_writer/mod.rs
+++ b/crates/iceberg/src/writer/file_writer/mod.rs
@@ -21,15 +21,13 @@ use arrow_array::RecordBatch;
 use futures::Future;
 
 use super::CurrentFileStatus;
-use crate::spec::DataFileBuilder;
+use crate::{io::OutputFile, spec::DataFileBuilder};
 use crate::Result;
 
 mod parquet_writer;
 pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
 mod track_writer;
 
-pub mod location_generator;
-
 type DefaultOutput = Vec<DataFileBuilder>;
 
 /// File writer builder trait.
@@ -37,7 +35,7 @@ pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
     /// The associated file writer type.
     type R: FileWriter<O>;
     /// Build file writer.
-    fn build(self) -> impl Future<Output = Result<Self::R>> + Send;
+    fn build(self, output_file: OutputFile) -> impl Future<Output = Result<Self::R>> + Send;
 }
 
 /// File writer focus on writing record batch to different physical file format.(Such as parquet. orc)
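Reviewer note: with this change, a `FileWriterBuilder` implementation receives its destination instead of computing one. A rough skeleton of the new shape; `DummyWriterBuilder`/`DummyWriter` are placeholders, not real API, and `arrow_array` is assumed as a dependency.

```rust
use arrow_array::RecordBatch;
use iceberg::io::OutputFile;
use iceberg::spec::DataFileBuilder;
use iceberg::writer::file_writer::{FileWriter, FileWriterBuilder};

#[derive(Clone)]
struct DummyWriterBuilder;

impl FileWriterBuilder for DummyWriterBuilder {
    type R = DummyWriter;

    // The caller (ultimately OutputFileGenerator via the base writers)
    // decides where the file lives; the format writer only consumes it.
    async fn build(self, output_file: OutputFile) -> iceberg::Result<Self::R> {
        Ok(DummyWriter { _out: output_file })
    }
}

struct DummyWriter {
    _out: OutputFile,
}

impl FileWriter for DummyWriter {
    async fn write(&mut self, _batch: &RecordBatch) -> iceberg::Result<()> {
        Ok(()) // format-specific encoding would happen here
    }

    async fn close(self) -> iceberg::Result<Vec<DataFileBuilder>> {
        Ok(vec![]) // flush and report the written files here
    }
}
```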
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 5561b1913..dbac69da9 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -31,13 +31,12 @@ use parquet::file::properties::WriterProperties;
 use parquet::file::statistics::{from_thrift, Statistics};
 use parquet::format::FileMetaData;
 
-use super::location_generator::{FileNameGenerator, LocationGenerator};
 use super::track_writer::TrackWriter;
 use super::{FileWriter, FileWriterBuilder};
 use crate::arrow::{
     get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, DEFAULT_MAP_FIELD_NAME,
 };
-use crate::io::{FileIO, FileWrite, OutputFile};
+use crate::io::{FileWrite, OutputFile};
 use crate::spec::{
     visit_schema, DataFileBuilder, DataFileFormat, Datum, ListType, MapType, NestedFieldRef,
     PrimitiveType, Schema, SchemaRef, SchemaVisitor, StructType, Type,
@@ -47,45 +46,25 @@ use crate::{Error, ErrorKind, Result};
 
 /// ParquetWriterBuilder is used to builder a [`ParquetWriter`]
 #[derive(Clone, Debug)]
-pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
+pub struct ParquetWriterBuilder {
     props: WriterProperties,
     schema: SchemaRef,
-
-    file_io: FileIO,
-    location_generator: T,
-    file_name_generator: F,
 }
 
-impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
+impl ParquetWriterBuilder {
     /// Create a new `ParquetWriterBuilder`
     /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field.
-    pub fn new(
-        props: WriterProperties,
-        schema: SchemaRef,
-        file_io: FileIO,
-        location_generator: T,
-        file_name_generator: F,
-    ) -> Self {
-        Self {
-            props,
-            schema,
-            file_io,
-            location_generator,
-            file_name_generator,
-        }
+    pub fn new(props: WriterProperties, schema: SchemaRef) -> Self {
+        Self { props, schema }
     }
 }
 
-impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWriterBuilder<T, F> {
+impl FileWriterBuilder for ParquetWriterBuilder {
     type R = ParquetWriter;
 
-    async fn build(self) -> crate::Result<Self::R> {
+    async fn build(self, out_file: OutputFile) -> crate::Result<Self::R> {
         let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
         let written_size = Arc::new(AtomicI64::new(0));
-        let out_file = self.file_io.new_output(
-            self.location_generator
-                .generate_location(&self.file_name_generator.generate_file_name()),
-        )?;
         let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone());
         let async_writer = AsyncFileWriter::new(inner_writer);
         let writer =
@@ -668,14 +647,14 @@ mod tests {
         let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
 
         // write data
+        let output_file = file_io
+            .new_output(
+                location_gen.generate_location(&None, &file_name_gen.generate_file_name())?,
+            )
+            .unwrap();
         let mut pw = ParquetWriterBuilder::new(
             WriterProperties::builder().build(),
             Arc::new(to_write.schema().as_ref().try_into().unwrap()),
-            file_io.clone(),
-            location_gen,
-            file_name_gen,
         )
-        .build()
+        .build(output_file)
         .await?;
         pw.write(&to_write).await?;
         pw.write(&to_write_null).await?;
@@ -864,15 +843,13 @@ mod tests {
         .unwrap();
 
         // write data
-        let mut pw = ParquetWriterBuilder::new(
-            WriterProperties::builder().build(),
-            Arc::new(schema),
-            file_io.clone(),
-            location_gen,
-            file_name_gen,
-        )
-        .build()
-        .await?;
+        let output_file = file_io
+            .new_output(
+                location_gen.generate_location(&None, &file_name_gen.generate_file_name())?,
+            )
+            .unwrap();
+        let mut pw =
+            ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
+                .build(output_file)
+                .await?;
         pw.write(&to_write).await?;
         let res = pw.close().await?;
         assert_eq!(res.len(), 1);
@@ -1054,15 +1031,13 @@ mod tests {
         .unwrap();
 
         // write data
-        let mut pw = ParquetWriterBuilder::new(
-            WriterProperties::builder().build(),
-            Arc::new(schema),
-            file_io.clone(),
-            loccation_gen,
-            file_name_gen,
-        )
-        .build()
-        .await?;
+        let output_file = file_io
+            .new_output(
+                loccation_gen.generate_location(&None, &file_name_gen.generate_file_name())?,
+            )
+            .unwrap();
+        let mut pw =
+            ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
+                .build(output_file)
+                .await?;
         pw.write(&to_write).await?;
         let res = pw.close().await?;
         assert_eq!(res.len(), 1);
@@ -1198,15 +1173,12 @@ mod tests {
             .unwrap(),
         );
         let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
-        let mut pw = ParquetWriterBuilder::new(
-            WriterProperties::builder().build(),
-            schema.clone(),
-            file_io.clone(),
-            loccation_gen.clone(),
-            file_name_gen.clone(),
-        )
-        .build()
-        .await?;
+        let output_file = file_io
+            .new_output(
+                loccation_gen.generate_location(&None, &file_name_gen.generate_file_name())?,
+            )
+            .unwrap();
+        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
+            .build(output_file)
+            .await?;
         let col0 = Arc::new(
             Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
                 .with_data_type(DataType::Decimal128(28, 10)),
         );
@@ -1250,15 +1222,12 @@ mod tests {
             .unwrap(),
         );
         let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
-        let mut pw = ParquetWriterBuilder::new(
-            WriterProperties::builder().build(),
-            schema.clone(),
-            file_io.clone(),
-            loccation_gen.clone(),
-            file_name_gen.clone(),
-        )
-        .build()
-        .await?;
+        let output_file = file_io
+            .new_output(
+                loccation_gen.generate_location(&None, &file_name_gen.generate_file_name())?,
+            )
+            .unwrap();
+        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
+            .build(output_file)
+            .await?;
         let col0 = Arc::new(
             Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
                 .with_data_type(DataType::Decimal128(28, 10)),
         );
@@ -1305,15 +1274,12 @@ mod tests {
             .unwrap(),
        );
         let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
-        let mut pw = ParquetWriterBuilder::new(
-            WriterProperties::builder().build(),
-            schema,
-            file_io.clone(),
-            loccation_gen,
-            file_name_gen,
-        )
-        .build()
-        .await?;
+        let output_file = file_io
+            .new_output(
+                loccation_gen.generate_location(&None, &file_name_gen.generate_file_name())?,
+            )
+            .unwrap();
+        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
+            .build(output_file)
+            .await?;
         let col0 = Arc::new(
             Decimal128Array::from(vec![
                 Some(decimal_max.mantissa()),
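Reviewer note: in isolation, the two-step flow the updated tests follow looks like this (the filesystem path is illustrative):

```rust
use iceberg::io::FileIOBuilder;
use iceberg::spec::SchemaRef;
use iceberg::writer::file_writer::{FileWriterBuilder, ParquetWriterBuilder};
use parquet::file::properties::WriterProperties;

// Hypothetical snippet, not part of this diff: the destination is created up
// front via FileIO and handed to the builder.
async fn write_one_file(schema: SchemaRef) -> iceberg::Result<()> {
    let file_io = FileIOBuilder::new_fs_io().build()?;
    let output_file = file_io.new_output("/tmp/part-00000.parquet")?;
    let _writer = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
        .build(output_file)
        .await?;
    Ok(())
}
```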
diff --git a/crates/iceberg/src/writer/function_writer/mod.rs b/crates/iceberg/src/writer/function_writer/mod.rs
new file mode 100644
index 000000000..1df3f4e2d
--- /dev/null
+++ b/crates/iceberg/src/writer/function_writer/mod.rs
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Function writer for Iceberg
+
+pub mod partition_writer;
\ No newline at end of file
diff --git a/crates/iceberg/src/writer/function_writer/partition_writer.rs b/crates/iceberg/src/writer/function_writer/partition_writer.rs
new file mode 100644
index 000000000..629b9667b
--- /dev/null
+++ b/crates/iceberg/src/writer/function_writer/partition_writer.rs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Partition writer
+
+use crate::spec::Struct;
+use crate::writer::{IcebergWriterBuilder, SinglePartitionWriterBuilder};
+use crate::Result;
+
+#[derive(Clone)]
+struct PartitionWriterBuilder<B: SinglePartitionWriterBuilder> {
+    inner_writer_builder: B,
+    partition_value: Option<Struct>,
+}
+
+impl<B: SinglePartitionWriterBuilder> PartitionWriterBuilder<B> {
+    pub fn new(inner_writer_builder: B, partition_value: Option<Struct>) -> Self {
+        Self {
+            inner_writer_builder,
+            partition_value,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl<B: SinglePartitionWriterBuilder> IcebergWriterBuilder for PartitionWriterBuilder<B> {
+    type R = B::R;
+
+    async fn build(self) -> Result<Self::R> {
+        self.inner_writer_builder.build(self.partition_value).await
+    }
+}
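Reviewer note: `PartitionWriterBuilder` is currently private. Functionally it packages the one-liner below into a reusable `IcebergWriterBuilder`, so code downstream of the adapter never sees the partition argument:

```rust
use iceberg::spec::Struct;
use iceberg::writer::SinglePartitionWriterBuilder;

// Hypothetical stand-in, not part of this diff: PartitionWriterBuilder's
// build() is exactly this call, with the partition captured at construction
// time instead of passed in.
async fn build_with_bound_partition<B: SinglePartitionWriterBuilder>(
    inner: B,
    partition: Option<Struct>,
) -> iceberg::Result<B::R> {
    inner.build(partition).await
}
```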
diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs
index 64357a0fe..f53508870 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/mod.rs
@@ -45,12 +45,14 @@
 //!     let data_files = data_file_writer.flush().await.unwrap();
 //! ```
 
 pub mod base_writer;
 pub mod file_writer;
+pub mod function_writer;
+pub mod output_file_generator;
 
 use arrow_array::RecordBatch;
 
-use crate::spec::DataFile;
+use crate::spec::{DataFile, Struct};
 use crate::Result;
 
 type DefaultInput = RecordBatch;
@@ -63,10 +65,21 @@ pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
 {
     /// The associated writer type.
     type R: IcebergWriter<I, O>;
-    /// Build the iceberg writer.
+    /// Build the iceberg writer; any partition binding happens before this call.
     async fn build(self) -> Result<Self::R>;
 }
 
+/// Builder for an iceberg writer that writes a single partition, chosen at build time.
+#[async_trait::async_trait]
+pub trait SinglePartitionWriterBuilder<I = DefaultInput, O = DefaultOutput>:
+    Send + Clone + 'static
+{
+    /// The associated writer type.
+    type R: IcebergWriter<I, O>;
+    /// Build the iceberg writer for the given partition (`None` for an unpartitioned table).
+    async fn build(self, partition: Option<Struct>) -> Result<Self::R>;
+}
+
 /// The iceberg writer used to write data to iceberg table.
 #[async_trait::async_trait]
 pub trait IcebergWriter<I = DefaultInput, O = DefaultOutput>: Send + 'static {
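Reviewer note: because the partition value is now an argument to `build` rather than builder state, one configured builder can be cloned per partition. A hypothetical helper (not part of this diff) showing why the trait requires `Clone`:

```rust
use iceberg::spec::Struct;
use iceberg::writer::SinglePartitionWriterBuilder;

// Fan out one writer per partition value from a single configured builder.
async fn writers_per_partition<B: SinglePartitionWriterBuilder>(
    builder: B,
    partitions: Vec<Struct>,
) -> iceberg::Result<Vec<B::R>> {
    let mut writers = Vec::with_capacity(partitions.len());
    for partition in partitions {
        writers.push(builder.clone().build(Some(partition)).await?);
    }
    Ok(writers)
}
```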
diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs b/crates/iceberg/src/writer/output_file_generator.rs
similarity index 63%
rename from crates/iceberg/src/writer/file_writer/location_generator.rs
rename to crates/iceberg/src/writer/output_file_generator.rs
index 3f5d4ee18..7b11857a0 100644
--- a/crates/iceberg/src/writer/file_writer/location_generator.rs
+++ b/crates/iceberg/src/writer/output_file_generator.rs
@@ -17,18 +17,52 @@
 
 //! This module contains the location generator and file name generator for generating path of data file.
 
+use std::fmt::Debug;
 use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
 
-use crate::spec::{DataFileFormat, TableMetadata};
+use crate::io::{FileIO, OutputFile};
+use crate::spec::{DataFileFormat, SchemaId, Struct, StructType, TableMetadata};
 use crate::{Error, ErrorKind, Result};
 
+/// `OutputFileGenerator` is used to generate output files.
+#[derive(Clone, Debug)]
+pub struct OutputFileGenerator {
+    location_generator: Arc<dyn LocationGenerator>,
+    file_name_generator: Arc<dyn FileNameGenerator>,
+    file_io: FileIO,
+}
+
+impl OutputFileGenerator {
+    /// Create a new `OutputFileGenerator`.
+    pub fn new(
+        file_io: FileIO,
+        location_generator: Arc<dyn LocationGenerator>,
+        file_name_generator: Arc<dyn FileNameGenerator>,
+    ) -> Self {
+        Self {
+            location_generator,
+            file_name_generator,
+            file_io,
+        }
+    }
+
+    /// Create a new output file. The partition value, if any, is used to generate the file location.
+    pub fn create_output_file(&self, partition_value: &Option<Struct>) -> Result<OutputFile> {
+        let file_name = self.file_name_generator.generate_file_name();
+        let location = self
+            .location_generator
+            .generate_location(partition_value, &file_name)?;
+        self.file_io.new_output(&location)
+    }
+}
+
 /// `LocationGenerator` used to generate the location of data file.
-pub trait LocationGenerator: Clone + Send + 'static {
+pub trait LocationGenerator: Send + Sync + Debug + 'static {
     /// Generate an absolute path for the given file name.
     /// e.g
     /// For file name "part-00000.parquet", the generated location maybe "/table/data/part-00000.parquet"
-    fn generate_location(&self, file_name: &str) -> String;
+    fn generate_location(&self, partition: &Option<Struct>, file_name: &str) -> Result<String>;
 }
 
 const WRITE_DATA_LOCATION: &str = "write.data.path";
 const WRITE_FOLDER_STORAGE_LOCATION: &str = "write.folder-storage.path";
 const DEFAULT_DATA_DIR: &str = "/data";
 
@@ -40,11 +74,44 @@ const DEFAULT_DATA_DIR: &str = "/data";
 /// The location is generated based on the table location and the data location in table properties.
 pub struct DefaultLocationGenerator {
     dir_path: String,
+    partition_type: Option<StructType>,
 }
 
 impl DefaultLocationGenerator {
     /// Create a new `DefaultLocationGenerator`.
     pub fn new(table_metadata: TableMetadata) -> Result<Self> {
+        Self::new_inner(table_metadata, None)
+    }
+
+    /// Create a new `DefaultLocationGenerator` with a partition spec.
+    pub fn new_with_partition_spec(
+        table_metadata: TableMetadata,
+        partition_spec_id: i32,
+        schema_id: SchemaId,
+    ) -> Result<Self> {
+        let partition_spec = table_metadata
+            .partition_spec_by_id(partition_spec_id)
+            .cloned()
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("partition spec id {} not found", partition_spec_id),
+                )
+            })?;
+        let schema = table_metadata.schema_by_id(schema_id).ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("schema id {} not found", schema_id),
+            )
+        })?;
+        let partition_type = partition_spec.partition_type(&schema)?;
+        Self::new_inner(table_metadata, Some(partition_type))
+    }
+
+    fn new_inner(
+        table_metadata: TableMetadata,
+        partition_type: Option<StructType>,
+    ) -> Result<Self> {
         let table_location = table_metadata.location();
         let rel_dir_path = {
             let prop = table_metadata.properties();
@@ -68,18 +135,49 @@ impl DefaultLocationGenerator {
         Ok(Self {
             dir_path: format!("{}{}", table_location, rel_dir_path),
+            partition_type,
         })
     }
+
+    /// The following implementation is based on the Java implementation:
+    /// <https://github.com/apache/iceberg/blob/d96901b843395fe669f6bd4f618f8e5e46c0eed4/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L206>
+    fn partition_dir_path(partition: &Struct, partition_type: &StructType) -> Result<String> {
+        let mut partition_path = String::with_capacity(128);
+        for (field, field_value) in partition_type.fields().iter().zip(partition.iter()) {
+            // A missing value cannot be rendered into a path segment, so reject
+            // it whether or not the field is declared required.
+            let Some(value) = field_value else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Partition value is not compatible with partition type",
+                ));
+            };
+            partition_path.push_str(&format!("/{}={:?}", field.name, value));
+        }
+        Ok(partition_path)
+    }
 }
 
 impl LocationGenerator for DefaultLocationGenerator {
-    fn generate_location(&self, file_name: &str) -> String {
-        format!("{}/{}", self.dir_path, file_name)
+    fn generate_location(&self, partition: &Option<Struct>, file_name: &str) -> Result<String> {
+        match (&self.partition_type, partition) {
+            (Some(partition_type), Some(partition)) => {
+                let partition_path = Self::partition_dir_path(partition, partition_type)?;
+                Ok(format!("{}{}/{}", self.dir_path, partition_path, file_name))
+            }
+            _ => Ok(format!("{}/{}", self.dir_path, file_name)),
+        }
     }
 }
 
 /// `FileNameGeneratorTrait` used to generate file name for data file. The file name can be passed to `LocationGenerator` to generate the location of the file.
-pub trait FileNameGenerator: Clone + Send + 'static {
+pub trait FileNameGenerator: Send + Sync + Debug + 'static {
     /// Generate a file name.
     fn generate_file_name(&self) -> String;
 }
 
@@ -87,12 +185,12 @@
 /// `DefaultFileNameGenerator` used to generate file name for data file. The file name can be
 /// passed to `LocationGenerator` to generate the location of the file.
 /// The file name format is "{prefix}-{file_count}[-{suffix}].{file_format}".
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub struct DefaultFileNameGenerator {
     prefix: String,
     suffix: String,
     format: String,
-    file_count: Arc<AtomicU64>,
+    file_count: AtomicU64,
 }
 
 impl DefaultFileNameGenerator {
@@ -108,7 +206,7 @@ impl DefaultFileNameGenerator {
             prefix,
             suffix,
             format: format.to_string(),
-            file_count: Arc::new(AtomicU64::new(0)),
+            file_count: AtomicU64::new(0),
         }
     }
 }
 
@@ -133,9 +231,6 @@ pub(crate) mod test {
     use super::LocationGenerator;
     use crate::spec::{FormatVersion, PartitionSpec, StructType, TableMetadata};
-    use crate::writer::file_writer::location_generator::{
-        FileNameGenerator, WRITE_DATA_LOCATION, WRITE_FOLDER_STORAGE_LOCATION,
-    };
 
     #[derive(Clone)]
     pub(crate) struct MockLocationGenerator {
@@ -191,7 +286,7 @@ pub(crate) mod test {
         let location_generator =
             super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
         let location =
-            location_generator.generate_location(&file_name_genertaor.generate_file_name());
+            location_generator
+                .generate_location(&None, &file_name_genertaor.generate_file_name())
+                .unwrap();
         assert_eq!(location, "s3://data.db/table/data/part-00000-test.parquet");
 
         // test custom data location
@@ -202,7 +297,7 @@ pub(crate) mod test {
         let location_generator =
             super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
         let location =
-            location_generator.generate_location(&file_name_genertaor.generate_file_name());
+            location_generator
+                .generate_location(&None, &file_name_genertaor.generate_file_name())
+                .unwrap();
         assert_eq!(
             location,
             "s3://data.db/table/data_1/part-00001-test.parquet"
         );
@@ -215,7 +310,7 @@ pub(crate) mod test {
         let location_generator =
             super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
         let location =
-            location_generator.generate_location(&file_name_genertaor.generate_file_name());
+            location_generator
+                .generate_location(&None, &file_name_genertaor.generate_file_name())
+                .unwrap();
         assert_eq!(
             location,
             "s3://data.db/table/data_2/part-00002-test.parquet"
         );
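Reviewer note: end to end, the partition-aware logic produces Hive-style paths. A hedged sketch; the spec id, schema id, and file name are illustrative, and real callers would read the ids from the table's current state.

```rust
use iceberg::spec::{Struct, TableMetadata};
use iceberg::writer::output_file_generator::{DefaultLocationGenerator, LocationGenerator};

// Hypothetical helper, not part of this diff.
fn partitioned_location(
    table_metadata: TableMetadata,
    partition: Struct,
) -> iceberg::Result<String> {
    let generator = DefaultLocationGenerator::new_with_partition_spec(table_metadata, 0, 0)?;
    // For a table partitioned by a `region` column this yields something like
    // "<table-location>/data/region=<value>/part-00000.parquet", where <value>
    // is the partition value's Debug rendering.
    generator.generate_location(&Some(partition), "part-00000.parquet")
}
```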
diff --git a/crates/integration_tests/tests/append_data_file_test.rs b/crates/integration_tests/tests/append_data_file_test.rs
index 60d4f04c6..8af8ccdeb 100644
--- a/crates/integration_tests/tests/append_data_file_test.rs
+++ b/crates/integration_tests/tests/append_data_file_test.rs
@@ -29,7 +29,7 @@ use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
 use iceberg::writer::file_writer::ParquetWriterBuilder;
-use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::writer::{IcebergWriter, SinglePartitionWriterBuilder};
 use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
 use iceberg_integration_tests::set_test_fixture;
 use parquet::arrow::arrow_reader::ArrowReaderOptions;
diff --git a/crates/integration_tests/tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/append_partition_data_file_test.rs
index 103021532..a0a9d515a 100644
--- a/crates/integration_tests/tests/append_partition_data_file_test.rs
+++ b/crates/integration_tests/tests/append_partition_data_file_test.rs
@@ -33,7 +33,7 @@ use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
 use iceberg::writer::file_writer::ParquetWriterBuilder;
-use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::writer::{IcebergWriter, SinglePartitionWriterBuilder};
 use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
 use iceberg_integration_tests::set_test_fixture;
 use parquet::file::properties::WriterProperties;
diff --git a/crates/integration_tests/tests/conflict_commit_test.rs b/crates/integration_tests/tests/conflict_commit_test.rs
index 52575d1ce..2ea97b0d5 100644
--- a/crates/integration_tests/tests/conflict_commit_test.rs
+++ b/crates/integration_tests/tests/conflict_commit_test.rs
@@ -29,7 +29,7 @@ use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
 use iceberg::writer::file_writer::ParquetWriterBuilder;
-use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::writer::{IcebergWriter, SinglePartitionWriterBuilder};
 use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
 use iceberg_integration_tests::set_test_fixture;
 use parquet::file::properties::WriterProperties;