
Merge remote-tracking branch 'apache/main' into alamb/sort-merge-accounting
alamb committed Aug 8, 2023
2 parents 3505dba + 627abd7 commit 741eca3
Showing 69 changed files with 5,138 additions and 1,530 deletions.
4 changes: 3 additions & 1 deletion .asf.yaml
@@ -45,7 +45,9 @@ github:
   features:
     issues: true
   protected_branches:
-    main: { }
+    main:
+      required_pull_request_reviews:
+        required_approving_review_count: 1
 
 # publishes the content of the `asf-site` branch to
 # https://arrow.apache.org/datafusion/
18 changes: 0 additions & 18 deletions benchmarks/README.md
Expand Up @@ -272,24 +272,6 @@ SUBCOMMANDS:

```

## NYC Taxi Benchmark

These benchmarks are based on the [New York Taxi and Limousine Commission][2] data set.

```bash
cargo run --release --bin nyctaxi -- --iterations 3 --path /mnt/nyctaxi/csv --format csv --batch-size 4096
```

Example output:

```bash
Running benchmarks with the following options: Opt { debug: false, iterations: 3, batch_size: 4096, path: "/mnt/nyctaxi/csv", file_format: "csv" }
Executing 'fare_amt_by_passenger'
Query 'fare_amt_by_passenger' iteration 0 took 7138 ms
Query 'fare_amt_by_passenger' iteration 1 took 7599 ms
Query 'fare_amt_by_passenger' iteration 2 took 7969 ms
```
## h2o benchmarks

```bash
Expand Down
12 changes: 6 additions & 6 deletions benchmarks/bench.sh
Expand Up @@ -303,7 +303,7 @@ run_tpch() {
RESULTS_FILE="${RESULTS_DIR}/tpch.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch benchmark..."
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 10 --path "${TPCH_DIR}" --format parquet -o ${RESULTS_FILE}
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --format parquet -o ${RESULTS_FILE}
}

# Runs the tpch in memory
Expand All @@ -319,23 +319,23 @@ run_tpch_mem() {
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch_mem benchmark..."
# -m means in memory
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 10 --path "${TPCH_DIR}" -m --format parquet -o ${RESULTS_FILE}
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" -m --format parquet -o ${RESULTS_FILE}
}

# Runs the parquet filter benchmark
run_parquet() {
RESULTS_FILE="${RESULTS_DIR}/parquet.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running parquet filter benchmark..."
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --scale-factor 1.0 --iterations 10 -o ${RESULTS_FILE}
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
}

# Runs the sort benchmark
run_sort() {
RESULTS_FILE="${RESULTS_DIR}/sort.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running sort benchmark..."
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --scale-factor 1.0 --iterations 10 -o ${RESULTS_FILE}
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
}


Expand Down Expand Up @@ -389,15 +389,15 @@ run_clickbench_1() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 10 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

# Runs the clickbench benchmark with a single large parquet file
run_clickbench_partitioned() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (partitioned, 100 files) benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 10 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

compare_benchmarks() {
Expand Down
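For context (this note and sketch are not part of the commit): the bench.sh changes above halve the iteration count per query from 10 to 5. A typical invocation looks like the sketch below, assuming the `data` and `run` subcommands from the script's usage text and `benchmarks/` as the working directory.

```bash
# Sketch under the assumptions above; adjust paths for your environment.
cd benchmarks
./bench.sh data tpch   # generate the TPC-H data set
./bench.sh run tpch    # run the tpch benchmark (now 5 iterations per query)
```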
166 changes: 0 additions & 166 deletions benchmarks/src/bin/nyctaxi.rs

This file was deleted.

48 changes: 48 additions & 0 deletions datafusion/core/src/dataframe.rs
Expand Up @@ -53,6 +53,33 @@ use crate::physical_plan::{collect, collect_partitioned};
use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
use crate::prelude::SessionContext;

/// Contains options that control how data is
/// written out from a DataFrame
pub struct DataFrameWriteOptions {
/// Controls if existing data should be overwritten
overwrite: bool, // TODO, enable DataFrame COPY TO write without TableProvider
// settings such as LOCATION and FILETYPE can be set here
// e.g. add location: Option<Path>
}

impl DataFrameWriteOptions {
/// Create a new DataFrameWriteOptions with default values
pub fn new() -> Self {
DataFrameWriteOptions { overwrite: false }
}
/// Set the overwrite option to true or false
pub fn with_overwrite(mut self, overwrite: bool) -> Self {
self.overwrite = overwrite;
self
}
}

impl Default for DataFrameWriteOptions {
fn default() -> Self {
Self::new()
}
}

/// DataFrame represents a logical set of rows with the same named columns.
/// Similar to a [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) or
/// [Spark DataFrame](https://spark.apache.org/docs/latest/sql-programming-guide.html)
Expand Down Expand Up @@ -925,6 +952,27 @@ impl DataFrame {
))
}

/// Write this DataFrame to the referenced table
/// This method uses on the same underlying implementation
/// as the SQL Insert Into statement.
/// Unlike most other DataFrame methods, this method executes
/// eagerly, writing data, and returning the count of rows written.
pub async fn write_table(
self,
table_name: &str,
write_options: DataFrameWriteOptions,
) -> Result<Vec<RecordBatch>, DataFusionError> {
let arrow_schema = Schema::from(self.schema());
let plan = LogicalPlanBuilder::insert_into(
self.plan,
table_name.to_owned(),
&arrow_schema,
write_options.overwrite,
)?
.build()?;
DataFrame::new(self.session_state, plan).collect().await
}

/// Write a `DataFrame` to a CSV file.
pub async fn write_csv(self, path: &str) -> Result<()> {
let plan = self.session_state.create_physical_plan(&self.plan).await?;
Expand Down
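For context (not part of the commit), here is a minimal usage sketch of the new `write_table` API and the `DataFrameWriteOptions` builder. The table name `target`, the single-column schema, and the in-memory `MemTable` insert target are illustrative assumptions; `write_table` requires the destination table to already be registered with the `SessionContext`.

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Register an empty in-memory table to serve as the insert target
    // (hypothetical name "target").
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let table = MemTable::try_new(schema.clone(), vec![vec![]])?;
    ctx.register_table("target", Arc::new(table))?;

    // Build a DataFrame holding one batch of data.
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;
    let df = ctx.read_batch(batch)?;

    // Eagerly insert the DataFrame into "target"; per the doc comment above,
    // the returned batches report the count of rows written.
    let results = df
        .write_table("target", DataFrameWriteOptions::new().with_overwrite(false))
        .await?;
    println!("{results:?}");
    Ok(())
}
```

Note the builder style of the options struct: `DataFrameWriteOptions::new()` defaults to `overwrite: false`, and `with_overwrite(true)` switches the insert to overwrite semantics.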