Skip to content

Commit

Permalink
Adding config to ballista context
Browse files Browse the repository at this point in the history
  • Loading branch information
Trevor Barnes committed Nov 16, 2024
1 parent 23b1957 commit 108736d
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 171 deletions.
2 changes: 0 additions & 2 deletions python/ballista/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import pyarrow as pa

from .ballista_internal import (
Ballista,
BallistaBuilder,
#SessionConfig,
#SessionStateBuilder,
Expand All @@ -36,7 +35,6 @@
__version__ = importlib_metadata.version(__name__)

__all__ = [
"Ballista",
"BallistaBuilder",
#"SessionConfig",
#"SessionStateBuilder",
Expand Down
49 changes: 3 additions & 46 deletions python/ballista/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from _typeshed import Self
from datafusion import SessionContext
from ballista import Ballista, BallistaBuilder
from ballista import BallistaBuilder

from typing import List, Any

Expand All @@ -29,51 +29,8 @@ def __new__(cls):
def __init__(self) -> None:
self = {}

def set(self, k, v) -> Self:
return self.set(k, v)


class Ballista:
def __new__(cls):
return super().__new__(cls)

def __init__(self) -> None:
self.state = Ballista()
def config(self, k, v) -> Self:
return self.config(k, v)

def standalone(self, concurrent_tasks: int = 4):
self.standalone()

def builder(self) -> BallistaBuilder:
return BallistaBuilder()

"""
### Future State Implementation
class SessionState:
def __new__(cls):
return super().__new__(cls)
class SessionConfig:
def __new__(cls):
return super().__new__(cls)
def __init__(self):
self.session_config = SessionConfig()
def set_str(self, key: str, value: str):
self.session_config.set_str(key, value)
class SessionStateBuilder:
def __new__(cls):
return super().__new__(cls)
def __init__(self) -> None:
self.state = SessionStateBuilder()
def with_config(self, config: SessionConfig) -> SessionStateBuilder:
self.with_config(config)
return self
def build(self) -> SessionState:
self.build()
"""
18 changes: 6 additions & 12 deletions python/examples/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,17 @@
# specific language governing permissions and limitations
# under the License.

from ballista import Ballista, BallistaBuilder
from ballista import BallistaBuilder
from datafusion.context import SessionContext

# Ballista will initiate with an empty config
# set config variables with `set()`
ballista = BallistaBuilder()\
.set("ballista.job.name", "example ballista")\
.set("ballista.shuffle.partitions", "16")\
.set("ballista.executor.cpus", "4")\
.build()
ctx: SessionContext = BallistaBuilder()\
.config("ballista.job.name", "example ballista")\
.config("ballista.shuffle.partitions", "16")\
.config("ballista.executor.cpus", "4")\
.remote("http://10.103.0.25:50050")

# Show the Ballista Config
print(ballista.show_config())

# Build a standalone Cluster (use `remote()`)
# for remote cluster
ctx: SessionContext = ballista.standalone()
#ctx_remote: SessionContext = ballista.remote("remote_ip", 50050)

# Select 1 to verify its working
Expand Down
140 changes: 29 additions & 111 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// under the License.

use ballista::prelude::*;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::*;
use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext;
use datafusion_python::utils::wait_for_future;

Expand All @@ -29,7 +31,6 @@ use utils::to_pyerr;
fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
pyo3_log::init();
// Ballista structs
m.add_class::<PyBallista>()?;
m.add_class::<PyBallistaBuilder>()?;
// DataFusion structs
m.add_class::<datafusion_python::dataframe::PyDataFrame>()?;
Expand All @@ -53,10 +54,12 @@ pub struct PyBallistaBuilder {
impl PyBallistaBuilder {
#[new]
pub fn new() -> Self {
Self { conf: HashMap::new() }
Self {
conf: HashMap::new(),
}
}

pub fn set(
pub fn config(
mut slf: PyRefMut<'_, Self>,
k: &str,
v: &str,
Expand All @@ -74,52 +77,20 @@ impl PyBallistaBuilder {
}
}

pub fn build(slf: PyRef<'_, Self>) -> PyBallista {
PyBallista {
conf: PyBallistaBuilder {
conf: slf.conf.clone(),
},
}
}
}

#[pyclass(name = "Ballista", module = "ballista", subclass)]
pub struct PyBallista {
pub conf: PyBallistaBuilder,
}

#[pymethods]
impl PyBallista {
#[new]
pub fn new() -> Self {
Self {
conf: PyBallistaBuilder::new(),
}
}

pub fn show_config(&self) {
println!("Ballista Config:");
for ele in self.conf.conf.clone() {
println!("{:4}: {}", ele.0, ele.1)
}
}

/// Construct the standalone instance from the SessionContext
#[pyo3(signature = (concurrent_tasks = 4))]
pub fn standalone(
&self,
concurrent_tasks: usize,
py: Python,
) -> PyResult<DataFusionPythonSessionContext> {
pub fn standalone(&self, py: Python) -> PyResult<DataFusionPythonSessionContext> {
// Build the config
let config = &BallistaConfig::with_settings(self.conf.conf.clone()).unwrap();
// Define the SessionContext
let session_context = BallistaContext::standalone(&config, concurrent_tasks);
let config: SessionConfig = SessionConfig::from_string_hash_map(&self.conf)?;
// Build the state
let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.build();
// Build the context
let standalone_session = SessionContext::standalone_with_state(state);

// SessionContext is an async function
let ctx = wait_for_future(py, session_context)
.map_err(to_pyerr)?
.context()
.clone();
let ctx = wait_for_future(py, standalone_session)?;

// Convert the SessionContext into a Python SessionContext
Ok(ctx.into())
Expand All @@ -128,76 +99,23 @@ impl PyBallista {
/// Construct the remote instance from the SessionContext
pub fn remote(
&self,
host: &str,
port: u16,
url: &str,
py: Python,
) -> PyResult<DataFusionPythonSessionContext> {
// Build the config
let config = &BallistaConfig::with_settings(self.conf.conf.clone()).unwrap();
// Create the BallistaContext
let session_context = BallistaContext::remote(host, port, config);
let ctx = wait_for_future(py, session_context)
.map_err(to_pyerr)?
.context()
.clone();
let config: SessionConfig = SessionConfig::from_string_hash_map(&self.conf)?;
// Build the state
let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.build();
// Build the context
let remote_session = SessionContext::remote_with_state(url, state);

// SessionContext is an async function
let ctx = wait_for_future(py, remote_session)?;

// Convert the SessionContext into a Python SessionContext
Ok(ctx.into())
}
}

/*
Plan to implement Session Config and State in a future issue
/// Ballista Session Extension builder
#[pyclass(name = "SessionConfig", module = "ballista", subclass)]
#[derive(Clone)]
pub struct PySessionConfig {
pub session_config: SessionConfig,
}
#[pymethods]
impl PySessionConfig {
#[new]
pub fn new() -> Self {
let session_config = SessionConfig::new_with_ballista();
Self { session_config }
}
pub fn set_str(&mut self, key: &str, value: &str) -> Self {
self.session_config.options_mut().set(key, value);
self.clone()
}
}
#[pyclass(name = "SessionStateBuilder", module = "ballista", subclass)]
pub struct PySessionStateBuilder {
pub state: RefCell<SessionStateBuilder>,
}
#[pymethods]
impl PySessionStateBuilder {
#[new]
pub fn new() -> Self {
Self {
state: RefCell::new(SessionStateBuilder::new()),
}
}
pub fn with_config(&mut self, config: PySessionConfig) -> PySessionStateBuilder {
let state = self.state.take().with_config(config.session_config);
PySessionStateBuilder {
state: state.into()
}
}
pub fn build(&mut self) -> PySessionStateBuilder {
PySessionStateBuilder {
state: RefCell::new(self.state.take())
}
}
}
*/

0 comments on commit 108736d

Please sign in to comment.