diff --git a/python/examples/example.py b/python/examples/example.py index dcaa3a087..63962352e 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -19,50 +19,21 @@ from datafusion.context import SessionContext # Ballista will initiate with an empty config +# set config variables with `set()` ballista = BallistaBuilder()\ .set("ballista.job.name", "example ballista")\ - .set("ballista.shuffle.partitions", "4")\ + .set("ballista.shuffle.partitions", "16")\ .set("ballista.executor.cpus", "4")\ .build() -print(ballista) +# Show the Ballista Config print(ballista.show_config()) -""" -# Create the context -ctx: SessionContext = Ballista().standalone() +# Build a standalone Cluster (use `remote()`) +# for remote cluster +ctx: SessionContext = ballista.standalone() +#ctx_remote: SessionContext = ballista.remote("remote_ip", 50050) +# Select 1 to verify its working ctx.sql("SELECT 1").show() - - -# Define custom settings -job_settings = { - "BALLISTA_JOB_NAME": "Example Ballista Job", - "DEFAULT_SHUFFLE_PARTITIONS": "2" -} - -ballista.configuration(job_settings) - -# But you can also set your own config -print("New Ballista Config: ", ballista.settings()) - -# Or you can check default settings in BallistaConfig -print("Default Shuffle Partitions: ", ballista.default_shuffle_partitions()) -# Create the Ballista Context [standalone or remote] -ctx: SessionContext = ballista.standalone() # Ballista.remote() - -# Register our parquet file to perform SQL operations -ctx.register_parquet("test_parquet", "./testdata/test.parquet") - -# Select the data from our test parquet file -test_parquet = ctx.sql("SELECT * FROM test_parquet") - -# Show our test parquet data -print(test_parquet.show()) - -# To perform dataframe operations, read in data -test_csv = ctx.read_csv("./testdata/test.csv", has_header=False) - -# Show the dataframe -test_csv.show() -""" +#ctx_remote.sql("SELECT 2").show() \ No newline at end of file diff --git a/python/src/lib.rs b/python/src/lib.rs index c00ae8961..7b623456f 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -15,21 +15,15 @@ // specific language governing permissions and limitations // under the License. -use ballista::extension::SessionConfigExt; use ballista::prelude::*; -use ballista_core::utils::SessionStateExt; -use datafusion::catalog::Session; -use datafusion::execution::{SessionState, SessionStateBuilder}; -use datafusion::prelude::*; use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext; use datafusion_python::utils::wait_for_future; use std::collections::HashMap; -use std::fmt::Formatter; -use std::path::Display; use pyo3::prelude::*; mod utils; +use utils::to_pyerr; #[pymodule] fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { @@ -51,13 +45,15 @@ fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { // Ballista Builder will take a HasMap/Dict Cionfg #[pyclass(name = "BallistaBuilder", module = "ballista", subclass)] -pub struct PyBallistaBuilder(HashMap); +pub struct PyBallistaBuilder { + conf: HashMap, +} #[pymethods] impl PyBallistaBuilder { #[new] pub fn new() -> Self { - Self(HashMap::new()) + Self { conf: HashMap::new() } } pub fn set( @@ -66,21 +62,23 @@ impl PyBallistaBuilder { v: &str, py: Python, ) -> PyResult { - slf.0.insert(k.into(), v.into()); + slf.conf.insert(k.into(), v.into()); Ok(slf.into_py(py)) } pub fn show_config(&self) { println!("Ballista Config:"); - for ele in self.0.iter() { - println!(" {}: {}", ele.0, ele.1) + for ele in self.conf.iter() { + println!("\t{}: {}", ele.0, ele.1) } } pub fn build(slf: PyRef<'_, Self>) -> PyBallista { PyBallista { - conf: PyBallistaBuilder(slf.0.clone()), + conf: PyBallistaBuilder { + conf: slf.conf.clone(), + }, } } } @@ -101,24 +99,25 @@ impl PyBallista { pub fn show_config(&self) { println!("Ballista Config:"); - for ele in self.conf.0.clone() { + for ele in self.conf.conf.clone() { println!("{:4}: {}", ele.0, ele.1) } } /// Construct the standalone instance from the SessionContext + #[pyo3(signature = (concurrent_tasks = 4))] pub fn standalone( &self, concurrent_tasks: usize, py: Python, ) -> PyResult { // Build the config - let config = BallistaConfig::with_settings(self.conf.0.clone()).unwrap(); + let config = &BallistaConfig::with_settings(self.conf.conf.clone()).unwrap(); // Define the SessionContext let session_context = BallistaContext::standalone(&config, concurrent_tasks); // SessionContext is an async function let ctx = wait_for_future(py, session_context) - .unwrap() + .map_err(to_pyerr)? .context() .clone(); @@ -126,16 +125,25 @@ impl PyBallista { Ok(ctx.into()) } - /* /// Construct the remote instance from the SessionContext - pub fn remote(url: &str, py: Python) -> PyResult { - let session_context = SessionContext::remote(url); - let ctx = wait_for_future(py, session_context)?; + pub fn remote( + &self, + host: &str, + port: u16, + py: Python, + ) -> PyResult { + // Build the config + let config = &BallistaConfig::with_settings(self.conf.conf.clone()).unwrap(); + // Create the BallistaContext + let session_context = BallistaContext::remote(host, port, config); + let ctx = wait_for_future(py, session_context) + .map_err(to_pyerr)? + .context() + .clone(); // Convert the SessionContext into a Python SessionContext Ok(ctx.into()) } - */ } /*