Skip to content

Commit

Permalink
Trim down BallistaConfig (apache#1108)
Browse files Browse the repository at this point in the history
* config re-org

* propagate session config

* round of cleanups

* minor cleanup

* propagate session context to tasks

* propagate session config

* scheduler exposes config provider

* remove public references to configs

* remove configuration options

* fix clippy

* add test to show object store registry configuration

* fix tests

* make test not failing due to environment settings

* fix python compile error

* add few more comments on object_store test

* disable testcontainers and standalone

* fix compile issue in flight_sql

* minor cleanup

* minor changes in examples
  • Loading branch information
milenkovicm authored Nov 13, 2024
1 parent 9e8dfb5 commit a542608
Show file tree
Hide file tree
Showing 36 changed files with 1,254 additions and 543 deletions.
18 changes: 6 additions & 12 deletions ballista-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ use std::path::Path;

use ballista::{
extension::SessionConfigExt,
prelude::{
Result, SessionContextExt, BALLISTA_DEFAULT_BATCH_SIZE,
BALLISTA_STANDALONE_PARALLELISM, BALLISTA_WITH_INFORMATION_SCHEMA,
},
prelude::{Result, SessionContextExt},
};
use ballista_cli::{
exec, print_format::PrintFormat, print_options::PrintOptions, BALLISTA_CLI_VERSION,
Expand Down Expand Up @@ -118,12 +115,11 @@ pub async fn main() -> Result<()> {
env::set_current_dir(p).unwrap();
};

let mut ballista_config = SessionConfig::new_with_ballista()
.set_str(BALLISTA_WITH_INFORMATION_SCHEMA, "true");
let mut ballista_config =
SessionConfig::new_with_ballista().with_information_schema(true);

if let Some(batch_size) = args.batch_size {
ballista_config =
ballista_config.set_str(BALLISTA_DEFAULT_BATCH_SIZE, &batch_size.to_string());
ballista_config = ballista_config.with_batch_size(batch_size);
};

let ctx = match (args.host, args.port) {
Expand All @@ -139,10 +135,8 @@ pub async fn main() -> Result<()> {
}
_ => {
if let Some(concurrent_tasks) = args.concurrent_tasks {
ballista_config = ballista_config.set_str(
BALLISTA_STANDALONE_PARALLELISM,
&concurrent_tasks.to_string(),
);
ballista_config =
ballista_config.with_target_partitions(concurrent_tasks);
};
let state = SessionStateBuilder::new()
.with_config(ballista_config)
Expand Down
4 changes: 1 addition & 3 deletions ballista/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ use datafusion::functions_aggregate::{min_max::min, min_max::max, sum::sum, aver
#[tokio::main]
async fn main() -> Result<()> {
// create configuration
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let config = BallistaConfig::default();
// connect to Ballista scheduler
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
Expand Down
59 changes: 14 additions & 45 deletions ballista/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
use ballista_core::serde::protobuf::{CreateSessionParams, KeyValuePair};
use ballista_core::utils::{
create_df_ctx_with_ballista_query_planner, create_grpc_client_connection,
SessionConfigExt,
};
use datafusion_proto::protobuf::LogicalPlanNode;

Expand Down Expand Up @@ -360,11 +361,8 @@ impl BallistaContext {
let is_show = self.is_show_statement(sql).await?;
// the show tables、 show columns sql can not run at scheduler because the tables is store at client
if is_show {
let state = self.state.lock();
ctx = Arc::new(SessionContext::new_with_config(
SessionConfig::new().with_information_schema(
state.config.default_with_information_schema(),
),
SessionConfig::new_with_ballista(),
));
}

Expand Down Expand Up @@ -485,13 +483,11 @@ impl BallistaContext {
#[cfg(test)]
#[cfg(feature = "standalone")]
mod standalone_tests {
use ballista_core::config::BallistaConfig;
use datafusion::arrow;
use datafusion::arrow::util::pretty::pretty_format_batches;

use crate::context::BallistaContext;
use ballista_core::config::{
BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
};
use ballista_core::error::Result;
use datafusion::config::TableParquetOptions;
use datafusion::dataframe::DataFrameWriteOptions;
Expand All @@ -502,7 +498,7 @@ mod standalone_tests {
#[tokio::test]
async fn test_standalone_mode() {
use super::*;
let context = BallistaContext::standalone(&BallistaConfig::new().unwrap(), 1)
let context = BallistaContext::standalone(&BallistaConfig::default(), 1)
.await
.unwrap();
let df = context.sql("SELECT 1;").await.unwrap();
Expand All @@ -512,8 +508,7 @@ mod standalone_tests {
#[tokio::test]
async fn test_write_parquet() -> Result<()> {
use super::*;
let context =
BallistaContext::standalone(&BallistaConfig::new().unwrap(), 1).await?;
let context = BallistaContext::standalone(&BallistaConfig::default(), 1).await?;
let df = context.sql("SELECT 1;").await?;
let tmp_dir = TempDir::new().unwrap();
let file_path = format!(
Expand All @@ -532,8 +527,7 @@ mod standalone_tests {
#[tokio::test]
async fn test_write_csv() -> Result<()> {
use super::*;
let context =
BallistaContext::standalone(&BallistaConfig::new().unwrap(), 1).await?;
let context = BallistaContext::standalone(&BallistaConfig::default(), 1).await?;
let df = context.sql("SELECT 1;").await?;
let tmp_dir = TempDir::new().unwrap();
let file_path =
Expand All @@ -549,7 +543,7 @@ mod standalone_tests {
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
let context = BallistaContext::standalone(&BallistaConfig::new().unwrap(), 1)
let context = BallistaContext::standalone(&BallistaConfig::default(), 1)
.await
.unwrap();

Expand Down Expand Up @@ -587,18 +581,14 @@ mod standalone_tests {
}

#[tokio::test]
#[ignore = "this one fails after config change (will be removed)"]
async fn test_show_tables_not_with_information_schema() {
use super::*;
use ballista_core::config::{
BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
};

use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
let config = BallistaConfigBuilder::default()
.set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
.build()
.unwrap();
let config = BallistaConfig::default();
let context = BallistaContext::standalone(&config, 1).await.unwrap();

let data = "Jorge,2018-12-13T12:12:10.011Z\n\
Expand Down Expand Up @@ -643,13 +633,7 @@ mod standalone_tests {
ListingOptions, ListingTable, ListingTableConfig,
};

use ballista_core::config::{
BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
};
let config = BallistaConfigBuilder::default()
.set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
.build()
.unwrap();
let config = BallistaConfig::default();
let context = BallistaContext::standalone(&config, 1).await.unwrap();

context
Expand Down Expand Up @@ -711,14 +695,8 @@ mod standalone_tests {
#[tokio::test]
async fn test_empty_exec_with_one_row() {
use crate::context::BallistaContext;
use ballista_core::config::{
BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
};

let config = BallistaConfigBuilder::default()
.set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
.build()
.unwrap();
let config = BallistaConfig::default();
let context = BallistaContext::standalone(&config, 1).await.unwrap();

let sql = "select EXTRACT(year FROM to_timestamp('2020-09-08T12:13:14+00:00'));";
Expand All @@ -730,14 +708,8 @@ mod standalone_tests {
#[tokio::test]
async fn test_union_and_union_all() {
use super::*;
use ballista_core::config::{
BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
};
use datafusion::arrow::util::pretty::pretty_format_batches;
let config = BallistaConfigBuilder::default()
.set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
.build()
.unwrap();
let config = BallistaConfig::default();
let context = BallistaContext::standalone(&config, 1).await.unwrap();

let df = context
Expand Down Expand Up @@ -1056,10 +1028,7 @@ mod standalone_tests {
);
}
async fn create_test_context() -> BallistaContext {
let config = BallistaConfigBuilder::default()
.set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
.build()
.unwrap();
let config = BallistaConfig::default();
let context = BallistaContext::standalone(&config, 4).await.unwrap();

context
Expand Down
54 changes: 18 additions & 36 deletions ballista/client/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

pub use ballista_core::utils::SessionConfigExt;
use ballista_core::{
config::BallistaConfig,
serde::protobuf::{
scheduler_grpc_client::SchedulerGrpcClient, CreateSessionParams, KeyValuePair,
},
serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient, CreateSessionParams},
utils::{create_grpc_client_connection, SessionStateExt},
};
use datafusion::{
error::DataFusionError, execution::SessionState, prelude::SessionContext,
error::DataFusionError,
execution::SessionState,
prelude::{SessionConfig, SessionContext},
};
use url::Url;

Expand Down Expand Up @@ -100,7 +99,7 @@ impl SessionContextExt for SessionContext {
url: &str,
state: SessionState,
) -> datafusion::error::Result<SessionContext> {
let config = state.ballista_config();
let config = state.config();

let scheduler_url = Extension::parse_url(url)?;
log::info!(
Expand All @@ -120,15 +119,14 @@ impl SessionContextExt for SessionContext {
}

async fn remote(url: &str) -> datafusion::error::Result<SessionContext> {
let config = BallistaConfig::new()
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;
let config = SessionConfig::new_with_ballista();
let scheduler_url = Extension::parse_url(url)?;
log::info!(
"Connecting to Ballista scheduler at {}",
scheduler_url.clone()
);
let remote_session_id =
Extension::setup_remote(config, scheduler_url.clone()).await?;
Extension::setup_remote(&config, scheduler_url.clone()).await?;
log::info!(
"Server side SessionContext created with session id: {}",
remote_session_id
Expand All @@ -143,10 +141,8 @@ impl SessionContextExt for SessionContext {
async fn standalone_with_state(
state: SessionState,
) -> datafusion::error::Result<SessionContext> {
let config = state.ballista_config();

let (remote_session_id, scheduler_url) =
Extension::setup_standalone(config, Some(&state)).await?;
Extension::setup_standalone(Some(&state)).await?;

let session_state =
state.upgrade_for_ballista(scheduler_url, remote_session_id.clone())?;
Expand All @@ -162,11 +158,9 @@ impl SessionContextExt for SessionContext {
#[cfg(feature = "standalone")]
async fn standalone() -> datafusion::error::Result<Self> {
log::info!("Running in local mode. Scheduler will be run in-proc");
let config = BallistaConfig::new()
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;

let (remote_session_id, scheduler_url) =
Extension::setup_standalone(config, None).await?;
Extension::setup_standalone(None).await?;

let session_state =
SessionState::new_ballista_state(scheduler_url, remote_session_id.clone())?;
Expand Down Expand Up @@ -197,10 +191,9 @@ impl Extension {

#[cfg(feature = "standalone")]
async fn setup_standalone(
config: BallistaConfig,
session_state: Option<&SessionState>,
) -> datafusion::error::Result<(String, String)> {
use ballista_core::serde::BallistaCodec;
use ballista_core::{serde::BallistaCodec, utils::default_config_producer};

let addr = match session_state {
None => ballista_scheduler::standalone::new_standalone_scheduler()
Expand All @@ -214,6 +207,9 @@ impl Extension {
.map_err(|e| DataFusionError::Configuration(e.to_string()))?
}
};
let config = session_state
.map(|s| s.config().clone())
.unwrap_or_else(default_config_producer);

let scheduler_url = format!("http://localhost:{}", addr.port());

Expand All @@ -229,21 +225,14 @@ impl Extension {

let remote_session_id = scheduler
.create_session(CreateSessionParams {
settings: config
.settings()
.iter()
.map(|(k, v)| KeyValuePair {
key: k.to_owned(),
value: v.to_owned(),
})
.collect::<Vec<_>>(),
settings: config.to_key_value_pairs(),
})
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
.into_inner()
.session_id;

let concurrent_tasks = config.default_standalone_parallelism();
let concurrent_tasks = config.ballista_standalone_parallelism();

match session_state {
None => {
Expand All @@ -269,28 +258,21 @@ impl Extension {
}

async fn setup_remote(
config: BallistaConfig,
config: &SessionConfig,
scheduler_url: String,
) -> datafusion::error::Result<String> {
let connection = create_grpc_client_connection(scheduler_url.clone())
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;

let limit = config.default_grpc_client_max_message_size();
let limit = config.ballista_grpc_client_max_message_size();
let mut scheduler = SchedulerGrpcClient::new(connection)
.max_encoding_message_size(limit)
.max_decoding_message_size(limit);

let remote_session_id = scheduler
.create_session(CreateSessionParams {
settings: config
.settings()
.iter()
.map(|(k, v)| KeyValuePair {
key: k.to_owned(),
value: v.to_owned(),
})
.collect::<Vec<_>>(),
settings: config.to_key_value_pairs(),
})
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
Expand Down
8 changes: 1 addition & 7 deletions ballista/client/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,7 @@
//! Ballista Prelude (common imports)

pub use ballista_core::{
config::{
BallistaConfig, BALLISTA_COLLECT_STATISTICS, BALLISTA_DEFAULT_BATCH_SIZE,
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE,
BALLISTA_JOB_NAME, BALLISTA_PARQUET_PRUNING, BALLISTA_REPARTITION_AGGREGATIONS,
BALLISTA_REPARTITION_JOINS, BALLISTA_REPARTITION_WINDOWS,
BALLISTA_STANDALONE_PARALLELISM, BALLISTA_WITH_INFORMATION_SCHEMA,
},
config::BallistaConfig,
error::{BallistaError, Result},
};

Expand Down
Loading

0 comments on commit a542608

Please sign in to comment.