Commit

init
Lordworms committed Jan 13, 2025
1 parent 17446ad commit 769a585
Showing 12 changed files with 524 additions and 50 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/mod.rs
@@ -26,13 +26,13 @@ mod file_stream;
mod json;
#[cfg(feature = "parquet")]
pub mod parquet;
mod row_serde;
mod statistics;

pub(crate) use self::csv::plan_to_csv;
pub(crate) use self::json::plan_to_json;
#[cfg(feature = "parquet")]
pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};

pub use arrow_file::ArrowExec;
pub use avro::AvroExec;
pub use csv::{CsvConfig, CsvExec, CsvExecBuilder, CsvOpener};
219 changes: 219 additions & 0 deletions datafusion/core/src/datasource/physical_plan/row_serde.rs
@@ -0,0 +1,219 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::row::{Row, RowConverter, Rows};
use bzip2::{read::BzDecoder, write::BzEncoder};
use datafusion_common::error::DataFusionError;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::Result;
use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use std::io::{Read, Write};
use std::sync::Arc;
use xz2::{read::XzDecoder, write::XzEncoder};
use zstd::stream::{Decoder as ZstdDecoder, Encoder as ZstdEncoder};

/// Writer used to spill row-format data ([`Rows`]), applying the configured compression.
pub trait RowDataWriter {
type Data;
fn write(&mut self, rows: &Self::Data) -> Result<(), DataFusionError>;
fn compression(&self) -> CompressionTypeVariant;
fn compress(&self, data: &[u8]) -> Result<Vec<u8>, DataFusionError> {
match self.compression() {
CompressionTypeVariant::UNCOMPRESSED => Ok(data.to_vec()),
CompressionTypeVariant::GZIP => {
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(data).map_err(DataFusionError::IoError)?;
Ok(encoder.finish().map_err(DataFusionError::IoError)?)
}
CompressionTypeVariant::BZIP2 => {
let mut encoder = BzEncoder::new(Vec::new(), bzip2::Compression::default());
encoder.write_all(data).map_err(DataFusionError::IoError)?;
Ok(encoder.finish().map_err(DataFusionError::IoError)?)
}
CompressionTypeVariant::XZ => {
let mut encoder = XzEncoder::new(Vec::new(), 9);
encoder.write_all(data).map_err(DataFusionError::IoError)?;
Ok(encoder.finish().map_err(DataFusionError::IoError)?)
}
CompressionTypeVariant::ZSTD => {
let mut encoder =
ZstdEncoder::new(Vec::new(), 0).map_err(DataFusionError::IoError)?;
encoder.write_all(data).map_err(DataFusionError::IoError)?;
Ok(encoder.finish().map_err(DataFusionError::IoError)?)
}
}
}
}

/// Reader used to load spilled row-format data back, applying the matching decompression.
pub trait RowDataReader {
type Data;
fn read_all(&mut self) -> Result<Vec<Self::Data>, DataFusionError>;
fn compression(&self) -> CompressionTypeVariant;
fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, DataFusionError> {
match self.compression() {
CompressionTypeVariant::UNCOMPRESSED => Ok(data.to_vec()),
CompressionTypeVariant::GZIP => {
let mut decoder = GzDecoder::new(data);
let mut decompressed = Vec::new();
decoder
.read_to_end(&mut decompressed)
.map_err(DataFusionError::IoError)?;
Ok(decompressed)
}
CompressionTypeVariant::BZIP2 => {
let mut decoder = BzDecoder::new(data);
let mut decompressed = Vec::new();
decoder
.read_to_end(&mut decompressed)
.map_err(DataFusionError::IoError)?;
Ok(decompressed)
}
CompressionTypeVariant::XZ => {
let mut decoder = XzDecoder::new(data);
let mut decompressed = Vec::new();
decoder
.read_to_end(&mut decompressed)
.map_err(DataFusionError::IoError)?;
Ok(decompressed)
}
CompressionTypeVariant::ZSTD => {
let mut decoder =
ZstdDecoder::new(data).map_err(DataFusionError::IoError)?;
let mut decompressed = Vec::new();
decoder
.read_to_end(&mut decompressed)
.map_err(DataFusionError::IoError)?;
Ok(decompressed)
}
}
}
}

/// Serializes a single [`Row`] into a length-prefixed byte buffer.
/// Not yet wired into the writer below.
#[allow(dead_code)]
fn serialize_row(row: Row<'_>) -> Result<Vec<u8>> {
    let mut serialized = Vec::new();
    let len = row.data().len() as u32;
    serialized.extend_from_slice(&len.to_le_bytes());
    serialized.extend_from_slice(row.data());
    Ok(serialized)
}

pub struct CommonRowWriter {
compression: CompressionTypeVariant,
}

impl CommonRowWriter {
pub fn new(compression: CompressionTypeVariant) -> Self {
Self { compression }
}
}

impl RowDataWriter for CommonRowWriter {
type Data = Rows;

    fn write(&mut self, rows: &Self::Data) -> Result<(), DataFusionError> {
        // Frame layout: row count (u32 LE), one start offset per row
        // (u32 LE), then the concatenated row payloads.
        let mut buffer: Vec<u8> = Vec::new();
        let mut current_offset = 0;
        let mut offsets = Vec::new();
        for row in rows.iter() {
            offsets.push(current_offset);
            current_offset += row.data().len();
        }
        let offsets_len = offsets.len() as u32;
        buffer.extend_from_slice(&offsets_len.to_le_bytes());

        for &offset in &offsets {
            buffer.extend_from_slice(&(offset as u32).to_le_bytes());
        }

        for row in rows.iter() {
            buffer.extend_from_slice(row.data());
        }

        // TODO: persist the compressed frame to the spill file; this initial
        // commit computes it but does not yet wire up an output sink.
        let _compressed_data = self.compress(&buffer)?;
        Ok(())
    }

fn compression(&self) -> CompressionTypeVariant {
self.compression
}
}

pub struct CommonRowReader {
compression: CompressionTypeVariant,
converter: Arc<RowConverter>,
}

impl CommonRowReader {
pub fn new(
compression: CompressionTypeVariant,
converter: Arc<RowConverter>,
) -> Self {
Self {
compression,
converter,
}
}
}

impl RowDataReader for CommonRowReader {
type Data = Rows;

    fn read_all(&mut self) -> Result<Vec<Self::Data>, DataFusionError> {
        // TODO: read the spilled bytes back from storage; this initial
        // commit decodes from an empty placeholder buffer.
        let compressed_data: Vec<u8> = vec![];
        let decompressed_data = self.decompress(&compressed_data)?;
let mut cursor = std::io::Cursor::new(decompressed_data);

let mut offsets_len_buf = [0u8; 4];
cursor
.read_exact(&mut offsets_len_buf)
.map_err(DataFusionError::IoError)?;
let offsets_len = u32::from_le_bytes(offsets_len_buf) as usize;
let mut offsets = Vec::with_capacity(offsets_len);
for _ in 0..offsets_len {
let mut offset_buf = [0u8; 4];
cursor
.read_exact(&mut offset_buf)
.map_err(DataFusionError::IoError)?;
offsets.push(u32::from_le_bytes(offset_buf) as usize);
}
let mut buffer = Vec::new();
cursor
.read_to_end(&mut buffer)
.map_err(DataFusionError::IoError)?;

let parser = self.converter.parser();
let mut rows = self.converter.empty_rows(0, 0);

for i in 0..offsets.len() {
let start = offsets[i];
let end = if i + 1 < offsets.len() {
offsets[i + 1]
} else {
buffer.len()
};

let row_data = &buffer[start..end];
let row = parser.parse(row_data);
rows.push(row);
}

Ok(vec![rows])
}

fn compression(&self) -> CompressionTypeVariant {
self.compression
}
}
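
For reference, the byte layout `CommonRowWriter::write` produces and `CommonRowReader::read_all` parses — a little-endian u32 row count, one u32 start offset per row, then the concatenated row payloads — can be exercised in isolation. A minimal, dependency-free sketch (the `encode_rows`/`decode_rows` helpers are illustrative, not part of this commit):

// Sketch of the spill framing format: count, offsets, payloads (all u32 LE).
fn encode_rows(rows: &[&[u8]]) -> Vec<u8> {
    let mut buf = Vec::new();
    buf.extend_from_slice(&(rows.len() as u32).to_le_bytes());
    let mut offset = 0u32;
    for row in rows {
        // Start offset of each row within the payload section.
        buf.extend_from_slice(&offset.to_le_bytes());
        offset += row.len() as u32;
    }
    for row in rows {
        buf.extend_from_slice(row);
    }
    buf
}

fn decode_rows(buf: &[u8]) -> Vec<Vec<u8>> {
    let count = u32::from_le_bytes(buf[0..4].try_into().unwrap()) as usize;
    let mut offsets = Vec::with_capacity(count);
    for i in 0..count {
        let at = 4 + i * 4;
        offsets.push(u32::from_le_bytes(buf[at..at + 4].try_into().unwrap()) as usize);
    }
    let payload = &buf[4 + count * 4..];
    (0..count)
        .map(|i| {
            // A row ends where the next one starts; the last ends at the payload end.
            let end = offsets.get(i + 1).copied().unwrap_or(payload.len());
            payload[offsets[i]..end].to_vec()
        })
        .collect()
}

fn main() {
    let rows: Vec<&[u8]> = vec![b"alpha", b"bb", b"c"];
    let decoded = decode_rows(&encode_rows(&rows));
    assert_eq!(decoded, vec![b"alpha".to_vec(), b"bb".to_vec(), b"c".to_vec()]);
}

Storing start offsets rather than lengths lets the reader slice each row without accumulating state; note that both sides still need the file I/O marked TODO above before a real round trip works end to end.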
1 change: 1 addition & 0 deletions datafusion/execution/Cargo.toml
@@ -47,6 +47,7 @@ parking_lot = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true }
url = { workspace = true }
pin-project-lite = "^0.2.7"

[dev-dependencies]
chrono = { workspace = true }
5 changes: 4 additions & 1 deletion datafusion/execution/src/lib.rs
@@ -37,5 +37,8 @@ pub mod registry {

pub use disk_manager::DiskManager;
pub use registry::FunctionRegistry;
pub use stream::{RecordBatchStream, SendableRecordBatchStream};
pub use stream::{
RecordBatchStream, RowOrColumn, RowOrColumnStream, RowOrColumnStreamAdapter,
SendableRecordBatchStream,
};
pub use task::TaskContext;
64 changes: 60 additions & 4 deletions datafusion/execution/src/stream.rs
@@ -15,11 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::Result;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch, row::Rows};
use datafusion_common::{DataFusionError, Result};
use futures::Stream;
use std::pin::Pin;

use pin_project_lite::pin_project;
use std::{
fmt,
pin::Pin,
task::{Context, Poll},
};
/// Trait for types that stream [RecordBatch]
///
/// See [`SendableRecordBatchStream`] for more details.
@@ -51,3 +55,55 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
/// [`Stream`]s there is no mechanism to prevent callers polling so returning
/// `Ready(None)` is recommended.
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;

/// Either row-format [`Rows`] or a columnar [`RecordBatch`].
pub enum RowOrColumn {
    Row(Rows),
    Column(RecordBatch),
}

/// A boxed stream whose items each contain either [`Rows`] or a [`RecordBatch`].
pub type RowOrColumnStream = Pin<Box<dyn Stream<Item = Result<RowOrColumn>> + Send>>;

pin_project! {
pub struct RowOrColumnStreamAdapter<S> {
#[pin]
stream: S,
}
}

impl<S> RowOrColumnStreamAdapter<S> {
pub fn new(stream: S) -> Self {
Self { stream }
}
}

impl<S> fmt::Debug for RowOrColumnStreamAdapter<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RowOrColumnStreamAdapter").finish()
}
}

impl<S> Stream for RowOrColumnStreamAdapter<S>
where
    S: Stream<Item = Result<RowOrColumn>>,
{
    type Item = Result<RowOrColumn>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
this.stream.poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}

impl<S> From<RowOrColumnStreamAdapter<S>> for RowOrColumnStream
where
S: Stream<Item = Result<RowOrColumn, DataFusionError>> + Send + 'static,
{
fn from(adapter: RowOrColumnStreamAdapter<S>) -> Self {
Box::pin(adapter)
}
}
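
As a usage sketch (assuming the re-exports added to `datafusion/execution/src/lib.rs` above; `rows_or_batches_to_stream` is a hypothetical helper, not part of this commit), the adapter erases a concrete stream type into the boxed alias via `From`:

use datafusion_common::Result;
use datafusion_execution::{RowOrColumn, RowOrColumnStream, RowOrColumnStreamAdapter};
use futures::stream;

// Wrap an in-memory sequence of items, then box it into the type-erased alias.
fn rows_or_batches_to_stream(items: Vec<Result<RowOrColumn>>) -> RowOrColumnStream {
    RowOrColumnStreamAdapter::new(stream::iter(items)).into()
}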
12 changes: 7 additions & 5 deletions datafusion/physical-plan/src/sorts/builder.rs
@@ -19,6 +19,8 @@ use crate::spill::get_record_batch_memory_size;
use arrow::compute::interleave;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::row::Rows;
use arrow_array::ArrayRef;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;
use std::sync::Arc;
@@ -31,14 +33,14 @@ struct BatchCursor {
row_idx: usize,
}

/// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`RecordBatch`]
/// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`Rows`]
#[derive(Debug)]
pub struct BatchBuilder {
/// The schema of the RecordBatches yielded by this stream
schema: SchemaRef,

/// Maintain a list of [`RecordBatch`] and their corresponding stream
batches: Vec<(usize, RecordBatch)>,
rows: Vec<(usize, Vec<ArrayRef>)>,

/// Accounts for memory used by buffered batches
reservation: MemoryReservation,
@@ -61,7 +63,7 @@ impl BatchBuilder {
) -> Self {
Self {
schema,
batches: Vec::with_capacity(stream_count * 2),
rows: Vec::with_capacity(stream_count * 2),
cursors: vec![BatchCursor::default(); stream_count],
indices: Vec::with_capacity(batch_size),
reservation,
@@ -73,7 +75,7 @@ impl BatchBuilder {
self.reservation
.try_grow(get_record_batch_memory_size(&batch))?;
let batch_idx = self.rows.len();
self.batches.push((stream_idx, batch));
self.rows.push((stream_idx, batch.columns().to_vec()));
self.cursors[stream_idx] = BatchCursor {
batch_idx,
row_idx: 0,
@@ -117,7 +119,7 @@ impl BatchBuilder {
let columns = (0..self.schema.fields.len())
.map(|column_idx| {
let arrays: Vec<_> = self
.batches
.rows
.iter()
.map(|(_, columns)| columns[column_idx].as_ref())
.collect();
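
The output step above hands the gathered (batch index, row index) pairs to `arrow::compute::interleave`, which stitches one merged array per column out of the buffered inputs. A standalone sketch of that primitive (illustrative values only):

use arrow::array::{Array, ArrayRef, Int32Array};
use arrow::compute::interleave;
use arrow::error::ArrowError;
use std::sync::Arc;

fn main() -> Result<(), ArrowError> {
    // Two buffered inputs; each index pair selects (input, row) in merge order.
    let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 3, 5]));
    let b: ArrayRef = Arc::new(Int32Array::from(vec![2, 4, 6]));
    let merged = interleave(&[a.as_ref(), b.as_ref()], &[(0, 0), (1, 0), (0, 1), (1, 1)])?;
    assert_eq!(merged.len(), 4); // values: [1, 2, 3, 4]
    Ok(())
}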
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/sorts/cursor.rs
@@ -177,6 +177,10 @@ impl RowValues {
_reservation: reservation,
}
}

    /// Returns the size in bytes of the underlying rows buffer.
pub fn size(&self) -> usize {
self.rows.size()
}
}

impl CursorValues for RowValues {
1 change: 0 additions & 1 deletion datafusion/physical-plan/src/sorts/mod.rs
@@ -26,5 +26,4 @@ pub mod sort;
pub mod sort_preserving_merge;
mod stream;
pub mod streaming_merge;

pub use index::RowIndex;