Skip to content

Commit

Permalink
Update root README.md adding example how to use ...
Browse files Browse the repository at this point in the history
and cleanup some of the context.

Relates to apache#1105
  • Loading branch information
milenkovicm committed Nov 13, 2024
1 parent a542608 commit c7bb151
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 68 deletions.
85 changes: 49 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,63 @@
under the License.
-->

# Ballista: Distributed SQL Query Engine, built on Apache Arrow
# Ballista: Making DataFusion Applications Distributed

Ballista is a distributed SQL query engine powered by the Rust implementation of [Apache Arrow][arrow] and
[Apache Arrow DataFusion][datafusion].
Ballista is a library which makes [Apache DataFusion](https://github.com/apache/datafusion) applications distributed.

If you are looking for documentation for a released version of Ballista, please refer to the
[Ballista User Guide][user-guide].
Existing DataFusion application:

## Overview
```rust
use datafusion::prelude::*;

Ballista implements a similar design to Apache Spark (particularly Spark SQL), but there are some key differences:
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
let ctx = SessionContext::new();

- The choice of Rust as the main execution language avoids the overhead of GC pauses and results in deterministic
processing times.
- Ballista is designed from the ground up to use columnar data, enabling a number of efficiencies such as vectorized
processing (SIMD) and efficient compression. Although Spark does have some columnar support, it is still
largely row-based today.
- The combination of Rust and Arrow provides excellent memory efficiency and memory usage can be 5x - 10x lower than
Apache Spark in some cases, which means that more processing can fit on a single node, reducing the overhead of
distributed compute.
- The use of Apache Arrow as the memory model and network protocol means that data can be exchanged efficiently between
executors using the [Flight Protocol][flight], and between clients and schedulers/executors using the
[Flight SQL Protocol][flight-sql]
// register the table
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;

// create a plan to run a SQL query
let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;

// execute and print results
df.show().await?;
Ok(())
}
```

can be distributed with few lines of code changed:

```rust
// add ballista imports
use ballista::prelude::*;
use datafusion::prelude::*;


#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// create DataFusion SessionContext with ballista standalone cluster started
let ctx = SessionContext::standalone();

// register the table
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;

// create a plan to run a SQL query
let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;

// execute and print results
df.show().await?;
Ok(())
}
```

If you are looking for documentation or more examples, please refer to the [Ballista User Guide][user-guide].

## Architecture

A Ballista cluster consists of one or more scheduler processes and one or more executor processes. These processes
can be run as native binaries and are also available as Docker Images, which can be easily deployed with
[Docker Compose](https://datafusion.apache.org/ballista/user-guide/deployment/docker-compose.html) or
[Kubernetes](https://datafusion.apache.org/ballista/user-guide/deployment/kubernetes.html).
[Docker Compose](https://datafusion.apache.org/ballista/user-guide/deployment/docker-compose.html).

The following diagram shows the interaction between clients and the scheduler for submitting jobs, and the interaction
between the executor(s) and the scheduler for fetching tasks and reporting task status.
Expand All @@ -55,15 +82,6 @@ between the executor(s) and the scheduler for fetching tasks and reporting task

See the [architecture guide](docs/source/contributors-guide/architecture.md) for more details.

## Features

- Supports cloud object stores. S3 is supported today and GCS and Azure support is planned.
- DataFrame and SQL APIs available from Python and Rust.
- Clients can connect to a Ballista cluster using [Flight SQL][flight-sql].
- JDBC support via Arrow Flight SQL JDBC Driver
- Scheduler REST UI for monitoring query progress and viewing query plans and metrics.
- Support for Docker, Docker Compose, and Kubernetes deployment, as well as manual deployment on bare metal.

## Performance

We run some simple benchmarks comparing Ballista with Apache Spark to track progress with performance optimizations.
Expand All @@ -81,19 +99,14 @@ that, refer to the [Getting Started Guide](ballista/client/README.md).

## Project Status

Ballista supports a wide range of SQL, including CTEs, Joins, and Subqueries and can execute complex queries at scale.
Ballista supports a wide range of SQL, including CTEs, Joins, and subqueries and can execute complex queries at scale,
but still there is a gap between DataFusion and Ballista which we want to bridge in near future.

Refer to the [DataFusion SQL Reference](https://datafusion.apache.org/user-guide/sql/index.html) for more
information on supported SQL.

Ballista is maturing quickly and is now working towards being production ready. See the [roadmap](ROADMAP.md) for more details.

## Contribution Guide

Please see the [Contribution Guide](CONTRIBUTING.md) for information about contributing to Ballista.

[arrow]: https://arrow.apache.org/
[datafusion]: https://github.com/apache/arrow-datafusion
[flight]: https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/
[flight-sql]: https://arrow.apache.org/blog/2022/02/16/introducing-arrow-flight-sql/
[user-guide]: https://datafusion.apache.org/ballista/
100 changes: 68 additions & 32 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

This directory contains examples for executing distributed queries with Ballista.

# Standalone Examples
## Standalone Examples

The standalone example is the easiest to get started with. Ballista supports a standalone mode where a scheduler
and executor are started in-process.
Expand All @@ -33,18 +33,35 @@ cargo run --example standalone_sql --features="ballista/standalone"
### Source code for standalone SQL example

```rust
use ballista::{
extension::SessionConfigExt,
prelude::*
};
use datafusion::{
execution::{options::ParquetReadOptions, SessionStateBuilder},
prelude::{SessionConfig, SessionContext},
};

#[tokio::main]
async fn main() -> Result<()> {
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "1")
.build()?;
let config = SessionConfig::new_with_ballista()
.with_target_partitions(1)
.with_ballista_standalone_parallelism(2);

let ctx = BallistaContext::standalone(&config, 2).await?;
let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.build();

ctx.register_csv(
let ctx = SessionContext::standalone_with_state(state).await?;

let test_data = test_util::examples_test_data();

// register parquet file with the execution context
ctx.register_parquet(
"test",
"testdata/aggregate_test_100.csv",
CsvReadOptions::new(),
&format!("{test_data}/alltypes_plain.parquet"),
ParquetReadOptions::default(),
)
.await?;

Expand All @@ -56,12 +73,12 @@ async fn main() -> Result<()> {

```

# Distributed Examples
## Distributed Examples

For background information on the Ballista architecture, refer to
the [Ballista README](../ballista/client/README.md).

## Start a standalone cluster
### Start a standalone cluster

From the root of the project, build release binaries.

Expand All @@ -83,40 +100,49 @@ RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50051
RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50052
```

## Running the examples
### Running the examples

The examples can be run using the `cargo run --bin` syntax.

## Distributed SQL Example
### Distributed SQL Example

```bash
cargo run --release --example remote-sql
```

### Source code for distributed SQL example
#### Source code for distributed SQL example

```rust
use ballista::prelude::*;
use datafusion::prelude::CsvReadOptions;
use ballista::{extension::SessionConfigExt, prelude::*};
use datafusion::{
execution::SessionStateBuilder,
prelude::{CsvReadOptions, SessionConfig, SessionContext},
};

/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
/// fetching results, using SQL
#[tokio::main]
async fn main() -> Result<()> {
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
let config = SessionConfig::new_with_ballista()
.with_target_partitions(4)
.with_ballista_job_name("Remote SQL Example");

let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.build();

let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;

let test_data = test_util::examples_test_data();

// register csv file with the execution context
ctx.register_csv(
"test",
"testdata/aggregate_test_100.csv",
&format!("{test_data}/aggregate_test_100.csv"),
CsvReadOptions::new(),
)
.await?;

// execute the query
let df = ctx
.sql(
"SELECT c1, MIN(c12), MAX(c12) \
Expand All @@ -126,39 +152,49 @@ async fn main() -> Result<()> {
)
.await?;

// print the results
df.show().await?;

Ok(())
}
```

## Distributed DataFrame Example
### Distributed DataFrame Example

```bash
cargo run --release --example remote-dataframe
```

### Source code for distributed DataFrame example
#### Source code for distributed DataFrame example

```rust
use ballista::{extension::SessionConfigExt, prelude::*};
use datafusion::{
execution::SessionStateBuilder,
prelude::{col, lit, ParquetReadOptions, SessionConfig, SessionContext},
};

/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
/// fetching results, using the DataFrame trait
#[tokio::main]
async fn main() -> Result<()> {
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
let config = SessionConfig::new_with_ballista().with_target_partitions(4);

let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.build();

let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;

let filename = "testdata/alltypes_plain.parquet";
let test_data = test_util::examples_test_data();
let filename = format!("{test_data}/alltypes_plain.parquet");

// define the query using the DataFrame trait
let df = ctx
.read_parquet(filename, ParquetReadOptions::default())
.await?
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?;

// print the results
df.show().await?;

Ok(())
Expand Down

0 comments on commit c7bb151

Please sign in to comment.