Skip to content

Commit

Permalink
updated python to have two static methods for creating a ballista con…
Browse files Browse the repository at this point in the history
…text
  • Loading branch information
Trevor Barnes committed Nov 13, 2024
1 parent a92a568 commit ca9d60d
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 106 deletions.
10 changes: 6 additions & 4 deletions python/ballista/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@

from .ballista_internal import (
Ballista,
BallistaConfig,
BallistaConfigBuilder
SessionConfig,
SessionStateBuilder,
SessionState
)

__version__ = importlib_metadata.version(__name__)

__all__ = [
"Ballista",
"BallistaConfig",
"BallistaConfigBuilder"
"SessionConfig",
"SessionStateBuilder",
"SessionState"
]
122 changes: 26 additions & 96 deletions python/ballista/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,115 +17,45 @@

from _typeshed import Self
from datafusion import SessionContext
from ballista import Ballista, BallistaConfig, BallistaConfigBuilder
from ballista import SessionConfig, SessionStateBuilder, SessionState, Ballista

from typing import List, Any

class BallistaConfigBuilder:
def __init__(self) -> None:
pass

def set(self, k: str, v: str):
BallistaConfigBuilder.set(self, k, v)
class SessionConfig:
def __new__(cls):
return super().__new__(cls)

def build() -> BallistaConfig:
BallistaConfig()

class BallistaConfig:
def __init__(self):
self.config = self.with_settings({})

def builder(self) -> BallistaConfig:
builder = self.builder()

return builder

def with_settings(self, settings: dict) -> BallistaConfig:
self.with_settings(settings)

return self
self.session_config = SessionConfig()

def settings(self) -> None:
self.settings()
def set_str(self, key: str, value: str):
self.session_config.set_str(key, value)

def default_shuffle_partitions(self):
self.default_shuffle_partitions()

def default_batch_size(self):
self.default_batch_size()
class SessionStateBuilder:
def __new__(cls):
return super().__new__(cls)

def hash_join_single_partition_threshold(self):
self.hash_join_single_partition_threshold()

def default_grpc_client_max_message_size(self):
self.default_grpc_client_max_message_size()

def repartition_joins(self):
self.repartition_joins()

def repartition_aggregations(self):
self.repartition_aggregations()

def repartition_windows(self):
self.repartition_windows()
def __init__(self) -> None:
self.state = SessionStateBuilder()

def parquet_pruning(self):
self.parquet_pruning()
def with_config(self, config: SessionConfig) -> SessionStateBuilder:
self.with_config(config)

def collect_statistics(self):
self.collect_statistics()
return self

def default_standalone_parallelism(self):
self.default_standalone_parallelism()
def build(self) -> SessionState:
self.build()

def default_with_information_schema(self):
self.default_with_information_schema()
class SessionState:
def __new__(cls):
return super().__new__(cls)

class Ballista:
def __init__(self):
self.config = BallistaConfig()

def configuration(self, settings: dict):
self.config = BallistaConfig().builder().with_settings(settings)

def standalone(self) -> SessionContext:
return self.standalone()

def remote(self, url: str) -> SessionContext:
return self.remote(url)

def settings(self):
self.config.settings()

def default_shuffle_partitions(self):
self.config.default_shuffle_partitions()

def default_batch_size(self):
self.config.default_batch_size()

def hash_join_single_partition_threshold(self):
self.config.hash_join_single_partition_threshold()
def __new__(cls):
return super().__new__(cls)

def default_grpc_client_max_message_size(self):
self.config.default_grpc_client_max_message_size()

def repartition_joins(self):
self.config.repartition_joins()

def repartion_aggregations(self):
self.config.repartition_aggregations()

def repartition_windows(self):
self.config.repartition_windows()

def parquet_pruning(self):
self.config.parquet_pruning()

def collect_statistics(self):
self.config.collect_statistics()

def default_standalone_parallelism(self):
self.config.default_standalone_parallelism()
def __init__(self) -> None:
self.state = Ballista()

def default_with_information_schema(self):
self.config.default_with_information_schema()
def standalone(self):
self.standalone()
17 changes: 15 additions & 2 deletions python/examples/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,25 @@
# specific language governing permissions and limitations
# under the License.

from ballista import Ballista, BallistaConfig, BallistaConfigBuilder
from ballista import Ballista, SessionConfig, SessionStateBuilder
from datafusion.context import SessionContext

# Ballista will initiate with an empty config
ballista = Ballista()
config = BallistaConfig()
config = SessionConfig()\
.set_str("BALLISTA_DEFAULT_SHUFFLE_PARTITIONS", "4")

# Build the state
state = SessionStateBuilder()\
.with_config(config)\
.build()

# Create the context
ctx: SessionContext = Ballista().standalone()

ctx.sql("SELECT 1").show()

"""
# Define custom settings
job_settings = {
"BALLISTA_JOB_NAME": "Example Ballista Job",
Expand Down Expand Up @@ -52,3 +64,4 @@
# Show the dataframe
test_csv.show()
"""
27 changes: 23 additions & 4 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

use ballista::extension::SessionConfigExt;
use ballista::prelude::*;
use datafusion::execution::SessionStateBuilder;
use ballista_core::utils::SessionStateExt;
use datafusion::catalog::Session;
use datafusion::execution::{SessionState, SessionStateBuilder};
use datafusion::prelude::*;
use datafusion_python::context::PySessionContext as DataFusionPythonSessionContext;
use datafusion_python::utils::wait_for_future;
use std::borrow::BorrowMut;
use std::cell::RefCell;

use pyo3::prelude::*;
Expand All @@ -35,6 +38,7 @@ fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<datafusion_python::dataframe::PyDataFrame>()?;
// Ballista Config
m.add_class::<PySessionStateBuilder>()?;
m.add_class::<PySessionState>()?;
m.add_class::<PySessionConfig>()?;
Ok(())
}
Expand Down Expand Up @@ -76,8 +80,18 @@ impl PySessionStateBuilder {
}
}

pub fn with_config(slf: PyRefMut<'_, Self>, config: PySessionConfig) {
slf.state.take().with_config(config.session_config);
pub fn with_config(&mut self, config: PySessionConfig) -> PySessionStateBuilder {
let state = self.state.take().with_config(config.session_config);

PySessionStateBuilder {
state: state.into()
}
}

pub fn build(&mut self) -> PySessionStateBuilder {
PySessionStateBuilder {
state: RefCell::new(self.state.take())
}
}
}

Expand All @@ -94,13 +108,18 @@ impl PyBallista {
state: RefCell::new(SessionStateBuilder::new()),
}
}

pub fn update_state(&mut self, state: &PyCell<PySessionStateBuilder>) {
self.state = state.borrow_mut().state
}

/// Construct the standalone instance from the SessionContext
pub fn standalone(
slf: PyRef<'_, Self>,
state: PySessionStateBuilder,
py: Python,
) -> PyResult<DataFusionPythonSessionContext> {
let take_state = slf.state.take().build();
let take_state = state.take().build();
// Define the SessionContext
let session_context = SessionContext::standalone_with_state(take_state);
// SessionContext is an async function
Expand Down

0 comments on commit ca9d60d

Please sign in to comment.