Skip to content

Commit

Permalink
Update root README.md adding example how to use ...
Browse files Browse the repository at this point in the history
and cleanup some of the context.

Relates to #1105
  • Loading branch information
milenkovicm committed Nov 14, 2024
1 parent a542608 commit e7ccbb1
Show file tree
Hide file tree
Showing 9 changed files with 304 additions and 164 deletions.
86 changes: 50 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,64 @@
under the License.
-->

# Ballista: Distributed SQL Query Engine, built on Apache Arrow
# Ballista: Making DataFusion Applications Distributed

Ballista is a distributed SQL query engine powered by the Rust implementation of [Apache Arrow][arrow] and
[Apache Arrow DataFusion][datafusion].
Ballista is a library which makes [Apache DataFusion](https://github.com/apache/datafusion) applications distributed.

If you are looking for documentation for a released version of Ballista, please refer to the
[Ballista User Guide][user-guide].
Existing DataFusion application:

## Overview
```rust
use datafusion::prelude::*;

Ballista implements a similar design to Apache Spark (particularly Spark SQL), but there are some key differences:
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
let ctx = SessionContext::new();

- The choice of Rust as the main execution language avoids the overhead of GC pauses and results in deterministic
processing times.
- Ballista is designed from the ground up to use columnar data, enabling a number of efficiencies such as vectorized
processing (SIMD) and efficient compression. Although Spark does have some columnar support, it is still
largely row-based today.
- The combination of Rust and Arrow provides excellent memory efficiency and memory usage can be 5x - 10x lower than
Apache Spark in some cases, which means that more processing can fit on a single node, reducing the overhead of
distributed compute.
- The use of Apache Arrow as the memory model and network protocol means that data can be exchanged efficiently between
executors using the [Flight Protocol][flight], and between clients and schedulers/executors using the
[Flight SQL Protocol][flight-sql]
// register the table
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;

// create a plan to run a SQL query
let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;

// execute and print results
df.show().await?;
Ok(())
}
```

can be distributed with few lines of code changed:

> [!IMPORTANT]
> There is a gap between DataFusion and Ballista, which may bring incompatibilities. The community is working hard to close this gap
```rust
use ballista::prelude::*;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// create DataFusion SessionContext with ballista standalone cluster started
let ctx = SessionContext::standalone();

// register the table
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;

// create a plan to run a SQL query
let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;

// execute and print results
df.show().await?;
Ok(())
}
```

If you are looking for documentation or more examples, please refer to the [Ballista User Guide][user-guide].

## Architecture

A Ballista cluster consists of one or more scheduler processes and one or more executor processes. These processes
can be run as native binaries and are also available as Docker Images, which can be easily deployed with
[Docker Compose](https://datafusion.apache.org/ballista/user-guide/deployment/docker-compose.html) or
[Kubernetes](https://datafusion.apache.org/ballista/user-guide/deployment/kubernetes.html).
[Docker Compose](https://datafusion.apache.org/ballista/user-guide/deployment/docker-compose.html).

The following diagram shows the interaction between clients and the scheduler for submitting jobs, and the interaction
between the executor(s) and the scheduler for fetching tasks and reporting task status.
Expand All @@ -55,15 +83,6 @@ between the executor(s) and the scheduler for fetching tasks and reporting task

See the [architecture guide](docs/source/contributors-guide/architecture.md) for more details.

## Features

- Supports cloud object stores. S3 is supported today and GCS and Azure support is planned.
- DataFrame and SQL APIs available from Python and Rust.
- Clients can connect to a Ballista cluster using [Flight SQL][flight-sql].
- JDBC support via Arrow Flight SQL JDBC Driver
- Scheduler REST UI for monitoring query progress and viewing query plans and metrics.
- Support for Docker, Docker Compose, and Kubernetes deployment, as well as manual deployment on bare metal.

## Performance

We run some simple benchmarks comparing Ballista with Apache Spark to track progress with performance optimizations.
Expand All @@ -81,19 +100,14 @@ that, refer to the [Getting Started Guide](ballista/client/README.md).

## Project Status

Ballista supports a wide range of SQL, including CTEs, Joins, and Subqueries and can execute complex queries at scale.
Ballista supports a wide range of SQL, including CTEs, Joins, and subqueries and can execute complex queries at scale,
but still there is a gap between DataFusion and Ballista which we want to bridge in near future.

Refer to the [DataFusion SQL Reference](https://datafusion.apache.org/user-guide/sql/index.html) for more
information on supported SQL.

Ballista is maturing quickly and is now working towards being production ready. See the [roadmap](ROADMAP.md) for more details.

## Contribution Guide

Please see the [Contribution Guide](CONTRIBUTING.md) for information about contributing to Ballista.

[arrow]: https://arrow.apache.org/
[datafusion]: https://github.com/apache/arrow-datafusion
[flight]: https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/
[flight-sql]: https://arrow.apache.org/blog/2022/02/16/introducing-arrow-flight-sql/
[user-guide]: https://datafusion.apache.org/ballista/
92 changes: 60 additions & 32 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,74 @@

# Configuration

## BallistaContext Configuration Settings
## Ballista Configuration Settings

Ballista has a number of configuration settings that can be specified when creating a BallistaContext.
Configuring Ballista is like configuring DataFusion. Apart for a few specific Ballista specific configurations all others are the same like DataFusion.

_Example: Specifying configuration options when creating a context_

```rust
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "200")
.set("ballista.batch.size", "16384")
.build() ?;
use ballista::extension::{SessionConfigExt, SessionContextExt};

let ctx = BallistaContext::remote("localhost", 50050, & config).await?;
let session_config = SessionConfig::new_with_ballista()
.with_information_schema(true)
.with_ballista_job_name("Super Cool Ballista App");

let state = SessionStateBuilder::new()
.with_default_features()
.with_config(session_config)
.build();

let ctx: SessionContext = SessionContext::remote_with_state(&url,state).await?;
```

`SessionConfig::new_with_ballista()` will setup `SessionConfig` for use with ballista. This is not required, `SessionConfig::new` could be used, but it's advised as it will set up some sensible configuration defaults .

`SessionConfigExt` expose set of `SessionConfigExt::with_ballista_` and `SessionConfigExt::ballista_` methods which can tune retrieve ballista specific options.

Notable `SessionConfigExt` configuration methods would be:

```rust
/// Overrides ballista's [LogicalExtensionCodec]
fn with_ballista_logical_extension_codec(
self,
codec: Arc<dyn LogicalExtensionCodec>,
) -> SessionConfig;

/// Overrides ballista's [PhysicalExtensionCodec]
fn with_ballista_physical_extension_codec(
self,
codec: Arc<dyn PhysicalExtensionCodec>,
) -> SessionConfig;

/// Overrides ballista's [QueryPlanner]
fn with_ballista_query_planner(
self,
planner: Arc<dyn QueryPlanner + Send + Sync + 'static>,
) -> SessionConfig;
```

### Ballista Configuration Settings

| key | type | default | description |
| --------------------------------- | ------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| ballista.job.name | Utf8 | N/A | Sets the job name that will appear in the web user interface for any submitted jobs. |
| ballista.shuffle.partitions | UInt16 | 16 | Sets the default number of partitions to create when repartitioning query stages. |
| ballista.batch.size | UInt16 | 8192 | Sets the default batch size. |
| ballista.repartition.joins | Boolean | true | When set to true, Ballista will repartition data using the join keys to execute joins in parallel using the provided `ballista.shuffle.partitions` level. |
| ballista.repartition.aggregations | Boolean | true | When set to true, Ballista will repartition data using the aggregate keys to execute aggregates in parallel using the provided `ballista.shuffle.partitions` level. |
| ballista.repartition.windows | Boolean | true | When set to true, Ballista will repartition data using the partition keys to execute window functions in parallel using the provided `ballista.shuffle.partitions` level. |
| ballista.parquet.pruning | Boolean | true | Determines whether Parquet pruning should be enabled or not. |
| ballista.with_information_schema | Boolean | true | Determines whether the `information_schema` should be created in the context. This is necessary for supporting DDL commands such as `SHOW TABLES`. |

### DataFusion Configuration Settings

In addition to Ballista-specific configuration settings, the following DataFusion settings can also be specified.

| key | type | default | description |
| ----------------------------------------------- | ------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| datafusion.execution.coalesce_batches | Boolean | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting 'datafusion.execution.coalesce_target_batch_size'. |
| datafusion.execution.coalesce_target_batch_size | UInt64 | 4096 | Target batch size when coalescing batches. Uses in conjunction with the configuration setting 'datafusion.execution.coalesce_batches'. |
| datafusion.explain.logical_plan_only | Boolean | false | When set to true, the explain statement will only print logical plans. |
| datafusion.explain.physical_plan_only | Boolean | false | When set to true, the explain statement will only print physical plans. |
| datafusion.optimizer.filter_null_join_keys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
| datafusion.optimizer.skip_failed_rules | Boolean | true | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail. |
which could be used to change default ballista behavior.

If information schema is enabled all configuration parameters could be retrieved or set using SQL;

```rust
let ctx: SessionContext = SessionContext::remote_with_state(&url, state).await?;

let result = ctx
.sql("select name, value from information_schema.df_settings where name like 'ballista'")
.await?
.collect()
.await?;

let expected = [
"+-------------------+-------------------------+",
"| name | value |",
"+-------------------+-------------------------+",
"| ballista.job.name | Super Cool Ballista App |",
"+-------------------+-------------------------+",
];
```

## Ballista Scheduler Configuration Settings

Expand Down
60 changes: 34 additions & 26 deletions docs/source/user-guide/deployment/quick-start.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@

# Ballista Quickstart

A simple way to start a local cluster for testing purposes is to use cargo to build the project and then run the scheduler and executor binaries directly along with the Ballista UI.
A simple way to start a local cluster for testing purposes is to use cargo to build the project and then run the scheduler and executor binaries directly.

Project Requirements:

- [Rust](https://www.rust-lang.org/tools/install)
- [Protobuf Compiler](https://protobuf.dev/downloads/)
- [Node.js](https://nodejs.org/en/download)
- [Yarn](https://classic.yarnpkg.com/lang/en/docs/install)

### Build the project
## Build the project

From the root of the project, build release binaries.

Expand All @@ -55,39 +53,47 @@ RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50052

The examples can be run using the `cargo run --bin` syntax. Open a new terminal session and run the following commands.

## Running the examples

## Distributed SQL Example
### Distributed SQL Example

```bash
cd examples
cargo run --release --example remote-sql
```

### Source code for distributed SQL example
#### Source code for distributed SQL example

```rust
use ballista::prelude::*;
use datafusion::prelude::CsvReadOptions;
use ballista_examples::test_util;
use datafusion::{
execution::SessionStateBuilder,
prelude::{CsvReadOptions, SessionConfig, SessionContext},
};

/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
/// fetching results, using SQL
#[tokio::main]
async fn main() -> Result<()> {
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
let config = SessionConfig::new_with_ballista()
.with_target_partitions(4)
.with_ballista_job_name("Remote SQL Example");

let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.build();

let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;

let test_data = test_util::examples_test_data();

// register csv file with the execution context
ctx.register_csv(
"test",
"testdata/aggregate_test_100.csv",
&format!("{test_data}/aggregate_test_100.csv"),
CsvReadOptions::new(),
)
.await?;

// execute the query
let df = ctx
.sql(
"SELECT c1, MIN(c12), MAX(c12) \
Expand All @@ -97,40 +103,42 @@ async fn main() -> Result<()> {
)
.await?;

// print the results
df.show().await?;

Ok(())
}
```

## Distributed DataFrame Example
### Distributed DataFrame Example

```bash
cd examples
cargo run --release --example remote-dataframe
```

### Source code for distributed DataFrame example
#### Source code for distributed DataFrame example

```rust
use ballista::prelude::*;
use ballista_examples::test_util;
use datafusion::{
prelude::{col, lit, ParquetReadOptions, SessionContext},
};

#[tokio::main]
async fn main() -> Result<()> {
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
// creating SessionContext with default settings
let ctx = SessionContext::remote("df://localhost:50050".await?;

let filename = "testdata/alltypes_plain.parquet";
let test_data = test_util::examples_test_data();
let filename = format!("{test_data}/alltypes_plain.parquet");

// define the query using the DataFrame trait
let df = ctx
.read_parquet(filename, ParquetReadOptions::default())
.await?
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?;

// print the results
df.show().await?;

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ DataFusion is a library for executing queries in-process using the Apache Arrow
model and computational kernels. It is designed to run within a single process, using threads
for parallel query execution.

Ballista is a distributed compute platform built on DataFusion.
Ballista is a distributed compute platform to DataFusion workloads.
2 changes: 2 additions & 0 deletions docs/source/user-guide/flightsql.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

One of the easiest ways to start with Ballista is to plug it into your existing data infrastructure using support for Arrow Flight SQL JDBC.

> This is optional scheduler feature which should be enabled with `flight-sql` feature
Getting started involves these main steps:

1. [Installing prerequisites](#prereq)
Expand Down
2 changes: 2 additions & 0 deletions docs/source/user-guide/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

## Prometheus

> This is optional scheduler feature which should be enabled with `prometheus-metrics` feature
Built with default features, the ballista scheduler will automatically collect and expose a standard set of prometheus metrics.
The metrics currently collected automatically include:

Expand Down
Loading

0 comments on commit e7ccbb1

Please sign in to comment.