Skip to content

Commit

Permalink
Adding config to ballista context
Browse files Browse the repository at this point in the history
  • Loading branch information
Trevor Barnes committed Nov 15, 2024
1 parent 0a3cbd0 commit 16e6c67
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 59 deletions.
47 changes: 9 additions & 38 deletions python/examples/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,50 +19,21 @@
from datafusion.context import SessionContext

# Ballista will initiate with an empty config
# set config variables with `set()`
ballista = BallistaBuilder()\
.set("ballista.job.name", "example ballista")\
.set("ballista.shuffle.partitions", "4")\
.set("ballista.shuffle.partitions", "16")\
.set("ballista.executor.cpus", "4")\
.build()

print(ballista)
# Show the Ballista Config
print(ballista.show_config())

"""
# Create the context
ctx: SessionContext = Ballista().standalone()
# Build a standalone Cluster (use `remote()`)
# for remote cluster
ctx: SessionContext = ballista.standalone()
#ctx_remote: SessionContext = ballista.remote("remote_ip", 50050)

# Select 1 to verify its working
ctx.sql("SELECT 1").show()
# Define custom settings
job_settings = {
"BALLISTA_JOB_NAME": "Example Ballista Job",
"DEFAULT_SHUFFLE_PARTITIONS": "2"
}
ballista.configuration(job_settings)
# But you can also set your own config
print("New Ballista Config: ", ballista.settings())
# Or you can check default settings in BallistaConfig
print("Default Shuffle Partitions: ", ballista.default_shuffle_partitions())
# Create the Ballista Context [standalone or remote]
ctx: SessionContext = ballista.standalone() # Ballista.remote()
# Register our parquet file to perform SQL operations
ctx.register_parquet("test_parquet", "./testdata/test.parquet")
# Select the data from our test parquet file
test_parquet = ctx.sql("SELECT * FROM test_parquet")
# Show our test parquet data
print(test_parquet.show())
# To perform dataframe operations, read in data
test_csv = ctx.read_csv("./testdata/test.csv", has_header=False)
# Show the dataframe
test_csv.show()
"""
#ctx_remote.sql("SELECT 2").show()
50 changes: 29 additions & 21 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use ballista::extension::SessionConfigExt;
use ballista::prelude::*;
use ballista_core::utils::SessionStateExt;
use datafusion::catalog::Session;
use datafusion::execution::{SessionState, SessionStateBuilder};
use datafusion::prelude::*;
use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext;
use datafusion_python::utils::wait_for_future;

use std::collections::HashMap;
use std::fmt::Formatter;
use std::path::Display;

use pyo3::prelude::*;
mod utils;
use utils::to_pyerr;

#[pymodule]
fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
Expand All @@ -51,13 +45,15 @@ fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {

// Ballista Builder will take a HasMap/Dict Cionfg
#[pyclass(name = "BallistaBuilder", module = "ballista", subclass)]
pub struct PyBallistaBuilder(HashMap<String, String>);
pub struct PyBallistaBuilder {
conf: HashMap<String, String>,
}

#[pymethods]
impl PyBallistaBuilder {
#[new]
pub fn new() -> Self {
Self(HashMap::new())
Self { conf: HashMap::new() }
}

pub fn set(
Expand All @@ -66,21 +62,23 @@ impl PyBallistaBuilder {
v: &str,
py: Python,
) -> PyResult<PyObject> {
slf.0.insert(k.into(), v.into());
slf.conf.insert(k.into(), v.into());

Ok(slf.into_py(py))
}

pub fn show_config(&self) {
println!("Ballista Config:");
for ele in self.0.iter() {
println!(" {}: {}", ele.0, ele.1)
for ele in self.conf.iter() {
println!("\t{}: {}", ele.0, ele.1)
}
}

pub fn build(slf: PyRef<'_, Self>) -> PyBallista {
PyBallista {
conf: PyBallistaBuilder(slf.0.clone()),
conf: PyBallistaBuilder {
conf: slf.conf.clone(),
},
}
}
}
Expand All @@ -101,41 +99,51 @@ impl PyBallista {

pub fn show_config(&self) {
println!("Ballista Config:");
for ele in self.conf.0.clone() {
for ele in self.conf.conf.clone() {
println!("{:4}: {}", ele.0, ele.1)
}
}

/// Construct the standalone instance from the SessionContext
#[pyo3(signature = (concurrent_tasks = 4))]
pub fn standalone(
&self,
concurrent_tasks: usize,
py: Python,
) -> PyResult<DataFusionPythonSessionContext> {
// Build the config
let config = BallistaConfig::with_settings(self.conf.0.clone()).unwrap();
let config = &BallistaConfig::with_settings(self.conf.conf.clone()).unwrap();
// Define the SessionContext
let session_context = BallistaContext::standalone(&config, concurrent_tasks);
// SessionContext is an async function
let ctx = wait_for_future(py, session_context)
.unwrap()
.map_err(to_pyerr)?
.context()
.clone();

// Convert the SessionContext into a Python SessionContext
Ok(ctx.into())
}

/*
/// Construct the remote instance from the SessionContext
pub fn remote(url: &str, py: Python) -> PyResult<DataFusionPythonSessionContext> {
let session_context = SessionContext::remote(url);
let ctx = wait_for_future(py, session_context)?;
pub fn remote(
&self,
host: &str,
port: u16,
py: Python,
) -> PyResult<DataFusionPythonSessionContext> {
// Build the config
let config = &BallistaConfig::with_settings(self.conf.conf.clone()).unwrap();
// Create the BallistaContext
let session_context = BallistaContext::remote(host, port, config);
let ctx = wait_for_future(py, session_context)
.map_err(to_pyerr)?
.context()
.clone();

// Convert the SessionContext into a Python SessionContext
Ok(ctx.into())
}
*/
}

/*
Expand Down

0 comments on commit 16e6c67

Please sign in to comment.