From 407540afc6f4bae289a86633a3e723f20b740ab2 Mon Sep 17 00:00:00 2001 From: tbar4 Date: Thu, 14 Nov 2024 21:59:26 -0800 Subject: [PATCH] Updating BallistaContext and Config, calling it for the night, will complete tomorrow --- python/ballista/__init__.py | 14 ++-- python/ballista/context.py | 48 +++++++++---- python/examples/example.py | 19 ++--- python/src/lib.rs | 139 ++++++++++++++++++++++++------------ 4 files changed, 143 insertions(+), 77 deletions(-) diff --git a/python/ballista/__init__.py b/python/ballista/__init__.py index d9f4c816a..99486ae55 100644 --- a/python/ballista/__init__.py +++ b/python/ballista/__init__.py @@ -27,16 +27,18 @@ from .ballista_internal import ( Ballista, - SessionConfig, - SessionStateBuilder, - SessionState + BallistaBuilder, + #SessionConfig, + #SessionStateBuilder, + #SessionState ) __version__ = importlib_metadata.version(__name__) __all__ = [ "Ballista", - "SessionConfig", - "SessionStateBuilder", - "SessionState" + "BallistaBuilder", + #"SessionConfig", + #"SessionStateBuilder", + #"SessionState" ] \ No newline at end of file diff --git a/python/ballista/context.py b/python/ballista/context.py index e6da21cce..a09451438 100644 --- a/python/ballista/context.py +++ b/python/ballista/context.py @@ -17,10 +17,41 @@ from _typeshed import Self from datafusion import SessionContext -from ballista import SessionConfig, SessionStateBuilder, SessionState, Ballista +from ballista import Ballista, BallistaBuilder from typing import List, Any + +class BallistaBuilder: + def __new__(cls): + return super().__new__(cls) + + def __init__(self) -> None: + self = {} + + def set(self, k, v) -> Self: + return self.set(k, v) + + +class Ballista: + def __new__(cls): + return super().__new__(cls) + + def __init__(self) -> None: + self.state = Ballista() + + def standalone(self): + self.standalone() + + def builder(self) -> BallistaBuilder: + return BallistaBuilder() + +""" +### Future State Implementation +class SessionState: + def __new__(cls): + return super().__new__(cls) + class SessionConfig: def __new__(cls): return super().__new__(cls) @@ -45,17 +76,4 @@ def with_config(self, config: SessionConfig) -> SessionStateBuilder: def build(self) -> SessionState: self.build() - -class SessionState: - def __new__(cls): - return super().__new__(cls) - -class Ballista: - def __new__(cls): - return super().__new__(cls) - - def __init__(self) -> None: - self.state = Ballista() - - def standalone(self): - self.standalone() \ No newline at end of file +""" \ No newline at end of file diff --git a/python/examples/example.py b/python/examples/example.py index 00caaf777..dcaa3a087 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -15,25 +15,26 @@ # specific language governing permissions and limitations # under the License. -from ballista import Ballista, SessionConfig, SessionStateBuilder +from ballista import Ballista, BallistaBuilder from datafusion.context import SessionContext # Ballista will initiate with an empty config -ballista = Ballista() -config = SessionConfig()\ - .set_str("BALLISTA_DEFAULT_SHUFFLE_PARTITIONS", "4") - -# Build the state -state = SessionStateBuilder()\ - .with_config(config)\ +ballista = BallistaBuilder()\ + .set("ballista.job.name", "example ballista")\ + .set("ballista.shuffle.partitions", "4")\ + .set("ballista.executor.cpus", "4")\ .build() + +print(ballista) +print(ballista.show_config()) +""" # Create the context ctx: SessionContext = Ballista().standalone() ctx.sql("SELECT 1").show() -""" + # Define custom settings job_settings = { "BALLISTA_JOB_NAME": "Example Ballista Job", diff --git a/python/src/lib.rs b/python/src/lib.rs index cb206e3a1..48ab40e56 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -23,8 +23,10 @@ use datafusion::execution::{SessionState, SessionStateBuilder}; use datafusion::prelude::*; use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext; use datafusion_python::utils::wait_for_future; -use std::borrow::BorrowMut; -use std::cell::RefCell; + +use std::collections::HashMap; +use std::fmt::Formatter; +use std::path::Display; use pyo3::prelude::*; mod utils; @@ -34,15 +36,102 @@ fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { pyo3_log::init(); // Ballista structs m.add_class::()?; + m.add_class::()?; // DataFusion structs m.add_class::()?; // Ballista Config + /* + // Future implementation will include more state and config options m.add_class::()?; m.add_class::()?; m.add_class::()?; + */ Ok(()) } +// Ballista Builder will take a HasMap/Dict Cionfg +#[pyclass(name = "BallistaBuilder", module = "ballista", subclass)] +pub struct PyBallistaBuilder(HashMap); + +#[pymethods] +impl PyBallistaBuilder { + #[new] + pub fn new() -> Self { + Self(HashMap::new()) + } + + pub fn set(mut slf: PyRefMut<'_, Self>, k: &str, v: &str, py: Python) -> PyResult { + slf.0.insert(k.into(), v.into()); + + Ok(slf.into_py(py)) + } + + pub fn show_config(&self) { + println!("Ballista Config:"); + for ele in self.0.iter() { + println!(" {}: {}", ele.0, ele.1) + } + } + + pub fn build(slf: PyRef<'_, Self>) -> PyBallista { + PyBallista { + conf: PyBallistaBuilder(slf.0.clone()) + } + } +} + +#[pyclass(name = "Ballista", module = "ballista", subclass)] +pub struct PyBallista { + pub conf: PyBallistaBuilder, +} + +#[pymethods] +impl PyBallista { + #[new] + pub fn new() -> Self { + Self { + conf: PyBallistaBuilder::new(), + } + } + + pub fn show_config(&self) { + println!("Ballista Config:"); + for ele in self.conf.0.clone() { + println!("{:4}: {}", ele.0, ele.1) + } + } + + /// Construct the standalone instance from the SessionContext + pub fn standalone( + &self, + concurrent_tasks: usize, + py: Python, + ) -> PyResult { + // Build the config + let config = BallistaConfig::with_settings(self.conf.0).unwrap(); + // Define the SessionContext + let session_context = BallistaContext::standalone(&config, concurrent_tasks); + // SessionContext is an async function + let ctx = wait_for_future(py, session_context).unwrap(); + + // Convert the SessionContext into a Python SessionContext + Ok(ctx.context().into()) + } + + /// Construct the remote instance from the SessionContext + pub fn remote(url: &str, py: Python) -> PyResult { + let session_context = SessionContext::remote(url); + let ctx = wait_for_future(py, session_context)?; + + // Convert the SessionContext into a Python SessionContext + Ok(ctx.into()) + } +} + + +/* +Plan to implement Session Config and State in a future issue + /// Ballista Session Extension builder #[pyclass(name = "SessionConfig", module = "ballista", subclass)] #[derive(Clone)] @@ -94,48 +183,4 @@ impl PySessionStateBuilder { } } } - -#[pyclass(name = "Ballista", module = "ballista", subclass)] -pub struct PyBallista { - pub state: RefCell, -} - -#[pymethods] -impl PyBallista { - #[new] - pub fn new() -> Self { - Self { - state: RefCell::new(SessionStateBuilder::new()), - } - } - - pub fn update_state(&mut self, state: &PyCell) { - self.state = state.borrow_mut().state - } - - /// Construct the standalone instance from the SessionContext - pub fn standalone( - slf: PyRef<'_, Self>, - state: PySessionStateBuilder, - py: Python, - ) -> PyResult { - let take_state = state.take().build(); - // Define the SessionContext - let session_context = SessionContext::standalone_with_state(take_state); - // SessionContext is an async function - let ctx = wait_for_future(py, session_context).unwrap(); - - // Convert the SessionContext into a Python SessionContext - Ok(ctx.into()) - } - - #[staticmethod] - /// Construct the remote instance from the SessionContext - pub fn remote(url: &str, py: Python) -> PyResult { - let session_context = SessionContext::remote(url); - let ctx = wait_for_future(py, session_context)?; - - // Convert the SessionContext into a Python SessionContext - Ok(ctx.into()) - } -} +*/