
add few more comments on object_store test
milenkovicm committed Nov 4, 2024
1 parent ad32ac0 commit 2a26ad9
Showing 6 changed files with 159 additions and 12 deletions.
2 changes: 1 addition & 1 deletion ballista/client/Cargo.toml
@@ -52,7 +52,7 @@ testcontainers-modules = { version = "0.11", features = ["minio"] }

[features]
azure = ["ballista-core/azure"]
default = []
default = ["standalone", "testcontainers"]
s3 = ["ballista-core/s3"]
standalone = ["ballista-executor", "ballista-scheduler"]
testcontainers = []
151 changes: 148 additions & 3 deletions ballista/client/tests/object_store.rs
@@ -200,7 +200,11 @@ mod remote {
}
}

// this test shows how to configure external ObjectStoreRegistry and configure it
// this test shows how to register an external ObjectStoreRegistry and configure it
// using the infrastructure provided by ballista.
//
// it relies on ballista's configuration integration with SessionConfig, and on
// SessionConfig propagation across the ballista cluster.

#[cfg(test)]
#[cfg(feature = "testcontainers")]
@@ -238,9 +242,13 @@ mod custom_s3_config {
use crate::common::{ACCESS_KEY_ID, SECRET_KEY};

#[tokio::test]
async fn should_execute_sql_write() -> datafusion::error::Result<()> {
async fn should_configure_s3_execute_sql_write_remote(
) -> datafusion::error::Result<()> {
let test_data = crate::common::example_test_data();

//
// Minio cluster setup
//
let container = crate::common::create_minio_container();
let node = container.start().await.unwrap();

@@ -250,11 +258,24 @@

let endpoint_port = node.get_host_port_ipv4(9000).await.unwrap();

//
// Session Context and Ballista cluster setup
//

// Setting up the configuration producer
//
// The configuration producer registers the user-defined config extension
// S3Options with the relevant S3 configuration.
let config_producer = Arc::new(|| {
SessionConfig::new_with_ballista()
.with_information_schema(true)
.with_option_extension(S3Options::default())
});
// Setting up the runtime producer
//
// The runtime producer creates an object store registry
// which can create an object store connector based on the
// S3Options configuration.
let runtime_producer: RuntimeProducer =
Arc::new(|session_config: &SessionConfig| {
let s3options = session_config
@@ -272,10 +293,16 @@
Ok(Arc::new(RuntimeEnv::new(config)?))
});

let session_builder = Arc::new(produce_state);
// The session builder creates a SessionState
//
// which is configured using the runtime and configuration producers,
// producing the same runtime environment and providing the same
// object store registry.

let session_builder = Arc::new(produce_state);
let state = session_builder(config_producer());

// setting up ballista cluster with new runtime, configuration, and session state producers
let (host, port) = crate::common::setup_test_cluster_with_builders(
config_producer,
runtime_producer,
@@ -284,7 +311,124 @@
.await;
let url = format!("df://{host}:{port}");

// establishing cluster connection,
let ctx: SessionContext = SessionContext::remote_with_state(&url, state).await?;

// setting up relevant S3 options
ctx.sql("SET s3.allow_http = true").await?.show().await?;
ctx.sql(&format!("SET s3.access_key_id = '{}'", ACCESS_KEY_ID))
.await?
.show()
.await?;
ctx.sql(&format!("SET s3.secret_access_key = '{}'", SECRET_KEY))
.await?
.show()
.await?;
ctx.sql(&format!(
"SET s3.endpoint = 'http://localhost:{}'",
endpoint_port
))
.await?
.show()
.await?;
ctx.sql("SET s3.allow_http = true").await?.show().await?;

// verifying that we have set S3Options
ctx.sql("select name, value from information_schema.df_settings where name like 's3.%'").await?.show().await?;

ctx.register_parquet(
"test",
&format!("{test_data}/alltypes_plain.parquet"),
Default::default(),
)
.await?;

let write_dir_path =
&format!("s3://{}/write_test.parquet", crate::common::BUCKET);

ctx.sql("select * from test")
.await?
.write_parquet(write_dir_path, Default::default(), Default::default())
.await?;

ctx.register_parquet("written_table", write_dir_path, Default::default())
.await?;

let result = ctx
.sql("select id, string_col, timestamp_col from written_table where id > 4")
.await?
.collect()
.await?;
let expected = [
"+----+------------+---------------------+",
"| id | string_col | timestamp_col |",
"+----+------------+---------------------+",
"| 5 | 31 | 2009-03-01T00:01:00 |",
"| 6 | 30 | 2009-04-01T00:00:00 |",
"| 7 | 31 | 2009-04-01T00:01:00 |",
"+----+------------+---------------------+",
];

assert_batches_eq!(expected, &result);
Ok(())
}

// this test shows how to register an external ObjectStoreRegistry and configure it
// using the infrastructure provided by ballista standalone.
//
// it relies on ballista's configuration integration with SessionConfig, and on
// SessionConfig propagation across the ballista cluster.

#[tokio::test]
#[cfg(feature = "standalone")]
async fn should_configure_s3_execute_sql_write_standalone(
) -> datafusion::error::Result<()> {
let test_data = crate::common::example_test_data();

//
// Minio cluster setup
//
let container = crate::common::create_minio_container();
let node = container.start().await.unwrap();

node.exec(crate::common::create_bucket_command())
.await
.unwrap();

let endpoint_port = node.get_host_port_ipv4(9000).await.unwrap();

//
// Session Context and Ballista cluster setup
//

// Setting up the configuration producer
//
// The configuration producer registers the user-defined config extension
// S3Options with the relevant S3 configuration.
let config_producer = Arc::new(|| {
SessionConfig::new_with_ballista()
.with_information_schema(true)
.with_option_extension(S3Options::default())
});

// The session builder creates a SessionState
//
// which is configured using the runtime and configuration producers,
// producing the same runtime environment and providing the same
// object store registry.

let session_builder = Arc::new(produce_state);
let state = session_builder(config_producer());

// establishing the standalone session (no remote cluster needed)
let ctx: SessionContext = SessionContext::standalone_with_state(state).await?;

// setting up relevant S3 options
ctx.sql("SET s3.allow_http = true").await?.show().await?;
ctx.sql(&format!("SET s3.access_key_id = '{}'", ACCESS_KEY_ID))
.await?
@@ -303,6 +447,7 @@
.await?;
ctx.sql("SET s3.allow_http = true").await?.show().await?;

// verifying that we have set S3Options
ctx.sql("select name, value from information_schema.df_settings where name like 's3.%'").await?.show().await?;

ctx.register_parquet(
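For context, two helpers the tests rely on, S3Options and produce_state, are defined elsewhere in the test crate and do not appear in this diff. Below is a minimal sketch of what they might look like, assuming DataFusion's extensions_options! macro, ConfigExtension, and SessionStateBuilder; field names mirror the SET s3.* statements above, while the exact imports, defaults, and return type are assumptions rather than the actual test code.

use datafusion::common::{config::ConfigExtension, extensions_options};
use datafusion::execution::session_state::{SessionState, SessionStateBuilder};
use datafusion::prelude::SessionConfig;

extensions_options! {
    /// Hypothetical definition of the S3Options config extension the tests
    /// register; values are set via `SET s3.* = ...` and read by the runtime producer.
    pub struct S3Options {
        /// Allow plain-HTTP endpoints (needed for a local MinIO container)
        pub allow_http: bool, default = false
        /// S3 access key id
        pub access_key_id: String, default = String::new()
        /// S3 secret access key
        pub secret_access_key: String, default = String::new()
        /// Custom S3 endpoint, e.g. the MinIO port mapped by testcontainers
        pub endpoint: String, default = String::new()
    }
}

impl ConfigExtension for S3Options {
    const PREFIX: &'static str = "s3";
}

/// Hypothetical `produce_state` helper passed as `Arc::new(produce_state)`;
/// it turns the SessionConfig from the config producer into a SessionState.
/// The real helper presumably also installs the custom object store registry
/// created by the runtime producer, which is omitted in this sketch.
fn produce_state(session_config: SessionConfig) -> SessionState {
    SessionStateBuilder::new()
        .with_config(session_config)
        .with_default_features()
        .build()
}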
2 changes: 1 addition & 1 deletion ballista/core/src/utils.rs
@@ -504,7 +504,7 @@ impl SessionConfigExt for SessionConfig {
.iter()
.filter(|v| v.value.is_some())
.map(
// TODO MM make value optional
// TODO MM make `value` an optional value
|datafusion::config::ConfigEntry { key, value, .. }| {
log::trace!(
"sending configuration key: `{}`, value`{:?}`",
2 changes: 1 addition & 1 deletion ballista/scheduler/src/cluster/mod.rs
@@ -300,7 +300,7 @@ pub trait JobState: Send + Sync {
session_id: &str,
) -> Result<Option<Arc<SessionContext>>>;

// TODO MM not sure this is the best place
// TODO MM not sure this is the best place to put config producer
fn produce_config(&self) -> SessionConfig;
}

12 changes: 7 additions & 5 deletions ballista/scheduler/src/scheduler_server/grpc.rs
@@ -52,7 +52,7 @@ use std::sync::Arc;
use crate::cluster::{bind_task_bias, bind_task_round_robin};
use crate::config::TaskDistributionPolicy;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::prelude::SessionContext;
use std::time::{SystemTime, UNIX_EPOCH};
use tonic::{Request, Response, Status};

@@ -285,7 +285,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc

Ok(Response::new(UpdateTaskStatusResult { success: true }))
}

// TODO MM do we need this method, can it be removed?
async fn get_file_metadata(
&self,
request: Request<GetFileMetadataParams>,
@@ -422,7 +422,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
match self.state.session_manager.get_session(&session_id).await {
Ok(ctx) => {
// [SessionConfig] will be updated from received properties
// TODO MM can we do something better here

// TODO MM can we do something better here?
// move this to update session and use .update_session(&session_params.session_id, &session_config)
// instead of get_session

let state = ctx.state_ref();
let mut state = state.write();
@@ -446,8 +449,7 @@
}
_ => {
// Create default config
// TODO MM: this one should be from a factory
let session_config = SessionConfig::new_with_ballista();
let session_config = self.state.session_manager.produce_config();
let session_config =
session_config.update_from_key_value_pair(&settings);

2 changes: 1 addition & 1 deletion ballista/scheduler/src/state/session_manager.rs
@@ -71,7 +71,7 @@ pub fn create_datafusion_context(
let session_state = if session_config.round_robin_repartition() {
let session_config = session_config
.clone()
// TODO MM should we disable catalog on the scheduler side
// should we disable catalog on the scheduler side
.with_round_robin_repartition(false);

log::warn!("session manager will override `datafusion.optimizer.enable_round_robin_repartition` to `false` ");
