Skip to content

Commit

Permalink
Updating BallistaContext and Config, calling it for the night, will c…
Browse files Browse the repository at this point in the history
…omplete tomorrow
  • Loading branch information
tbar4 committed Nov 15, 2024
1 parent ca9d60d commit 407540a
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 77 deletions.
14 changes: 8 additions & 6 deletions python/ballista/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@

from .ballista_internal import (
Ballista,
SessionConfig,
SessionStateBuilder,
SessionState
BallistaBuilder,
#SessionConfig,
#SessionStateBuilder,
#SessionState
)

__version__ = importlib_metadata.version(__name__)

__all__ = [
"Ballista",
"SessionConfig",
"SessionStateBuilder",
"SessionState"
"BallistaBuilder",
#"SessionConfig",
#"SessionStateBuilder",
#"SessionState"
]
48 changes: 33 additions & 15 deletions python/ballista/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,41 @@

from _typeshed import Self
from datafusion import SessionContext
from ballista import SessionConfig, SessionStateBuilder, SessionState, Ballista
from ballista import Ballista, BallistaBuilder

from typing import List, Any


class BallistaBuilder:
    """Collects Ballista configuration as key/value settings.

    Each call to :meth:`set` records one setting and returns the builder
    itself, so calls can be chained.
    """

    def __new__(cls):
        return super().__new__(cls)

    def __init__(self) -> None:
        # Assigning to the local name ``self`` would not touch the instance,
        # so the settings are kept on an attribute instead.
        self._settings = {}

    def set(self, k, v) -> "BallistaBuilder":
        """Store ``v`` under key ``k`` (replacing any previous value).

        Returns:
            This builder, to allow ``builder.set(...).set(...)`` chaining.
        """
        # Record the setting directly; calling ``self.set`` here would recurse
        # without a base case.
        self._settings[k] = v
        return self


class Ballista:
    """Python-side counterpart of the ``Ballista`` class.

    NOTE(review): presumably meant to mirror the class exported by the native
    extension module; confirm how (or whether) this class is used at runtime.
    """

    def __new__(cls):
        return super().__new__(cls)

    def __init__(self) -> None:
        # NOTE(review): this constructs another ``Ballista`` from inside
        # ``Ballista.__init__``, so instantiation recurses without a base
        # case. Confirm what ``state`` is meant to hold.
        self.state = Ballista()

    def standalone(self):
        # NOTE(review): calls itself unconditionally and returns nothing;
        # presumably this should delegate to the native implementation.
        self.standalone()

    def builder(self) -> BallistaBuilder:
        # Returns a new BallistaBuilder on every call.
        return BallistaBuilder()

"""
### Future State Implementation
class SessionState:
def __new__(cls):
return super().__new__(cls)
class SessionConfig:
def __new__(cls):
return super().__new__(cls)
Expand All @@ -45,17 +76,4 @@ def with_config(self, config: SessionConfig) -> SessionStateBuilder:
def build(self) -> SessionState:
self.build()

class SessionState:
def __new__(cls):
return super().__new__(cls)

class Ballista:
def __new__(cls):
return super().__new__(cls)

def __init__(self) -> None:
self.state = Ballista()

def standalone(self):
self.standalone()
"""
19 changes: 10 additions & 9 deletions python/examples/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,26 @@
# specific language governing permissions and limitations
# under the License.

from ballista import Ballista, SessionConfig, SessionStateBuilder
from ballista import Ballista, BallistaBuilder
from datafusion.context import SessionContext

# Ballista will initialize with an empty config
ballista = Ballista()
config = SessionConfig()\
.set_str("BALLISTA_DEFAULT_SHUFFLE_PARTITIONS", "4")

# Build the state
state = SessionStateBuilder()\
.with_config(config)\
ballista = BallistaBuilder()\
.set("ballista.job.name", "example ballista")\
.set("ballista.shuffle.partitions", "4")\
.set("ballista.executor.cpus", "4")\
.build()

print(ballista)
print(ballista.show_config())

"""
# Create the context
ctx: SessionContext = Ballista().standalone()
ctx.sql("SELECT 1").show()
"""
# Define custom settings
job_settings = {
"BALLISTA_JOB_NAME": "Example Ballista Job",
Expand Down
139 changes: 92 additions & 47 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ use datafusion::execution::{SessionState, SessionStateBuilder};
use datafusion::prelude::*;
use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext;
use datafusion_python::utils::wait_for_future;
use std::borrow::BorrowMut;
use std::cell::RefCell;

use std::collections::HashMap;
use std::fmt::Formatter;
use std::path::Display;

use pyo3::prelude::*;
mod utils;
Expand All @@ -34,15 +36,102 @@ fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
pyo3_log::init();
// Ballista structs
m.add_class::<PyBallista>()?;
m.add_class::<PyBallistaBuilder>()?;
// DataFusion structs
m.add_class::<datafusion_python::dataframe::PyDataFrame>()?;
// Ballista Config
/*
// Future implementation will include more state and config options
m.add_class::<PySessionStateBuilder>()?;
m.add_class::<PySessionState>()?;
m.add_class::<PySessionConfig>()?;
*/
Ok(())
}

/// Builder exposed to Python as `ballista.BallistaBuilder`.
///
/// Configuration is held as string key/value pairs in a `HashMap` (a dict-like
/// config) and handed to [`PyBallista`] by [`PyBallistaBuilder::build`].
#[pyclass(name = "BallistaBuilder", module = "ballista", subclass)]
pub struct PyBallistaBuilder(HashMap<String, String>);

#[pymethods]
impl PyBallistaBuilder {
    /// Creates a builder with no settings.
    #[new]
    pub fn new() -> Self {
        PyBallistaBuilder(HashMap::new())
    }

    /// Stores `v` under key `k`, replacing any previous value for that key,
    /// and returns the builder object so calls can be chained from Python.
    pub fn set(mut slf: PyRefMut<'_, Self>, k: &str, v: &str, py: Python) -> PyResult<PyObject> {
        let key = k.to_owned();
        let value = v.to_owned();
        slf.0.insert(key, value);

        Ok(slf.into_py(py))
    }

    /// Prints a header followed by every stored setting to stdout, one
    /// `key: value` pair per line, in the map's iteration order.
    pub fn show_config(&self) {
        println!("Ballista Config:");
        for (key, value) in &self.0 {
            println!(" {}: {}", key, value)
        }
    }

    /// Produces a [`PyBallista`] carrying a copy of the settings collected so
    /// far; the builder itself is left untouched.
    pub fn build(slf: PyRef<'_, Self>) -> PyBallista {
        let settings = slf.0.clone();
        PyBallista {
            conf: PyBallistaBuilder(settings),
        }
    }
}

#[pyclass(name = "Ballista", module = "ballista", subclass)]
pub struct PyBallista {
pub conf: PyBallistaBuilder,
}

#[pymethods]
impl PyBallista {
#[new]
pub fn new() -> Self {
Self {
conf: PyBallistaBuilder::new(),
}
}

pub fn show_config(&self) {
println!("Ballista Config:");
for ele in self.conf.0.clone() {
println!("{:4}: {}", ele.0, ele.1)
}
}

/// Construct the standalone instance from the SessionContext
pub fn standalone(
&self,
concurrent_tasks: usize,
py: Python,
) -> PyResult<DataFusionPythonSessionContext> {
// Build the config
let config = BallistaConfig::with_settings(self.conf.0).unwrap();
// Define the SessionContext
let session_context = BallistaContext::standalone(&config, concurrent_tasks);
// SessionContext is an async function
let ctx = wait_for_future(py, session_context).unwrap();

// Convert the SessionContext into a Python SessionContext
Ok(ctx.context().into())
}

/// Construct the remote instance from the SessionContext
pub fn remote(url: &str, py: Python) -> PyResult<DataFusionPythonSessionContext> {
let session_context = SessionContext::remote(url);
let ctx = wait_for_future(py, session_context)?;

// Convert the SessionContext into a Python SessionContext
Ok(ctx.into())
}
}


/*
Plan to implement Session Config and State in a future issue
/// Ballista Session Extension builder
#[pyclass(name = "SessionConfig", module = "ballista", subclass)]
#[derive(Clone)]
Expand Down Expand Up @@ -94,48 +183,4 @@ impl PySessionStateBuilder {
}
}
}

#[pyclass(name = "Ballista", module = "ballista", subclass)]
pub struct PyBallista {
pub state: RefCell<SessionStateBuilder>,
}

#[pymethods]
impl PyBallista {
#[new]
pub fn new() -> Self {
Self {
state: RefCell::new(SessionStateBuilder::new()),
}
}

pub fn update_state(&mut self, state: &PyCell<PySessionStateBuilder>) {
self.state = state.borrow_mut().state
}

/// Construct the standalone instance from the SessionContext
pub fn standalone(
slf: PyRef<'_, Self>,
state: PySessionStateBuilder,
py: Python,
) -> PyResult<DataFusionPythonSessionContext> {
let take_state = state.take().build();
// Define the SessionContext
let session_context = SessionContext::standalone_with_state(take_state);
// SessionContext is an async function
let ctx = wait_for_future(py, session_context).unwrap();

// Convert the SessionContext into a Python SessionContext
Ok(ctx.into())
}

#[staticmethod]
/// Construct the remote instance from the SessionContext
pub fn remote(url: &str, py: Python) -> PyResult<DataFusionPythonSessionContext> {
let session_context = SessionContext::remote(url);
let ctx = wait_for_future(py, session_context)?;

// Convert the SessionContext into a Python SessionContext
Ok(ctx.into())
}
}
*/

0 comments on commit 407540a

Please sign in to comment.