Skip to content

Commit

Permalink
add testcontainer to verify s3 access
Browse files Browse the repository at this point in the history
  • Loading branch information
milenkovicm committed Oct 26, 2024
1 parent 1406be3 commit 5d990a7
Show file tree
Hide file tree
Showing 12 changed files with 199 additions and 19 deletions.
5 changes: 4 additions & 1 deletion ballista/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ ballista-executor = { path = "../executor", version = "0.12.0" }
ballista-scheduler = { path = "../scheduler", version = "0.12.0" }
ctor = { version = "0.2" }
env_logger = { workspace = true }
object_store = { workspace = true, features = ["aws"] }
testcontainers-modules = { version = "0.11", features = ["minio"] }

[features]
azure = ["ballista-core/azure"]
default = ["standalone"]
default = ["standalone", "testcontainers"]
s3 = ["ballista-core/s3"]
standalone = ["ballista-executor", "ballista-scheduler"]
testcontainers = []
25 changes: 22 additions & 3 deletions ballista/client/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,28 @@ impl Extension {
>,
session_state: Option<&SessionState>,
) -> datafusion::error::Result<(String, String)> {
let addr = ballista_scheduler::standalone::new_standalone_scheduler()
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;
use std::sync::Arc;

use datafusion::{execution::SessionStateBuilder, prelude::SessionConfig};

let addr = match session_state {
None => ballista_scheduler::standalone::new_standalone_scheduler()
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?,
Some(session_state) => {
let session_state = session_state.clone();
let builder = move |c: SessionConfig| {
SessionStateBuilder::new_from_existing(session_state.clone())
.with_config(c)
.build()
};
ballista_scheduler::standalone::new_standalone_scheduler_from_builder(
Arc::new(builder),
)
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?
}
};

let scheduler_url = format!("http://localhost:{}", addr.port());

Expand Down
46 changes: 46 additions & 0 deletions ballista/client/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,52 @@ use ballista::prelude::BallistaConfig;
use ballista_core::serde::{
protobuf::scheduler_grpc_client::SchedulerGrpcClient, BallistaCodec,
};
use object_store::aws::AmazonS3Builder;
use testcontainers_modules::minio::MinIO;
use testcontainers_modules::testcontainers::core::{CmdWaitFor, ExecCommand};
use testcontainers_modules::testcontainers::ContainerRequest;
use testcontainers_modules::{minio, testcontainers::ImageExt};

pub const REGION: &str = "LOCAL";
pub const BUCKET: &str = "ballista";
pub const ACCESS_KEY_ID: &str = "MINIO";
pub const SECRET_KEY: &str = "MINIOMINIO";

#[allow(dead_code)]
pub fn create_s3_store(
port: u16,
) -> std::result::Result<object_store::aws::AmazonS3, object_store::Error> {
AmazonS3Builder::new()
.with_endpoint(format!("http://localhost:{port}"))
.with_region(REGION)
.with_bucket_name(BUCKET)
.with_access_key_id(ACCESS_KEY_ID)
.with_secret_access_key(SECRET_KEY)
.with_allow_http(true)
.build()
}

#[allow(dead_code)]
pub fn create_minio_container() -> ContainerRequest<minio::MinIO> {
MinIO::default()
.with_env_var("MINIO_ACCESS_KEY", ACCESS_KEY_ID)
.with_env_var("MINIO_SECRET_KEY", SECRET_KEY)
}

#[allow(dead_code)]
pub fn create_bucket_command() -> ExecCommand {
// this is hack to create a bucket without creating s3 client.
// this works with current testcontainer (and image) version 'RELEASE.2022-02-07T08-17-33Z'.
// (testcontainer does not await properly on latest image version)
//
// if testcontainer image version change to something newer we should use "mc mb /data/ballista"
// to crate a bucket.
ExecCommand::new(vec![
"mkdir".to_string(),
format!("/data/{}", crate::common::BUCKET),
])
.with_cmd_ready_condition(CmdWaitFor::seconds(1))
}

// /// Remote ballista cluster to be used for local testing.
// static BALLISTA_CLUSTER: tokio::sync::OnceCell<(String, u16)> =
Expand Down
103 changes: 103 additions & 0 deletions ballista/client/tests/object_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod common;

#[cfg(test)]
#[cfg(feature = "standalone")]
#[cfg(feature = "testcontainers")]
mod standalone {

use ballista::extension::SessionContextExt;
use datafusion::{assert_batches_eq, prelude::SessionContext};
use datafusion::{
error::DataFusionError,
execution::{
runtime_env::{RuntimeConfig, RuntimeEnv},
SessionStateBuilder,
},
};
use std::sync::Arc;
use testcontainers_modules::testcontainers::runners::AsyncRunner;

#[tokio::test]
async fn should_execute_sql_write() -> datafusion::error::Result<()> {
let container = crate::common::create_minio_container();
let node = container.start().await.unwrap();
// tokio::time::sleep(Duration::from_secs(300)).await;
node.exec(crate::common::create_bucket_command())
.await
.unwrap();

let port = node.get_host_port_ipv4(9000).await.unwrap();
//let port = 9000;

let object_store = crate::common::create_s3_store(port)
.map_err(|e| DataFusionError::External(e.into()))?;

let test_data = crate::common::example_test_data();
let config = RuntimeConfig::new();
let runtime_env = RuntimeEnv::new(config)?;

runtime_env.register_object_store(
&format!("s3://{}", crate::common::BUCKET)
.as_str()
.try_into()
.unwrap(),
Arc::new(object_store),
);
let state = SessionStateBuilder::new()
.with_runtime_env(runtime_env.into())
.build();

let ctx: SessionContext = SessionContext::standalone_with_state(state).await?;
ctx.register_parquet(
"test",
&format!("{test_data}/alltypes_plain.parquet"),
Default::default(),
)
.await?;

let write_dir_path = "s3://ballista/write_test.parquet";

ctx.sql("select * from test")
.await?
.write_parquet(write_dir_path, Default::default(), Default::default())
.await?;

ctx.register_parquet("written_table", write_dir_path, Default::default())
.await?;

let result = ctx
.sql("select id, string_col, timestamp_col from written_table where id > 4")
.await?
.collect()
.await?;
let expected = [
"+----+------------+---------------------+",
"| id | string_col | timestamp_col |",
"+----+------------+---------------------+",
"| 5 | 31 | 2009-03-01T00:01:00 |",
"| 6 | 30 | 2009-04-01T00:00:00 |",
"| 7 | 31 | 2009-04-01T00:01:00 |",
"+----+------------+---------------------+",
];

assert_batches_eq!(expected, &result);
Ok(())
}
}
1 change: 1 addition & 0 deletions ballista/core/src/object_store_registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::sync::Arc;
use url::Url;

/// Get a RuntimeConfig with specific ObjectStoreRegistry
// TODO: #[deprecated] this method
pub fn with_object_store_registry(config: RuntimeConfig) -> RuntimeConfig {
let registry = Arc::new(BallistaObjectStoreRegistry::default());
config.with_object_store_registry(registry)
Expand Down
2 changes: 1 addition & 1 deletion ballista/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ anyhow = "1"
arrow = { workspace = true }
arrow-flight = { workspace = true }
async-trait = { workspace = true }
ballista-core = { path = "../core", version = "0.12.0", features = ["s3"] }
ballista-core = { path = "../core", version = "0.12.0" }
configure_me = { workspace = true }
dashmap = { workspace = true }
datafusion = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion ballista/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ anyhow = "1"
arrow-flight = { workspace = true }
async-trait = { workspace = true }
axum = "0.7.7"
ballista-core = { path = "../core", version = "0.12.0", features = ["s3"] }
ballista-core = { path = "../core", version = "0.12.0" }
base64 = { version = "0.22" }
clap = { workspace = true }
configure_me = { workspace = true }
Expand Down
18 changes: 10 additions & 8 deletions ballista/scheduler/src/cluster/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ impl JobState for InMemoryJobState {
&self,
config: &BallistaConfig,
) -> Result<Arc<SessionContext>> {
let session = create_datafusion_context(config, self.session_builder);
let session = create_datafusion_context(config, self.session_builder.clone());
self.sessions.insert(session.session_id(), session.clone());

Ok(session)
Expand All @@ -412,7 +412,7 @@ impl JobState for InMemoryJobState {
session_id: &str,
config: &BallistaConfig,
) -> Result<Arc<SessionContext>> {
let session = create_datafusion_context(config, self.session_builder);
let session = create_datafusion_context(config, self.session_builder.clone());
self.sessions
.insert(session_id.to_string(), session.clone());

Expand Down Expand Up @@ -486,6 +486,8 @@ impl JobState for InMemoryJobState {

#[cfg(test)]
mod test {
use std::sync::Arc;

use crate::cluster::memory::InMemoryJobState;
use crate::cluster::test_util::{test_job_lifecycle, test_job_planning_failure};
use crate::test_utils::{
Expand All @@ -497,17 +499,17 @@ mod test {
#[tokio::test]
async fn test_in_memory_job_lifecycle() -> Result<()> {
test_job_lifecycle(
InMemoryJobState::new("", default_session_builder),
InMemoryJobState::new("", Arc::new(default_session_builder)),
test_aggregation_plan(4).await,
)
.await?;
test_job_lifecycle(
InMemoryJobState::new("", default_session_builder),
InMemoryJobState::new("", Arc::new(default_session_builder)),
test_two_aggregations_plan(4).await,
)
.await?;
test_job_lifecycle(
InMemoryJobState::new("", default_session_builder),
InMemoryJobState::new("", Arc::new(default_session_builder)),
test_join_plan(4).await,
)
.await?;
Expand All @@ -518,17 +520,17 @@ mod test {
#[tokio::test]
async fn test_in_memory_job_planning_failure() -> Result<()> {
test_job_planning_failure(
InMemoryJobState::new("", default_session_builder),
InMemoryJobState::new("", Arc::new(default_session_builder)),
test_aggregation_plan(4).await,
)
.await?;
test_job_planning_failure(
InMemoryJobState::new("", default_session_builder),
InMemoryJobState::new("", Arc::new(default_session_builder)),
test_two_aggregations_plan(4).await,
)
.await?;
test_job_planning_failure(
InMemoryJobState::new("", default_session_builder),
InMemoryJobState::new("", Arc::new(default_session_builder)),
test_join_plan(4).await,
)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion ballista/scheduler/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl BallistaCluster {
match &config.cluster_storage {
ClusterStorageConfig::Memory => Ok(BallistaCluster::new_memory(
scheduler,
default_session_builder,
Arc::new(default_session_builder),
)),
}
}
Expand Down
2 changes: 1 addition & 1 deletion ballista/scheduler/src/scheduler_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ mod external_scaler;
mod grpc;
pub(crate) mod query_stage_scheduler;

pub(crate) type SessionBuilder = fn(SessionConfig) -> SessionState;
pub(crate) type SessionBuilder = Arc<dyn Fn(SessionConfig) -> SessionState + Send + Sync>;

#[derive(Clone)]
pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
Expand Down
10 changes: 8 additions & 2 deletions ballista/scheduler/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,15 @@ use std::sync::Arc;
use tokio::net::TcpListener;

pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
let metrics_collector = default_metrics_collector()?;
new_standalone_scheduler_from_builder(Arc::new(default_session_builder)).await
}

let cluster = BallistaCluster::new_memory("localhost:50050", default_session_builder);
pub async fn new_standalone_scheduler_from_builder(
session_builder: crate::scheduler_server::SessionBuilder,
) -> Result<SocketAddr> {
let cluster = BallistaCluster::new_memory("localhost:50050", session_builder);

let metrics_collector = default_metrics_collector()?;

let mut scheduler_server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
Expand Down
2 changes: 1 addition & 1 deletion ballista/scheduler/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ pub async fn await_condition<Fut: Future<Output = Result<bool>>, F: Fn() -> Fut>
}

pub fn test_cluster_context() -> BallistaCluster {
BallistaCluster::new_memory(TEST_SCHEDULER_NAME, default_session_builder)
BallistaCluster::new_memory(TEST_SCHEDULER_NAME, Arc::new(default_session_builder))
}

pub async fn datafusion_test_context(path: &str) -> Result<SessionContext> {
Expand Down

0 comments on commit 5d990a7

Please sign in to comment.