diff --git a/README.md b/README.md index df30641ee..2286761e4 100644 --- a/README.md +++ b/README.md @@ -17,36 +17,64 @@ under the License. --> -# Ballista: Distributed SQL Query Engine, built on Apache Arrow +# Ballista: Making DataFusion Applications Distributed -Ballista is a distributed SQL query engine powered by the Rust implementation of [Apache Arrow][arrow] and -[Apache Arrow DataFusion][datafusion]. +Ballista is a library which makes [Apache DataFusion](https://github.com/apache/datafusion) applications distributed. -If you are looking for documentation for a released version of Ballista, please refer to the -[Ballista User Guide][user-guide]. +Existing DataFusion application: -## Overview +```rust +use datafusion::prelude::*; -Ballista implements a similar design to Apache Spark (particularly Spark SQL), but there are some key differences: +#[tokio::main] +async fn main() -> datafusion::error::Result<()> { + let ctx = SessionContext::new(); -- The choice of Rust as the main execution language avoids the overhead of GC pauses and results in deterministic - processing times. -- Ballista is designed from the ground up to use columnar data, enabling a number of efficiencies such as vectorized - processing (SIMD) and efficient compression. Although Spark does have some columnar support, it is still - largely row-based today. -- The combination of Rust and Arrow provides excellent memory efficiency and memory usage can be 5x - 10x lower than - Apache Spark in some cases, which means that more processing can fit on a single node, reducing the overhead of - distributed compute. -- The use of Apache Arrow as the memory model and network protocol means that data can be exchanged efficiently between - executors using the [Flight Protocol][flight], and between clients and schedulers/executors using the - [Flight SQL Protocol][flight-sql] + // register the table + ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?; + + // create a plan to run a SQL query + let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?; + + // execute and print results + df.show().await?; + Ok(()) +} +``` + +can be distributed with few lines of code changed: + +> [!IMPORTANT] +> There is a gap between DataFusion and Ballista, which may bring incompatibilities. The community is working hard to close this gap + +```rust +use ballista::prelude::*; +use datafusion::prelude::*; + +#[tokio::main] +async fn main() -> datafusion::error::Result<()> { + // create DataFusion SessionContext with ballista standalone cluster started + let ctx = SessionContext::standalone(); + + // register the table + ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?; + + // create a plan to run a SQL query + let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?; + + // execute and print results + df.show().await?; + Ok(()) +} +``` + +If you are looking for documentation or more examples, please refer to the [Ballista User Guide][user-guide]. ## Architecture A Ballista cluster consists of one or more scheduler processes and one or more executor processes. These processes can be run as native binaries and are also available as Docker Images, which can be easily deployed with -[Docker Compose](https://datafusion.apache.org/ballista/user-guide/deployment/docker-compose.html) or -[Kubernetes](https://datafusion.apache.org/ballista/user-guide/deployment/kubernetes.html). +[Docker Compose](https://datafusion.apache.org/ballista/user-guide/deployment/docker-compose.html). The following diagram shows the interaction between clients and the scheduler for submitting jobs, and the interaction between the executor(s) and the scheduler for fetching tasks and reporting task status. @@ -55,15 +83,6 @@ between the executor(s) and the scheduler for fetching tasks and reporting task See the [architecture guide](docs/source/contributors-guide/architecture.md) for more details. -## Features - -- Supports cloud object stores. S3 is supported today and GCS and Azure support is planned. -- DataFrame and SQL APIs available from Python and Rust. -- Clients can connect to a Ballista cluster using [Flight SQL][flight-sql]. -- JDBC support via Arrow Flight SQL JDBC Driver -- Scheduler REST UI for monitoring query progress and viewing query plans and metrics. -- Support for Docker, Docker Compose, and Kubernetes deployment, as well as manual deployment on bare metal. - ## Performance We run some simple benchmarks comparing Ballista with Apache Spark to track progress with performance optimizations. @@ -81,19 +100,14 @@ that, refer to the [Getting Started Guide](ballista/client/README.md). ## Project Status -Ballista supports a wide range of SQL, including CTEs, Joins, and Subqueries and can execute complex queries at scale. +Ballista supports a wide range of SQL, including CTEs, Joins, and subqueries and can execute complex queries at scale, +but still there is a gap between DataFusion and Ballista which we want to bridge in near future. Refer to the [DataFusion SQL Reference](https://datafusion.apache.org/user-guide/sql/index.html) for more information on supported SQL. -Ballista is maturing quickly and is now working towards being production ready. See the [roadmap](ROADMAP.md) for more details. - ## Contribution Guide Please see the [Contribution Guide](CONTRIBUTING.md) for information about contributing to Ballista. -[arrow]: https://arrow.apache.org/ -[datafusion]: https://github.com/apache/arrow-datafusion -[flight]: https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/ -[flight-sql]: https://arrow.apache.org/blog/2022/02/16/introducing-arrow-flight-sql/ [user-guide]: https://datafusion.apache.org/ballista/ diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 880ee4901..2fd35157c 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -19,46 +19,74 @@ # Configuration -## BallistaContext Configuration Settings +## Ballista Configuration Settings -Ballista has a number of configuration settings that can be specified when creating a BallistaContext. +Configuring Ballista is like configuring DataFusion. Apart for a few specific Ballista specific configurations all others are the same like DataFusion. _Example: Specifying configuration options when creating a context_ ```rust -let config = BallistaConfig::builder() -.set("ballista.shuffle.partitions", "200") -.set("ballista.batch.size", "16384") -.build() ?; +use ballista::extension::{SessionConfigExt, SessionContextExt}; -let ctx = BallistaContext::remote("localhost", 50050, & config).await?; +let session_config = SessionConfig::new_with_ballista() + .with_information_schema(true) + .with_ballista_job_name("Super Cool Ballista App"); + +let state = SessionStateBuilder::new() + .with_default_features() + .with_config(session_config) + .build(); + +let ctx: SessionContext = SessionContext::remote_with_state(&url,state).await?; +``` + +`SessionConfig::new_with_ballista()` will setup `SessionConfig` for use with ballista. This is not required, `SessionConfig::new` could be used, but it's advised as it will set up some sensible configuration defaults . + +`SessionConfigExt` expose set of `SessionConfigExt::with_ballista_` and `SessionConfigExt::ballista_` methods which can tune retrieve ballista specific options. + +Notable `SessionConfigExt` configuration methods would be: + +```rust +/// Overrides ballista's [LogicalExtensionCodec] +fn with_ballista_logical_extension_codec( + self, + codec: Arc, +) -> SessionConfig; + +/// Overrides ballista's [PhysicalExtensionCodec] +fn with_ballista_physical_extension_codec( + self, + codec: Arc, +) -> SessionConfig; + +/// Overrides ballista's [QueryPlanner] +fn with_ballista_query_planner( + self, + planner: Arc, +) -> SessionConfig; ``` -### Ballista Configuration Settings - -| key | type | default | description | -| --------------------------------- | ------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| ballista.job.name | Utf8 | N/A | Sets the job name that will appear in the web user interface for any submitted jobs. | -| ballista.shuffle.partitions | UInt16 | 16 | Sets the default number of partitions to create when repartitioning query stages. | -| ballista.batch.size | UInt16 | 8192 | Sets the default batch size. | -| ballista.repartition.joins | Boolean | true | When set to true, Ballista will repartition data using the join keys to execute joins in parallel using the provided `ballista.shuffle.partitions` level. | -| ballista.repartition.aggregations | Boolean | true | When set to true, Ballista will repartition data using the aggregate keys to execute aggregates in parallel using the provided `ballista.shuffle.partitions` level. | -| ballista.repartition.windows | Boolean | true | When set to true, Ballista will repartition data using the partition keys to execute window functions in parallel using the provided `ballista.shuffle.partitions` level. | -| ballista.parquet.pruning | Boolean | true | Determines whether Parquet pruning should be enabled or not. | -| ballista.with_information_schema | Boolean | true | Determines whether the `information_schema` should be created in the context. This is necessary for supporting DDL commands such as `SHOW TABLES`. | - -### DataFusion Configuration Settings - -In addition to Ballista-specific configuration settings, the following DataFusion settings can also be specified. - -| key | type | default | description | -| ----------------------------------------------- | ------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| datafusion.execution.coalesce_batches | Boolean | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting 'datafusion.execution.coalesce_target_batch_size'. | -| datafusion.execution.coalesce_target_batch_size | UInt64 | 4096 | Target batch size when coalescing batches. Uses in conjunction with the configuration setting 'datafusion.execution.coalesce_batches'. | -| datafusion.explain.logical_plan_only | Boolean | false | When set to true, the explain statement will only print logical plans. | -| datafusion.explain.physical_plan_only | Boolean | false | When set to true, the explain statement will only print physical plans. | -| datafusion.optimizer.filter_null_join_keys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | -| datafusion.optimizer.skip_failed_rules | Boolean | true | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail. | +which could be used to change default ballista behavior. + +If information schema is enabled all configuration parameters could be retrieved or set using SQL; + +```rust +let ctx: SessionContext = SessionContext::remote_with_state(&url, state).await?; + +let result = ctx + .sql("select name, value from information_schema.df_settings where name like 'ballista'") + .await? + .collect() + .await?; + +let expected = [ + "+-------------------+-------------------------+", + "| name | value |", + "+-------------------+-------------------------+", + "| ballista.job.name | Super Cool Ballista App |", + "+-------------------+-------------------------+", +]; +``` ## Ballista Scheduler Configuration Settings diff --git a/docs/source/user-guide/deployment/quick-start.md b/docs/source/user-guide/deployment/quick-start.md index d94de0475..df8434beb 100644 --- a/docs/source/user-guide/deployment/quick-start.md +++ b/docs/source/user-guide/deployment/quick-start.md @@ -19,16 +19,14 @@ # Ballista Quickstart -A simple way to start a local cluster for testing purposes is to use cargo to build the project and then run the scheduler and executor binaries directly along with the Ballista UI. +A simple way to start a local cluster for testing purposes is to use cargo to build the project and then run the scheduler and executor binaries directly. Project Requirements: - [Rust](https://www.rust-lang.org/tools/install) - [Protobuf Compiler](https://protobuf.dev/downloads/) -- [Node.js](https://nodejs.org/en/download) -- [Yarn](https://classic.yarnpkg.com/lang/en/docs/install) -### Build the project +## Build the project From the root of the project, build release binaries. @@ -55,39 +53,47 @@ RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50052 The examples can be run using the `cargo run --bin` syntax. Open a new terminal session and run the following commands. -## Running the examples - -## Distributed SQL Example +### Distributed SQL Example ```bash cd examples cargo run --release --example remote-sql ``` -### Source code for distributed SQL example +#### Source code for distributed SQL example ```rust use ballista::prelude::*; -use datafusion::prelude::CsvReadOptions; +use ballista_examples::test_util; +use datafusion::{ + execution::SessionStateBuilder, + prelude::{CsvReadOptions, SessionConfig, SessionContext}, +}; /// This example demonstrates executing a simple query against an Arrow data source (CSV) and /// fetching results, using SQL #[tokio::main] async fn main() -> Result<()> { - let config = BallistaConfig::builder() - .set("ballista.shuffle.partitions", "4") - .build()?; - let ctx = BallistaContext::remote("localhost", 50050, &config).await?; + let config = SessionConfig::new_with_ballista() + .with_target_partitions(4) + .with_ballista_job_name("Remote SQL Example"); + + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); + + let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?; + + let test_data = test_util::examples_test_data(); - // register csv file with the execution context ctx.register_csv( "test", - "testdata/aggregate_test_100.csv", + &format!("{test_data}/aggregate_test_100.csv"), CsvReadOptions::new(), ) .await?; - // execute the query let df = ctx .sql( "SELECT c1, MIN(c12), MAX(c12) \ @@ -97,40 +103,42 @@ async fn main() -> Result<()> { ) .await?; - // print the results df.show().await?; Ok(()) } ``` -## Distributed DataFrame Example +### Distributed DataFrame Example ```bash cd examples cargo run --release --example remote-dataframe ``` -### Source code for distributed DataFrame example +#### Source code for distributed DataFrame example ```rust +use ballista::prelude::*; +use ballista_examples::test_util; +use datafusion::{ + prelude::{col, lit, ParquetReadOptions, SessionContext}, +}; + #[tokio::main] async fn main() -> Result<()> { - let config = BallistaConfig::builder() - .set("ballista.shuffle.partitions", "4") - .build()?; - let ctx = BallistaContext::remote("localhost", 50050, &config).await?; + // creating SessionContext with default settings + let ctx = SessionContext::remote("df://localhost:50050".await?; - let filename = "testdata/alltypes_plain.parquet"; + let test_data = test_util::examples_test_data(); + let filename = format!("{test_data}/alltypes_plain.parquet"); - // define the query using the DataFrame trait let df = ctx .read_parquet(filename, ParquetReadOptions::default()) .await? .select_columns(&["id", "bool_col", "timestamp_col"])? .filter(col("id").gt(lit(1)))?; - // print the results df.show().await?; Ok(()) diff --git a/docs/source/user-guide/faq.md b/docs/source/user-guide/faq.md index 5cfd9fe8b..32462e77e 100644 --- a/docs/source/user-guide/faq.md +++ b/docs/source/user-guide/faq.md @@ -25,4 +25,4 @@ DataFusion is a library for executing queries in-process using the Apache Arrow model and computational kernels. It is designed to run within a single process, using threads for parallel query execution. -Ballista is a distributed compute platform built on DataFusion. +Ballista is a distributed compute platform to DataFusion workloads. diff --git a/docs/source/user-guide/flightsql.md b/docs/source/user-guide/flightsql.md index 4572eef91..b6bb71f4a 100644 --- a/docs/source/user-guide/flightsql.md +++ b/docs/source/user-guide/flightsql.md @@ -21,6 +21,8 @@ One of the easiest ways to start with Ballista is to plug it into your existing data infrastructure using support for Arrow Flight SQL JDBC. +> This is optional scheduler feature which should be enabled with `flight-sql` feature + Getting started involves these main steps: 1. [Installing prerequisites](#prereq) diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md index 7e831dcab..c061f3d44 100644 --- a/docs/source/user-guide/metrics.md +++ b/docs/source/user-guide/metrics.md @@ -21,6 +21,8 @@ ## Prometheus +> This is optional scheduler feature which should be enabled with `prometheus-metrics` feature + Built with default features, the ballista scheduler will automatically collect and expose a standard set of prometheus metrics. The metrics currently collected automatically include: diff --git a/docs/source/user-guide/rust.md b/docs/source/user-guide/rust.md index e3015e763..22416e243 100644 --- a/docs/source/user-guide/rust.md +++ b/docs/source/user-guide/rust.md @@ -17,78 +17,126 @@ under the License. --> -# Ballista Rust Client +# Distributing DataFusion with Ballista -To connect to a Ballista cluster from Rust, first start by creating a `BallistaContext`. +To connect to a Ballista cluster from Rust, first start by creating a `SessionContext` connected to remote scheduler server. ```rust -let config = BallistaConfig::builder() - .set("ballista.shuffle.partitions", "4") - .build()?; +use ballista::prelude::*; +use datafusion::{ + execution::SessionStateBuilder, + prelude::{SessionConfig, SessionContext}, +}; + +let config = SessionConfig::new_with_ballista() + .with_target_partitions(4) + .with_ballista_job_name("Remote SQL Example"); + +let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); + +let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?; +``` + +For testing purposes, standalone, in process cluster could be started with: + +```rust +use ballista::prelude::*; +use datafusion::{ + execution::SessionStateBuilder, + prelude::{SessionConfig, SessionContext}, +}; +let config = SessionConfig::new_with_ballista() + .with_target_partitions(1) + .with_ballista_standalone_parallelism(2); + +let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); + +let ctx = SessionContext::standalone_with_state(state).await?; -// connect to Ballista scheduler -let ctx = BallistaContext::remote("localhost", 50050, &config); ``` -Here is a full example using the DataFrame API. +Following examples require running remove scheduler and executor nodes. + +Full example using the DataFrame API. ```rust +use ballista::prelude::*; +use ballista_examples::test_util; +use datafusion::{ + prelude::{col, lit, ParquetReadOptions, SessionContext}, +}; + +/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and +/// fetching results, using the DataFrame trait #[tokio::main] async fn main() -> Result<()> { - let config = BallistaConfig::builder() - .set("ballista.shuffle.partitions", "4") - .build()?; + // creating SessionContext with default settings + let ctx = SessionContext::remote("df://localhost:50050".await?; - // connect to Ballista scheduler - let ctx = BallistaContext::remote("localhost", 50050, &config); + let test_data = test_util::examples_test_data(); + let filename = format!("{test_data}/alltypes_plain.parquet"); - let testdata = datafusion::test_util::parquet_test_data(); - - let filename = &format!("{}/alltypes_plain.parquet", testdata); - - // define the query using the DataFrame trait let df = ctx - .read_parquet(filename)? + .read_parquet(filename, ParquetReadOptions::default()) + .await? .select_columns(&["id", "bool_col", "timestamp_col"])? .filter(col("id").gt(lit(1)))?; - // print the results df.show().await?; Ok(()) } ``` -Here is a full example demonstrating SQL usage. +Here is a full example demonstrating SQL usage, with user specific `SessionConfig`: ```rust +use ballista::prelude::*; +use ballista_examples::test_util; +use datafusion::{ + execution::SessionStateBuilder, + prelude::{CsvReadOptions, SessionConfig, SessionContext}, +}; + +/// This example demonstrates executing a simple query against an Arrow data source (CSV) and +/// fetching results, using SQL #[tokio::main] async fn main() -> Result<()> { - let config = BallistaConfig::builder() - .set("ballista.shuffle.partitions", "4") - .build()?; + let config = SessionConfig::new_with_ballista() + .with_target_partitions(4) + .with_ballista_job_name("Remote SQL Example"); - // connect to Ballista scheduler - let ctx = BallistaContext::remote("localhost", 50050, &config); + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); - let testdata = datafusion::test_util::arrow_test_data(); + let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?; + + let test_data = test_util::examples_test_data(); - // register csv file with the execution context ctx.register_csv( - "aggregate_test_100", - &format!("{}/csv/aggregate_test_100.csv", testdata), + "test", + &format!("{test_data}/aggregate_test_100.csv"), CsvReadOptions::new(), - )?; + ) + .await?; - // execute the query - let df = ctx.sql( - "SELECT c1, MIN(c12), MAX(c12) \ - FROM aggregate_test_100 \ + let df = ctx + .sql( + "SELECT c1, MIN(c12), MAX(c12) \ + FROM test \ WHERE c11 > 0.1 AND c11 < 0.9 \ GROUP BY c1", - )?; + ) + .await?; - // print the results df.show().await?; Ok(()) diff --git a/docs/source/user-guide/scheduler.md b/docs/source/user-guide/scheduler.md index f6a3ca6a8..80bc752e8 100644 --- a/docs/source/user-guide/scheduler.md +++ b/docs/source/user-guide/scheduler.md @@ -23,6 +23,8 @@ The scheduler also provides a REST API that allows jobs to be monitored. +> This is optional scheduler feature which should be enabled with `rest-api` feature + | API | Method | Description | | --------------------- | ------ | ----------------------------------------------------------- | | /api/jobs | GET | Get a list of jobs that have been submitted to the cluster. | diff --git a/examples/README.md b/examples/README.md index 4eaaac1e2..14604ac2b 100644 --- a/examples/README.md +++ b/examples/README.md @@ -21,7 +21,7 @@ This directory contains examples for executing distributed queries with Ballista. -# Standalone Examples +## Standalone Examples The standalone example is the easiest to get started with. Ballista supports a standalone mode where a scheduler and executor are started in-process. @@ -33,18 +33,35 @@ cargo run --example standalone_sql --features="ballista/standalone" ### Source code for standalone SQL example ```rust +use ballista::{ + extension::SessionConfigExt, + prelude::* +}; +use datafusion::{ + execution::{options::ParquetReadOptions, SessionStateBuilder}, + prelude::{SessionConfig, SessionContext}, +}; + #[tokio::main] async fn main() -> Result<()> { - let config = BallistaConfig::builder() - .set("ballista.shuffle.partitions", "1") - .build()?; + let config = SessionConfig::new_with_ballista() + .with_target_partitions(1) + .with_ballista_standalone_parallelism(2); - let ctx = BallistaContext::standalone(&config, 2).await?; + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); - ctx.register_csv( + let ctx = SessionContext::standalone_with_state(state).await?; + + let test_data = test_util::examples_test_data(); + + // register parquet file with the execution context + ctx.register_parquet( "test", - "testdata/aggregate_test_100.csv", - CsvReadOptions::new(), + &format!("{test_data}/alltypes_plain.parquet"), + ParquetReadOptions::default(), ) .await?; @@ -56,12 +73,12 @@ async fn main() -> Result<()> { ``` -# Distributed Examples +## Distributed Examples For background information on the Ballista architecture, refer to the [Ballista README](../ballista/client/README.md). -## Start a standalone cluster +### Start a standalone cluster From the root of the project, build release binaries. @@ -83,40 +100,49 @@ RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50051 RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50052 ``` -## Running the examples +### Running the examples The examples can be run using the `cargo run --bin` syntax. -## Distributed SQL Example +### Distributed SQL Example ```bash cargo run --release --example remote-sql ``` -### Source code for distributed SQL example +#### Source code for distributed SQL example ```rust -use ballista::prelude::*; -use datafusion::prelude::CsvReadOptions; +use ballista::{extension::SessionConfigExt, prelude::*}; +use datafusion::{ + execution::SessionStateBuilder, + prelude::{CsvReadOptions, SessionConfig, SessionContext}, +}; /// This example demonstrates executing a simple query against an Arrow data source (CSV) and /// fetching results, using SQL #[tokio::main] async fn main() -> Result<()> { - let config = BallistaConfig::builder() - .set("ballista.shuffle.partitions", "4") - .build()?; - let ctx = BallistaContext::remote("localhost", 50050, &config).await?; + let config = SessionConfig::new_with_ballista() + .with_target_partitions(4) + .with_ballista_job_name("Remote SQL Example"); + + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); + + let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?; + + let test_data = test_util::examples_test_data(); - // register csv file with the execution context ctx.register_csv( "test", - "testdata/aggregate_test_100.csv", + &format!("{test_data}/aggregate_test_100.csv"), CsvReadOptions::new(), ) .await?; - // execute the query let df = ctx .sql( "SELECT c1, MIN(c12), MAX(c12) \ @@ -126,39 +152,49 @@ async fn main() -> Result<()> { ) .await?; - // print the results df.show().await?; Ok(()) } ``` -## Distributed DataFrame Example +### Distributed DataFrame Example ```bash cargo run --release --example remote-dataframe ``` -### Source code for distributed DataFrame example +#### Source code for distributed DataFrame example ```rust +use ballista::{extension::SessionConfigExt, prelude::*}; +use datafusion::{ + execution::SessionStateBuilder, + prelude::{col, lit, ParquetReadOptions, SessionConfig, SessionContext}, +}; + +/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and +/// fetching results, using the DataFrame trait #[tokio::main] async fn main() -> Result<()> { - let config = BallistaConfig::builder() - .set("ballista.shuffle.partitions", "4") - .build()?; - let ctx = BallistaContext::remote("localhost", 50050, &config).await?; + let config = SessionConfig::new_with_ballista().with_target_partitions(4); + + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); + + let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?; - let filename = "testdata/alltypes_plain.parquet"; + let test_data = test_util::examples_test_data(); + let filename = format!("{test_data}/alltypes_plain.parquet"); - // define the query using the DataFrame trait let df = ctx .read_parquet(filename, ParquetReadOptions::default()) .await? .select_columns(&["id", "bool_col", "timestamp_col"])? .filter(col("id").gt(lit(1)))?; - // print the results df.show().await?; Ok(())