feat: Upgrade to arrow/parquet 15 and datafusion 9 (#652)
Co-authored-by: Tim Van Wassenhove <[email protected]>
xianwill and timvw authored Jun 21, 2022
1 parent 422fbef commit fd0b2da
Showing 7 changed files with 175 additions and 77 deletions.
89 changes: 70 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions rust/Cargo.toml
@@ -56,8 +56,8 @@ async-stream = { version = "0.3.2", default-features = true, optional = true }
 # High-level writer
 parquet-format = "~4.0.0"
 
-arrow = "13"
-parquet = "13"
+arrow = "15"
+parquet = "15"
 
 crossbeam = { version = "0", optional = true }

@@ -69,7 +69,7 @@ async-trait = "0.1"
 # rust-dataframe = {version = "0.*", optional = true }
 
 [dependencies.datafusion]
-version = "8"
+version = "9"
 optional = true
 
 [features]
7 changes: 3 additions & 4 deletions rust/src/checkpoints.rs
@@ -12,7 +12,6 @@ use lazy_static::lazy_static;
 use log::*;
 use parquet::arrow::ArrowWriter;
 use parquet::errors::ParquetError;
-use parquet::file::writer::InMemoryWriteableCursor;
 use regex::Regex;
 use serde_json::Value;
 use std::collections::HashMap;
@@ -402,8 +401,8 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<Vec<u8>, Checkpoi
 
     debug!("Writing to checkpoint parquet buffer...");
     // Write the Checkpoint parquet file.
-    let writeable_cursor = InMemoryWriteableCursor::default();
-    let mut writer = ArrowWriter::try_new(writeable_cursor.clone(), arrow_schema.clone(), None)?;
+    let mut bytes = vec![];
+    let mut writer = ArrowWriter::try_new(&mut bytes, arrow_schema.clone(), None)?;
     let options = DecoderOptions::new().with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE);
     let decoder = Decoder::new(arrow_schema, options);
     while let Some(batch) = decoder.next_batch(&mut jsons)? {
@@ -412,7 +411,7 @@
     let _ = writer.close()?;
     debug!("Finished writing checkpoint parquet buffer.");
 
-    Ok(writeable_cursor.data())
+    Ok(bytes)
 }
 
 fn checkpoint_add_from_state(
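In parquet 15, ArrowWriter accepts any std::io::Write implementor, which is why the removed InMemoryWriteableCursor can be replaced by a plain Vec<u8>. A minimal, self-contained sketch of the new pattern (the schema and batch below are illustrative stand-ins, not the real checkpoint data):

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

fn write_batch_to_memory() -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    // Illustrative schema and batch; checkpoints.rs serializes Delta table state instead.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // ArrowWriter borrows the Vec<u8> mutably and releases it when close()
    // consumes the writer, leaving a complete parquet file in the buffer.
    let mut bytes = vec![];
    let mut writer = ArrowWriter::try_new(&mut bytes, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(bytes)
}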
10 changes: 7 additions & 3 deletions rust/src/delta_datafusion.rs
@@ -26,11 +26,12 @@ use std::sync::Arc;
 
 use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, TimeUnit};
 use async_trait::async_trait;
-use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::{TableProvider, TableType};
+use datafusion::execution::context::SessionState;
 use datafusion::logical_plan::Expr;
 use datafusion::physical_plan::file_format::FileScanConfig;
 use datafusion::physical_plan::ExecutionPlan;
@@ -232,6 +233,7 @@ impl TableProvider for delta::DeltaTable {
 
     async fn scan(
         &self,
+        _: &SessionState,
         projection: &Option<Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
@@ -252,11 +254,13 @@
             })
             .collect::<datafusion::error::Result<_>>()?;
 
-        let df_object_store = Arc::new(LocalFileSystem {});
+        let dt_object_store_url = ObjectStoreUrl::parse(&self.table_uri)
+            .unwrap_or_else(|_| ObjectStoreUrl::local_filesystem());
 
         ParquetFormat::default()
             .create_physical_plan(
                 FileScanConfig {
-                    object_store: df_object_store,
+                    object_store_url: dt_object_store_url,
                     file_schema: schema,
                     file_groups: partitions,
                     statistics: self.datafusion_table_statistics(),
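DataFusion 9 passes a SessionState into TableProvider::scan and locates object stores through a URL in FileScanConfig rather than a store handle, which is why the LocalFileSystem import disappears. ObjectStoreUrl::parse fails on URIs without a recognizable scheme, hence the fallback above. A small sketch of that fallback, using a hypothetical helper name:

use datafusion::datasource::object_store::ObjectStoreUrl;

// Hypothetical helper mirroring the fallback in scan(): resolve a table URI to
// an ObjectStoreUrl, defaulting to the local filesystem when parsing fails
// (e.g. for a bare path such as "/tmp/my_table" with no URL scheme).
fn resolve_object_store_url(table_uri: &str) -> ObjectStoreUrl {
    ObjectStoreUrl::parse(table_uri).unwrap_or_else(|_| ObjectStoreUrl::local_filesystem())
}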