Skip to content

Commit

Permalink
Deprecate BallistaContext (#1103)
Browse files Browse the repository at this point in the history
  • Loading branch information
milenkovicm authored Oct 31, 2024
1 parent 2b86fff commit 19d829f
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 84 deletions.
5 changes: 3 additions & 2 deletions ballista-cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;

- use ballista::prelude::{BallistaContext, BallistaError, Result};
+ use ballista::prelude::{BallistaError, Result};

use datafusion::arrow::array::{ArrayRef, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
+ use datafusion::prelude::SessionContext;

use crate::functions::{display_all_functions, Function};
use crate::print_format::PrintFormat;
Expand All @@ -51,7 +52,7 @@ pub enum OutputFormat {
impl Command {
pub async fn execute(
&self,
- ctx: &BallistaContext,
+ ctx: &SessionContext,
print_options: &mut PrintOptions,
) -> Result<()> {
let now = Instant::now();
Expand Down
18 changes: 7 additions & 11 deletions ballista-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use std::io::BufReader;
use std::sync::Arc;
use std::time::Instant;

- use ballista::prelude::{BallistaContext, Result};
+ use ballista::prelude::Result;
+ use datafusion::prelude::SessionContext;
use rustyline::error::ReadlineError;
use rustyline::Editor;

Expand All @@ -35,7 +36,7 @@ use crate::{

/// run and execute SQL statements and commands from a file, against a context with the given print options
pub async fn exec_from_lines(
- ctx: &BallistaContext,
+ ctx: &SessionContext,
reader: &mut BufReader<File>,
print_options: &PrintOptions,
) {
Expand Down Expand Up @@ -80,7 +81,7 @@ pub async fn exec_from_lines(

pub async fn exec_from_files(
files: Vec<String>,
- ctx: &BallistaContext,
+ ctx: &SessionContext,
print_options: &PrintOptions,
) {
let files = files
Expand All @@ -94,15 +95,10 @@ pub async fn exec_from_files(
}

/// run and execute SQL statements and commands against a context with the given print options
- pub async fn exec_from_repl(ctx: &BallistaContext, print_options: &mut PrintOptions) {
+ pub async fn exec_from_repl(ctx: &SessionContext, print_options: &mut PrintOptions) {
let mut rl = Editor::new().expect("created editor");
rl.set_helper(Some(CliHelper::new(
- &ctx.context()
- .task_ctx()
- .session_config()
- .options()
- .sql_parser
- .dialect,
+ &ctx.task_ctx().session_config().options().sql_parser.dialect,
print_options.color,
)));
rl.load_history(".history").ok();
Expand Down Expand Up @@ -171,7 +167,7 @@ pub async fn exec_from_repl(ctx: &BallistaContext, print_options: &mut PrintOpti
}

async fn exec_and_print(
- ctx: &BallistaContext,
+ ctx: &SessionContext,
print_options: &PrintOptions,
sql: String,
) -> Result<()> {
Expand Down
46 changes: 33 additions & 13 deletions ballista-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,21 @@
use std::env;
use std::path::Path;

use ballista::prelude::{BallistaConfig, BallistaContext, Result};
use ballista::{
extension::BallistaSessionConfigExt,
prelude::{
Result, SessionContextExt, BALLISTA_DEFAULT_BATCH_SIZE,
BALLISTA_STANDALONE_PARALLELISM, BALLISTA_WITH_INFORMATION_SCHEMA,
},
};
use ballista_cli::{
exec, print_format::PrintFormat, print_options::PrintOptions, BALLISTA_CLI_VERSION,
};
use clap::Parser;
use datafusion::{
execution::SessionStateBuilder,
prelude::{SessionConfig, SessionContext},
};
use datafusion_cli::print_options::MaxRows;
use mimalloc::MiMalloc;

Expand Down Expand Up @@ -108,29 +118,39 @@ pub async fn main() -> Result<()> {
env::set_current_dir(p).unwrap();
};

let mut ballista_config_builder =
BallistaConfig::builder().set("ballista.with_information_schema", "true");
let mut ballista_config = SessionConfig::new_with_ballista()
.set_str(BALLISTA_WITH_INFORMATION_SCHEMA, "true");

if let Some(batch_size) = args.batch_size {
ballista_config_builder =
ballista_config_builder.set("ballista.batch.size", &batch_size.to_string());
ballista_config =
ballista_config.set_str(BALLISTA_DEFAULT_BATCH_SIZE, &batch_size.to_string());
};

let ballista_config = ballista_config_builder.build()?;

let ctx = match (args.host, args.port) {
(Some(ref host), Some(port)) => {
let address = format!("df://{}:{}", host, port);
let state = SessionStateBuilder::new()
.with_config(ballista_config)
.with_default_features()
.build();

// Distributed execution with Ballista Remote
BallistaContext::remote(host, port, &ballista_config).await?
SessionContext::remote_with_state(&address, state).await?
}
_ => {
let concurrent_tasks = if let Some(concurrent_tasks) = args.concurrent_tasks {
concurrent_tasks
} else {
num_cpus::get()
if let Some(concurrent_tasks) = args.concurrent_tasks {
ballista_config = ballista_config.set_str(
BALLISTA_STANDALONE_PARALLELISM,
&concurrent_tasks.to_string(),
);
};
let state = SessionStateBuilder::new()
.with_config(ballista_config)
.with_default_features()
.build();

// In-process execution with Ballista Standalone
BallistaContext::standalone(&ballista_config, concurrent_tasks).await?
SessionContext::standalone_with_state(state).await?
}
};

Expand Down
3 changes: 2 additions & 1 deletion ballista/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

//! Distributed execution context.
+ #![allow(deprecated)] // TO BE REMOVED

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::execution::context::DataFilePaths;
Expand Down Expand Up @@ -76,7 +77,7 @@ impl BallistaContextState {
}
}

- // #[deprecated]
+ #[deprecated]
pub struct BallistaContext {
state: Arc<Mutex<BallistaContextState>>,
context: Arc<SessionContext>,
Expand Down
4 changes: 2 additions & 2 deletions ballista/client/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub use ballista_core::{
error::{BallistaError, Result},
};

- pub use futures::StreamExt;

+ #[allow(deprecated)] // TO BE REMOVED
pub use crate::context::BallistaContext;
pub use crate::extension::SessionContextExt;
+ pub use futures::StreamExt;
60 changes: 32 additions & 28 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
//! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.

use arrow_schema::SchemaBuilder;
- use ballista::context::BallistaContext;
+ use ballista::extension::BallistaSessionConfigExt;
use ballista::prelude::{
- BallistaConfig, BALLISTA_COLLECT_STATISTICS, BALLISTA_DEFAULT_BATCH_SIZE,
+ SessionContextExt, BALLISTA_COLLECT_STATISTICS, BALLISTA_DEFAULT_BATCH_SIZE,
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, BALLISTA_JOB_NAME,
};
use datafusion::arrow::array::*;
Expand All @@ -30,6 +30,7 @@ use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionState;
+ use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::{expr::Cast, Expr};
use datafusion::parquet::basic::Compression;
Expand Down Expand Up @@ -354,24 +355,27 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
println!("Running benchmarks with the following options: {opt:?}");
let mut benchmark_run = BenchmarkRun::new(opt.query);

let config = BallistaConfig::builder()
.set(
let config = SessionConfig::new_with_ballista()
.set_str(
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
&format!("{}", opt.partitions),
)
.set(
.set_str(
BALLISTA_JOB_NAME,
&format!("Query derived from TPC-H q{}", opt.query),
)
.set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
.set(BALLISTA_COLLECT_STATISTICS, "true")
.build()
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;

let ctx =
BallistaContext::remote(opt.host.unwrap().as_str(), opt.port.unwrap(), &config)
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
.set_str(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
.set_str(BALLISTA_COLLECT_STATISTICS, "true");
let state = SessionStateBuilder::new()
.with_default_features()
.with_config(config)
.build();
let address = format!(
"df://{}:{}",
opt.host.clone().unwrap().as_str(),
opt.port.unwrap()
);
let ctx = SessionContext::remote_with_state(&address, state).await?;

// register tables with Ballista context
let path = opt.path.to_str().unwrap();
Expand Down Expand Up @@ -454,29 +458,29 @@ fn write_summary_json(benchmark_run: &mut BenchmarkRun, path: &Path) -> Result<(
async fn loadtest_ballista(opt: BallistaLoadtestOpt) -> Result<()> {
println!("Running loadtest_ballista with the following options: {opt:?}");

let config = BallistaConfig::builder()
.set(
let config = SessionConfig::new_with_ballista()
.set_str(
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
&format!("{}", opt.partitions),
)
.set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
.build()
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
.set_str(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size));

let state = SessionStateBuilder::new()
.with_default_features()
.with_config(config)
.build();

let concurrency = opt.concurrency;
let request_amount = opt.requests;
let mut clients = vec![];

for _num in 0..concurrency {
clients.push(
BallistaContext::remote(
opt.host.clone().unwrap().as_str(),
opt.port.unwrap(),
&config,
)
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?,
let address = format!(
"df://{}:{}",
opt.host.clone().unwrap().as_str(),
opt.port.unwrap()
);
clients.push(SessionContext::remote_with_state(&address, state.clone()).await?);
}

// register tables with Ballista context
Expand Down Expand Up @@ -566,7 +570,7 @@ fn get_query_sql_by_path(query: usize, mut sql_path: String) -> Result<String> {
async fn register_tables(
path: &str,
file_format: &str,
- ctx: &BallistaContext,
+ ctx: &SessionContext,
debug: bool,
) -> Result<()> {
for table in TABLES {
Expand Down
26 changes: 17 additions & 9 deletions examples/examples/remote-dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,36 @@
// specific language governing permissions and limitations
// under the License.

use ballista::prelude::*;
use datafusion::prelude::{col, lit, ParquetReadOptions};
use ballista::{extension::BallistaSessionConfigExt, prelude::*};
use ballista_examples::test_util;
use datafusion::{
execution::SessionStateBuilder,
prelude::{col, lit, ParquetReadOptions, SessionConfig, SessionContext},
};

/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
/// fetching results, using the DataFrame trait
#[tokio::main]
async fn main() -> Result<()> {
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
let config = SessionConfig::new_with_ballista()
.set_str(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4");

let filename = "testdata/alltypes_plain.parquet";
let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.build();

let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;

let test_data = test_util::examples_test_data();
let filename = format!("{test_data}/alltypes_plain.parquet");

// define the query using the DataFrame trait
let df = ctx
.read_parquet(filename, ParquetReadOptions::default())
.await?
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?;

// print the results
df.show().await?;

Ok(())
Expand Down
28 changes: 18 additions & 10 deletions examples/examples/remote-sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,36 @@
// specific language governing permissions and limitations
// under the License.

use ballista::prelude::*;
use datafusion::prelude::CsvReadOptions;
use ballista::{extension::BallistaSessionConfigExt, prelude::*};
use ballista_examples::test_util;
use datafusion::{
execution::SessionStateBuilder,
prelude::{CsvReadOptions, SessionConfig, SessionContext},
};

/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
/// fetching results, using SQL
#[tokio::main]
async fn main() -> Result<()> {
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
let config = SessionConfig::new_with_ballista()
.set_str(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4");

let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.build();

let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;

let test_data = test_util::examples_test_data();

// register csv file with the execution context
ctx.register_csv(
"test",
"testdata/aggregate_test_100.csv",
&format!("{test_data}/aggregate_test_100.csv"),
CsvReadOptions::new(),
)
.await?;

// execute the query
let df = ctx
.sql(
"SELECT c1, MIN(c12), MAX(c12) \
Expand All @@ -45,7 +54,6 @@ async fn main() -> Result<()> {
)
.await?;

// print the results
df.show().await?;

Ok(())
Expand Down
Loading

0 comments on commit 19d829f

Please sign in to comment.