Minor: remove duplication in create_writer
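This commit replaces the duplicated `create_writer` methods on `CsvSink` and `JsonSink` with a single shared `pub(crate) async fn create_writer` in `datasource/file_format/mod.rs`. The writer mode and compression type become explicit parameters instead of being read from `self`, and `FileCompressionType` additionally derives `Copy` so it can be passed by value at the new call sites.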
alamb committed Aug 8, 2023
1 parent 3d917a0 commit 11cb2b1
Showing 4 changed files with 91 additions and 133 deletions.
83 changes: 17 additions & 66 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -36,17 +36,13 @@ use bytes::{Buf, Bytes};
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
-use tokio::io::AsyncWrite;

-use super::{stateless_serialize_and_write_files, FileFormat};
+use super::{create_writer, stateless_serialize_and_write_files, FileFormat};
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::FileWriterMode;
-use crate::datasource::file_format::{
-    AbortMode, AbortableWrite, AsyncPutWriter, BatchSerializer, MultiPart,
-    DEFAULT_SCHEMA_INFER_MAX_RECORD,
-};
+use crate::datasource::file_format::{BatchSerializer, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::physical_plan::{
-    CsvExec, FileGroupDisplay, FileMeta, FileScanConfig, FileSinkConfig,
+    CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
};
use crate::error::Result;
use crate::execution::context::SessionState;
@@ -494,56 +490,6 @@ impl CsvSink {
            file_compression_type,
        }
    }
-
-    // Create a write for Csv files
-    async fn create_writer(
-        &self,
-        file_meta: FileMeta,
-        object_store: Arc<dyn ObjectStore>,
-    ) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
-        let object = &file_meta.object_meta;
-        match self.config.writer_mode {
-            // If the mode is append, call the store's append method and return wrapped in
-            // a boxed trait object.
-            FileWriterMode::Append => {
-                let writer = object_store
-                    .append(&object.location)
-                    .await
-                    .map_err(DataFusionError::ObjectStore)?;
-                let writer = AbortableWrite::new(
-                    self.file_compression_type.convert_async_writer(writer)?,
-                    AbortMode::Append,
-                );
-                Ok(writer)
-            }
-            // If the mode is put, create a new AsyncPut writer and return it wrapped in
-            // a boxed trait object
-            FileWriterMode::Put => {
-                let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store));
-                let writer = AbortableWrite::new(
-                    self.file_compression_type.convert_async_writer(writer)?,
-                    AbortMode::Put,
-                );
-                Ok(writer)
-            }
-            // If the mode is put multipart, call the store's put_multipart method and
-            // return the writer wrapped in a boxed trait object.
-            FileWriterMode::PutMultipart => {
-                let (multipart_id, writer) = object_store
-                    .put_multipart(&object.location)
-                    .await
-                    .map_err(DataFusionError::ObjectStore)?;
-                Ok(AbortableWrite::new(
-                    self.file_compression_type.convert_async_writer(writer)?,
-                    AbortMode::MultiPart(MultiPart::new(
-                        object_store,
-                        multipart_id,
-                        object.location.clone(),
-                    )),
-                ))
-            }
-        }
-    }
}

#[async_trait]
@@ -577,12 +523,13 @@ impl DataSink for CsvSink {
                    serializers.push(Box::new(serializer));

                    let file = file_group.clone();
-                    let writer = self
-                        .create_writer(
-                            file.object_meta.clone().into(),
-                            object_store.clone(),
-                        )
-                        .await?;
+                    let writer = create_writer(
+                        self.config.writer_mode,
+                        self.file_compression_type,
+                        file.object_meta.clone().into(),
+                        object_store.clone(),
+                    )
+                    .await?;
                    writers.push(writer);
                }
            }
@@ -612,9 +559,13 @@
                        size: 0,
                        e_tag: None,
                    };
-                    let writer = self
-                        .create_writer(object_meta.into(), object_store.clone())
-                        .await?;
+                    let writer = create_writer(
+                        self.config.writer_mode,
+                        self.file_compression_type,
+                        object_meta.into(),
+                        object_store.clone(),
+                    )
+                    .await?;
                    writers.push(writer);
                }
            }
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/file_type.rs
@@ -62,7 +62,7 @@ pub trait GetExt {
}

/// Readable file compression type
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FileCompressionType {
    variant: CompressionTypeVariant,
}
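The `Copy` derive is what lets `FileCompressionType` be passed by value into the shared `create_writer` (added in mod.rs below) without a `clone()` at each call site. A minimal sketch of the effect, assuming the `UNCOMPRESSED` associated constant and the public module path:

```rust
use datafusion::datasource::file_format::file_type::FileCompressionType;

// Takes the compression type by value, as the shared create_writer now does.
fn takes_by_value(t: FileCompressionType) {
    let _ = t;
}

fn main() {
    let t = FileCompressionType::UNCOMPRESSED; // assumed constant
    takes_by_value(t); // `t` is copied here rather than moved,
    takes_by_value(t); // so the binding remains usable afterwards
}
```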
80 changes: 15 additions & 65 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -28,7 +28,6 @@ use std::fmt;
use std::fmt::Debug;
use std::io::BufReader;
use std::sync::Arc;
-use tokio::io::AsyncWrite;

use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
@@ -43,21 +42,17 @@ use datafusion_physical_expr::PhysicalExpr;
use object_store::{GetResult, ObjectMeta, ObjectStore};

use crate::datasource::physical_plan::FileGroupDisplay;
-use crate::datasource::physical_plan::FileMeta;
use crate::physical_plan::insert::DataSink;
use crate::physical_plan::insert::InsertExec;
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};

+use super::create_writer;
use super::stateless_serialize_and_write_files;
-use super::AbortMode;
-use super::AbortableWrite;
-use super::AsyncPutWriter;
use super::BatchSerializer;
use super::FileFormat;
use super::FileScanConfig;
use super::FileWriterMode;
-use super::MultiPart;
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::physical_plan::FileSinkConfig;
@@ -266,56 +261,6 @@ impl JsonSink {
            file_compression_type,
        }
    }
-
-    // Create a write for Json files
-    async fn create_writer(
-        &self,
-        file_meta: FileMeta,
-        object_store: Arc<dyn ObjectStore>,
-    ) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
-        let object = &file_meta.object_meta;
-        match self.config.writer_mode {
-            // If the mode is append, call the store's append method and return wrapped in
-            // a boxed trait object.
-            FileWriterMode::Append => {
-                let writer = object_store
-                    .append(&object.location)
-                    .await
-                    .map_err(DataFusionError::ObjectStore)?;
-                let writer = AbortableWrite::new(
-                    self.file_compression_type.convert_async_writer(writer)?,
-                    AbortMode::Append,
-                );
-                Ok(writer)
-            }
-            // If the mode is put, create a new AsyncPut writer and return it wrapped in
-            // a boxed trait object
-            FileWriterMode::Put => {
-                let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store));
-                let writer = AbortableWrite::new(
-                    self.file_compression_type.convert_async_writer(writer)?,
-                    AbortMode::Put,
-                );
-                Ok(writer)
-            }
-            // If the mode is put multipart, call the store's put_multipart method and
-            // return the writer wrapped in a boxed trait object.
-            FileWriterMode::PutMultipart => {
-                let (multipart_id, writer) = object_store
-                    .put_multipart(&object.location)
-                    .await
-                    .map_err(DataFusionError::ObjectStore)?;
-                Ok(AbortableWrite::new(
-                    self.file_compression_type.convert_async_writer(writer)?,
-                    AbortMode::MultiPart(MultiPart::new(
-                        object_store,
-                        multipart_id,
-                        object.location.clone(),
-                    )),
-                ))
-            }
-        }
-    }
}

#[async_trait]
@@ -341,12 +286,13 @@ impl DataSink for JsonSink {
                    serializers.push(Box::new(serializer));

                    let file = file_group.clone();
-                    let writer = self
-                        .create_writer(
-                            file.object_meta.clone().into(),
-                            object_store.clone(),
-                        )
-                        .await?;
+                    let writer = create_writer(
+                        self.config.writer_mode,
+                        self.file_compression_type,
+                        file.object_meta.clone().into(),
+                        object_store.clone(),
+                    )
+                    .await?;
                    writers.push(writer);
                }
            }
@@ -372,9 +318,13 @@
                        size: 0,
                        e_tag: None,
                    };
-                    let writer = self
-                        .create_writer(object_meta.into(), object_store.clone())
-                        .await?;
+                    let writer = create_writer(
+                        self.config.writer_mode,
+                        self.file_compression_type,
+                        object_meta.into(),
+                        object_store.clone(),
+                    )
+                    .await?;
                    writers.push(writer);
                }
            }
59 changes: 58 additions & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
@@ -53,6 +53,10 @@ use futures::{ready, StreamExt};
use object_store::path::Path;
use object_store::{MultipartId, ObjectMeta, ObjectStore};
use tokio::io::{AsyncWrite, AsyncWriteExt};
+
+use self::file_type::FileCompressionType;
+
+use super::physical_plan::FileMeta;
/// This trait abstracts all the file format specific implementations
/// from the [`TableProvider`]. This helps code re-utilization across
/// providers that support the the same file formats.
@@ -235,7 +239,7 @@ pub(crate) enum AbortMode {
}

/// A wrapper struct with abort method and writer
-struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
+pub(crate) struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
    writer: W,
    mode: AbortMode,
}
@@ -306,6 +310,59 @@ pub enum FileWriterMode {
    /// Data is written to a new file in multiple parts.
    PutMultipart,
}
+
+/// return an [`AbortableWrite`] that writes to the specified object
+/// store location and compression
+pub(crate) async fn create_writer(
+    writer_mode: FileWriterMode,
+    file_compression_type: FileCompressionType,
+    file_meta: FileMeta,
+    object_store: Arc<dyn ObjectStore>,
+) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
+    let object = &file_meta.object_meta;
+    match writer_mode {
+        // If the mode is append, call the store's append method and return wrapped in
+        // a boxed trait object.
+        FileWriterMode::Append => {
+            let writer = object_store
+                .append(&object.location)
+                .await
+                .map_err(DataFusionError::ObjectStore)?;
+            let writer = AbortableWrite::new(
+                file_compression_type.convert_async_writer(writer)?,
+                AbortMode::Append,
+            );
+            Ok(writer)
+        }
+        // If the mode is put, create a new AsyncPut writer and return it wrapped in
+        // a boxed trait object
+        FileWriterMode::Put => {
+            let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store));
+            let writer = AbortableWrite::new(
+                file_compression_type.convert_async_writer(writer)?,
+                AbortMode::Put,
+            );
+            Ok(writer)
+        }
+        // If the mode is put multipart, call the store's put_multipart method and
+        // return the writer wrapped in a boxed trait object.
+        FileWriterMode::PutMultipart => {
+            let (multipart_id, writer) = object_store
+                .put_multipart(&object.location)
+                .await
+                .map_err(DataFusionError::ObjectStore)?;
+            Ok(AbortableWrite::new(
+                file_compression_type.convert_async_writer(writer)?,
+                AbortMode::MultiPart(MultiPart::new(
+                    object_store,
+                    multipart_id,
+                    object.location.clone(),
+                )),
+            ))
+        }
+    }
+}

/// A trait that defines the methods required for a RecordBatch serializer.
#[async_trait]
pub trait BatchSerializer: Unpin + Send {
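Because the new helper is `pub(crate)`, outside callers cannot reach it; a quick way to see the whole flow is a unit test inside this module. The following is a minimal sketch, not part of the commit: it assumes the `UNCOMPRESSED` associated constant on `FileCompressionType`, the `object_store` crate's `InMemory` store, and the `From<ObjectMeta>` conversion into `FileMeta` that the sinks above rely on.

```rust
// A sketch of exercising the shared helper from a test inside this module.
#[cfg(test)]
mod create_writer_sketch {
    use super::*;
    use object_store::memory::InMemory;
    use object_store::path::Path;
    use tokio::io::AsyncWriteExt;

    #[tokio::test]
    async fn writes_one_file() -> Result<()> {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let object_meta = ObjectMeta {
            location: Path::from("out/data.csv"),
            last_modified: chrono::offset::Utc::now(),
            size: 0,
            e_tag: None,
        };
        // `AbortableWrite` forwards `AsyncWrite`, so the usual writer
        // combinators apply once the writer is created.
        let mut writer = create_writer(
            FileWriterMode::PutMultipart,
            FileCompressionType::UNCOMPRESSED, // assumed constant
            object_meta.into(),
            Arc::clone(&store),
        )
        .await?;
        writer.write_all(b"a,b\n1,2\n").await?;
        writer.shutdown().await?; // completes the multipart upload
        Ok(())
    }
}
```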
