Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update root README.md and other documentation with latest changes #1113

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 51 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,53 +17,72 @@
under the License.
-->

# Ballista: Distributed SQL Query Engine, built on Apache Arrow
# Ballista: Making DataFusion Applications Distributed

Ballista is a distributed SQL query engine powered by the Rust implementation of [Apache Arrow][arrow] and
[Apache Arrow DataFusion][datafusion].
Ballista is a library which makes [Apache DataFusion](https://github.com/apache/datafusion) applications distributed.
milenkovicm marked this conversation as resolved.
Show resolved Hide resolved

If you are looking for documentation for a released version of Ballista, please refer to the
[Ballista User Guide][user-guide].
Existing DataFusion application:

## Overview
```rust
use datafusion::prelude::*;

Ballista implements a similar design to Apache Spark (particularly Spark SQL), but there are some key differences:
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
let ctx = SessionContext::new();

- The choice of Rust as the main execution language avoids the overhead of GC pauses and results in deterministic
processing times.
- Ballista is designed from the ground up to use columnar data, enabling a number of efficiencies such as vectorized
processing (SIMD) and efficient compression. Although Spark does have some columnar support, it is still
largely row-based today.
- The combination of Rust and Arrow provides excellent memory efficiency and memory usage can be 5x - 10x lower than
Apache Spark in some cases, which means that more processing can fit on a single node, reducing the overhead of
distributed compute.
- The use of Apache Arrow as the memory model and network protocol means that data can be exchanged efficiently between
executors using the [Flight Protocol][flight], and between clients and schedulers/executors using the
[Flight SQL Protocol][flight-sql]
// register the table
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;

// create a plan to run a SQL query
let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;

// execute and print results
df.show().await?;
Ok(())
}
```

can be distributed with few lines of code changed:

> [!IMPORTANT]
> There is a gap between DataFusion and Ballista, which may bring incompatibilities. The community is working hard to close this gap

```rust
use ballista::prelude::*;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// create DataFusion SessionContext with ballista standalone cluster started
let ctx = SessionContext::standalone();

// register the table
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;

// create a plan to run a SQL query
let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;

// execute and print results
df.show().await?;
Ok(())
}
```

If you are looking for documentation or more examples, please refer to the [Ballista User Guide][user-guide].

## Architecture

A Ballista cluster consists of one or more scheduler processes and one or more executor processes. These processes
can be run as native binaries and are also available as Docker Images, which can be easily deployed with
[Docker Compose](https://datafusion.apache.org/ballista/user-guide/deployment/docker-compose.html) or
[Kubernetes](https://datafusion.apache.org/ballista/user-guide/deployment/kubernetes.html).
milenkovicm marked this conversation as resolved.
Show resolved Hide resolved
[Docker Compose](https://datafusion.apache.org/ballista/user-guide/deployment/docker-compose.html).

The following diagram shows the interaction between clients and the scheduler for submitting jobs, and the interaction
between the executor(s) and the scheduler for fetching tasks and reporting task status.

![Ballista Cluster Diagram](docs/source/contributors-guide/ballista.drawio.png)
![Ballista Cluster Diagram](docs/source/contributors-guide/ballista_architecture.excalidraw.svg)

See the [architecture guide](docs/source/contributors-guide/architecture.md) for more details.

## Features

- Supports cloud object stores. S3 is supported today and GCS and Azure support is planned.
- DataFrame and SQL APIs available from Python and Rust.
- Clients can connect to a Ballista cluster using [Flight SQL][flight-sql].
- JDBC support via Arrow Flight SQL JDBC Driver
- Scheduler REST UI for monitoring query progress and viewing query plans and metrics.
- Support for Docker, Docker Compose, and Kubernetes deployment, as well as manual deployment on bare metal.

## Performance

We run some simple benchmarks comparing Ballista with Apache Spark to track progress with performance optimizations.
Expand All @@ -81,19 +100,14 @@ that, refer to the [Getting Started Guide](ballista/client/README.md).

## Project Status

Ballista supports a wide range of SQL, including CTEs, Joins, and Subqueries and can execute complex queries at scale.
Ballista supports a wide range of SQL, including CTEs, Joins, and subqueries and can execute complex queries at scale,
but still there is a gap between DataFusion and Ballista which we want to bridge in near future.

Refer to the [DataFusion SQL Reference](https://datafusion.apache.org/user-guide/sql/index.html) for more
information on supported SQL.

Ballista is maturing quickly and is now working towards being production ready. See the [roadmap](ROADMAP.md) for more details.

## Contribution Guide

Please see the [Contribution Guide](CONTRIBUTING.md) for information about contributing to Ballista.

[arrow]: https://arrow.apache.org/
[datafusion]: https://github.com/apache/arrow-datafusion
[flight]: https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/
[flight-sql]: https://arrow.apache.org/blog/2022/02/16/introducing-arrow-flight-sql/
[user-guide]: https://datafusion.apache.org/ballista/
2 changes: 1 addition & 1 deletion docs/source/contributors-guide/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ can be run as native binaries and are also available as Docker Images, which can
The following diagram shows the interaction between clients and the scheduler for submitting jobs, and the interaction
between the executor(s) and the scheduler for fetching tasks and reporting task status.

![Ballista Cluster Diagram](ballista.drawio.png)
![Ballista Cluster Diagram](ballista_architecture.excalidraw.svg)

### Scheduler

Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still have a rest implementation that works? I was testing out the other day and couln't get a response from the REST endpoint. I may have misconfigured the cluster on AWS, though. Was able to run jobs against it, just not use the REST API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's a optional feature (rest-api) of scheduler crate and its disabled by default. should work when enabled, otherwise its a bug and should be fixed

milenkovicm marked this conversation as resolved.
Show resolved Hide resolved
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
92 changes: 60 additions & 32 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,74 @@

# Configuration

## BallistaContext Configuration Settings
## Ballista Configuration Settings

Ballista has a number of configuration settings that can be specified when creating a BallistaContext.
Configuring Ballista is like configuring DataFusion. Apart for a few specific Ballista specific configurations all others are the same like DataFusion.
milenkovicm marked this conversation as resolved.
Show resolved Hide resolved

_Example: Specifying configuration options when creating a context_

```rust
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "200")
.set("ballista.batch.size", "16384")
.build() ?;
use ballista::extension::{SessionConfigExt, SessionContextExt};

let ctx = BallistaContext::remote("localhost", 50050, & config).await?;
let session_config = SessionConfig::new_with_ballista()
.with_information_schema(true)
.with_ballista_job_name("Super Cool Ballista App");

let state = SessionStateBuilder::new()
.with_default_features()
.with_config(session_config)
.build();

let ctx: SessionContext = SessionContext::remote_with_state(&url,state).await?;
```

`SessionConfig::new_with_ballista()` will setup `SessionConfig` for use with ballista. This is not required, `SessionConfig::new` could be used, but it's advised as it will set up some sensible configuration defaults .

`SessionConfigExt` expose set of `SessionConfigExt::with_ballista_` and `SessionConfigExt::ballista_` methods which can tune retrieve ballista specific options.

Notable `SessionConfigExt` configuration methods would be:

```rust
/// Overrides ballista's [LogicalExtensionCodec]
fn with_ballista_logical_extension_codec(
self,
codec: Arc<dyn LogicalExtensionCodec>,
) -> SessionConfig;

/// Overrides ballista's [PhysicalExtensionCodec]
fn with_ballista_physical_extension_codec(
self,
codec: Arc<dyn PhysicalExtensionCodec>,
) -> SessionConfig;

/// Overrides ballista's [QueryPlanner]
fn with_ballista_query_planner(
self,
planner: Arc<dyn QueryPlanner + Send + Sync + 'static>,
) -> SessionConfig;
```

### Ballista Configuration Settings

| key | type | default | description |
| --------------------------------- | ------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| ballista.job.name | Utf8 | N/A | Sets the job name that will appear in the web user interface for any submitted jobs. |
| ballista.shuffle.partitions | UInt16 | 16 | Sets the default number of partitions to create when repartitioning query stages. |
| ballista.batch.size | UInt16 | 8192 | Sets the default batch size. |
| ballista.repartition.joins | Boolean | true | When set to true, Ballista will repartition data using the join keys to execute joins in parallel using the provided `ballista.shuffle.partitions` level. |
| ballista.repartition.aggregations | Boolean | true | When set to true, Ballista will repartition data using the aggregate keys to execute aggregates in parallel using the provided `ballista.shuffle.partitions` level. |
| ballista.repartition.windows | Boolean | true | When set to true, Ballista will repartition data using the partition keys to execute window functions in parallel using the provided `ballista.shuffle.partitions` level. |
| ballista.parquet.pruning | Boolean | true | Determines whether Parquet pruning should be enabled or not. |
| ballista.with_information_schema | Boolean | true | Determines whether the `information_schema` should be created in the context. This is necessary for supporting DDL commands such as `SHOW TABLES`. |

### DataFusion Configuration Settings

In addition to Ballista-specific configuration settings, the following DataFusion settings can also be specified.

| key | type | default | description |
| ----------------------------------------------- | ------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| datafusion.execution.coalesce_batches | Boolean | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting 'datafusion.execution.coalesce_target_batch_size'. |
| datafusion.execution.coalesce_target_batch_size | UInt64 | 4096 | Target batch size when coalescing batches. Uses in conjunction with the configuration setting 'datafusion.execution.coalesce_batches'. |
| datafusion.explain.logical_plan_only | Boolean | false | When set to true, the explain statement will only print logical plans. |
| datafusion.explain.physical_plan_only | Boolean | false | When set to true, the explain statement will only print physical plans. |
| datafusion.optimizer.filter_null_join_keys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
| datafusion.optimizer.skip_failed_rules | Boolean | true | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail. |
which could be used to change default ballista behavior.

If information schema is enabled all configuration parameters could be retrieved or set using SQL;

```rust
let ctx: SessionContext = SessionContext::remote_with_state(&url, state).await?;

let result = ctx
.sql("select name, value from information_schema.df_settings where name like 'ballista'")
.await?
.collect()
.await?;

let expected = [
"+-------------------+-------------------------+",
"| name | value |",
"+-------------------+-------------------------+",
"| ballista.job.name | Super Cool Ballista App |",
"+-------------------+-------------------------+",
];
```

## Ballista Scheduler Configuration Settings

Expand Down
60 changes: 34 additions & 26 deletions docs/source/user-guide/deployment/quick-start.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@

# Ballista Quickstart

A simple way to start a local cluster for testing purposes is to use cargo to build the project and then run the scheduler and executor binaries directly along with the Ballista UI.
A simple way to start a local cluster for testing purposes is to use cargo to build the project and then run the scheduler and executor binaries directly.

Project Requirements:

- [Rust](https://www.rust-lang.org/tools/install)
- [Protobuf Compiler](https://protobuf.dev/downloads/)
- [Node.js](https://nodejs.org/en/download)
- [Yarn](https://classic.yarnpkg.com/lang/en/docs/install)

### Build the project
## Build the project

From the root of the project, build release binaries.

Expand All @@ -55,39 +53,47 @@ RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50052

The examples can be run using the `cargo run --bin` syntax. Open a new terminal session and run the following commands.

## Running the examples

## Distributed SQL Example
### Distributed SQL Example

```bash
cd examples
cargo run --release --example remote-sql
```

### Source code for distributed SQL example
#### Source code for distributed SQL example

```rust
use ballista::prelude::*;
use datafusion::prelude::CsvReadOptions;
use ballista_examples::test_util;
use datafusion::{
execution::SessionStateBuilder,
prelude::{CsvReadOptions, SessionConfig, SessionContext},
};

/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
/// fetching results, using SQL
#[tokio::main]
async fn main() -> Result<()> {
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
let config = SessionConfig::new_with_ballista()
.with_target_partitions(4)
.with_ballista_job_name("Remote SQL Example");

let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.build();

let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;

let test_data = test_util::examples_test_data();

// register csv file with the execution context
ctx.register_csv(
"test",
"testdata/aggregate_test_100.csv",
&format!("{test_data}/aggregate_test_100.csv"),
CsvReadOptions::new(),
)
.await?;

// execute the query
let df = ctx
.sql(
"SELECT c1, MIN(c12), MAX(c12) \
Expand All @@ -97,40 +103,42 @@ async fn main() -> Result<()> {
)
.await?;

// print the results
df.show().await?;

Ok(())
}
```

## Distributed DataFrame Example
### Distributed DataFrame Example

```bash
cd examples
cargo run --release --example remote-dataframe
```

### Source code for distributed DataFrame example
#### Source code for distributed DataFrame example

```rust
use ballista::prelude::*;
use ballista_examples::test_util;
use datafusion::{
prelude::{col, lit, ParquetReadOptions, SessionContext},
};

#[tokio::main]
async fn main() -> Result<()> {
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
// creating SessionContext with default settings
let ctx = SessionContext::remote("df://localhost:50050".await?;

let filename = "testdata/alltypes_plain.parquet";
let test_data = test_util::examples_test_data();
let filename = format!("{test_data}/alltypes_plain.parquet");

// define the query using the DataFrame trait
let df = ctx
.read_parquet(filename, ParquetReadOptions::default())
.await?
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?;

// print the results
df.show().await?;

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ DataFusion is a library for executing queries in-process using the Apache Arrow
model and computational kernels. It is designed to run within a single process, using threads
for parallel query execution.

Ballista is a distributed compute platform built on DataFusion.
Ballista is a distributed compute platform to DataFusion workloads.
milenkovicm marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions docs/source/user-guide/flightsql.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

One of the easiest ways to start with Ballista is to plug it into your existing data infrastructure using support for Arrow Flight SQL JDBC.

> This is optional scheduler feature which should be enabled with `flight-sql` feature

Getting started involves these main steps:

1. [Installing prerequisites](#prereq)
Expand Down
2 changes: 2 additions & 0 deletions docs/source/user-guide/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

## Prometheus

> This is optional scheduler feature which should be enabled with `prometheus-metrics` feature

Built with default features, the ballista scheduler will automatically collect and expose a standard set of prometheus metrics.
The metrics currently collected automatically include:

Expand Down
Loading
Loading