Skip to content

Commit

Permalink
Add GPU support to the LazyFrame profiler
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt711 committed Jan 13, 2025
1 parent 2e0b3a3 commit 4a0d5ec
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 11 deletions.
2 changes: 1 addition & 1 deletion crates/polars-expr/src/state/execution_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub struct ExecutionState {
pub branch_idx: usize,
pub flags: AtomicU8,
pub ext_contexts: Arc<Vec<DataFrame>>,
node_timer: Option<NodeTimer>,
pub node_timer: Option<NodeTimer>,
stop: Arc<AtomicBool>,
}

Expand Down
4 changes: 2 additions & 2 deletions crates/polars-expr/src/state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod execution_state;
mod node_timer;
pub mod node_timer;

pub use execution_state::*;
use node_timer::*;
pub use node_timer::*;
8 changes: 4 additions & 4 deletions crates/polars-expr/src/state/node_timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,28 @@ type Nodes = Vec<String>;
type Ticks = Vec<(StartInstant, EndInstant)>;

#[derive(Clone)]
pub(super) struct NodeTimer {
pub struct NodeTimer {
query_start: Instant,
data: Arc<Mutex<(Nodes, Ticks)>>,
}

impl NodeTimer {
pub(super) fn new() -> Self {
pub fn new() -> Self {
Self {
query_start: Instant::now(),
data: Arc::new(Mutex::new((Vec::with_capacity(16), Vec::with_capacity(16)))),
}
}

pub(super) fn store(&self, start: StartInstant, end: EndInstant, name: String) {
pub fn store(&self, start: StartInstant, end: EndInstant, name: String) {
let mut data = self.data.lock().unwrap();
let nodes = &mut data.0;
nodes.push(name);
let ticks = &mut data.1;
ticks.push((start, end))
}

pub(super) fn finish(self) -> PolarsResult<DataFrame> {
pub fn finish(self) -> PolarsResult<DataFrame> {
let mut data = self.data.lock().unwrap();
let mut nodes = std::mem::take(&mut data.0);
nodes.push("optimization".to_string());
Expand Down
12 changes: 12 additions & 0 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,18 @@ impl LazyFrame {
self._collect_post_opt(|_, _, _| Ok(()))
}

/// Profile a LazyFrame, but also allow a post-optimization callback (e.g. for GPU).
pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
where
P: Fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<()>,
{
let (mut state, mut physical_plan, _) = self.prepare_collect_post_opt(false, post_opt)?;
state.time_nodes();
let df = physical_plan.execute(&mut state)?;
let timer_df = state.finish_timer()?;
Ok((df, timer_df))
}

/// Profile a LazyFrame.
///
/// This will run the query and return a tuple
Expand Down
29 changes: 29 additions & 0 deletions crates/polars-mem-engine/src/executors/scan/python_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use pyo3::exceptions::PyStopIteration;
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyNone};
use pyo3::{intern, IntoPyObjectExt, PyTypeInfo};
use std::time::Instant;
use std::time::Duration;
use polars_expr::state::node_timer::NodeTimer;

use super::*;

Expand All @@ -13,6 +16,30 @@ pub(crate) struct PythonScanExec {
pub(crate) predicate_serialized: Option<Vec<u8>>,
}

#[pyclass]
pub struct PyNodeTimer {
timer: Option<NodeTimer>,
}

#[pymethods]
impl PyNodeTimer {
pub fn store(&self, name: &str, start_ns: u64, end_ns: u64) -> PyResult<()> {
if let Some(timer) = &self.timer {
let now = Instant::now();
let start = now + Duration::from_nanos(start_ns);
let end = now + Duration::from_nanos(end_ns);
timer.store(start, end, name.to_string())
}
Ok(())
}
}

impl PyNodeTimer {
pub fn new(timer: Option<NodeTimer>) -> Self {
PyNodeTimer { timer }
}
}

fn python_df_to_rust(py: Python, df: Bound<PyAny>) -> PolarsResult<DataFrame> {
let err = |_| polars_err!(ComputeError: "expected a polars.DataFrame; got {}", df);
let pydf = df.getattr(intern!(py, "_df")).map_err(err)?;
Expand Down Expand Up @@ -66,11 +93,13 @@ impl Executor for PythonScanExec {
self.options.python_source,
PythonScanSource::Pyarrow | PythonScanSource::Cuda
) {
let py_node_timer = PyNodeTimer::new(state.node_timer.clone());
let args = (
python_scan_function,
with_columns.map(|x| x.into_iter().map(|x| x.to_string()).collect::<Vec<_>>()),
predicate,
n_rows,
Py::new(py, py_node_timer).unwrap(),
);
callable.call1(args).map_err(to_compute_err)
} else {
Expand Down
38 changes: 36 additions & 2 deletions crates/polars-python/src/lazyframe/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,12 +602,46 @@ impl PyLazyFrame {
ldf.cache().into()
}

fn profile(&self, py: Python) -> PyResult<(PyDataFrame, PyDataFrame)> {
#[pyo3(signature = (lambda_post_opt=None))]
fn profile(&self, py: Python, lambda_post_opt: Option<PyObject>) -> PyResult<(PyDataFrame, PyDataFrame)> {
// if we don't allow threads and we have udfs trying to acquire the gil from different
// threads we deadlock.
let (df, time_df) = py.allow_threads(|| {
let ldf = self.ldf.clone();
ldf.profile().map_err(PyPolarsErr::from)
if let Some(lambda) = lambda_post_opt {
ldf._profile_post_opt(|root, lp_arena, expr_arena| {
Python::with_gil(|py| {
let nt = NodeTraverser::new(
root,
std::mem::take(lp_arena),
std::mem::take(expr_arena),
);

// Get a copy of the arena's.
let arenas = nt.get_arenas();

// Pass the node visitor which allows the python callback to replace parts of the query plan.
// Remove "cuda" or specify better once we have multiple post-opt callbacks.
lambda.call1(py, (nt,)).map_err(
|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e),
)?;

// ... unwrap the updated arenas, etc ...
// This step ensures that any IR rewriting done by Python
// gets placed back into the polars arenas.

// Unpack the arena's.
// At this point the `nt` is useless.
std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

Ok(())
})
})
} else {
ldf.profile()
}
.map_err(PyPolarsErr::from)
})?;
Ok((df.into(), time_df.into()))
}
Expand Down
34 changes: 32 additions & 2 deletions py-polars/polars/lazyframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -1622,6 +1622,8 @@ def profile(
figsize: tuple[int, int] = (18, 8),
streaming: bool = False,
_check_order: bool = True,
engine: EngineType = "cpu",
**_kwargs: Any,
) -> tuple[DataFrame, DataFrame]:
"""
Profile a LazyFrame.
Expand Down Expand Up @@ -1722,8 +1724,36 @@ def profile(
_check_order=_check_order,
new_streaming=False,
)
df, timings = ldf.profile()
(df, timings) = wrap_df(df), wrap_df(timings)
is_config_obj = isinstance(engine, GPUEngine)
is_gpu = (engine == "gpu") or is_config_obj
if streaming and is_gpu:
issue_warning(
"GPU engine does not support streaming; disabling GPU engine.",
category=UserWarning,
)
is_gpu = False

callback = None
if is_gpu:
cudf_polars = import_optional(
"cudf_polars",
err_prefix="GPU engine requested, but required package",
install_message=(
"Please install using the command "
"`pip install cudf-polars-cu12` "
"(or `pip install --extra-index-url=https://pypi.nvidia.com cudf-polars-cu11` "
"if your system has a CUDA 11 driver)."
),
)
if not is_config_obj:
engine = GPUEngine() # If user only passed engine="gpu"
callback = partial(cudf_polars.execute_with_cudf, config=engine)

callback = _kwargs.get("post_opt_callback", callback)

df, timings = ldf.profile(callback)

df, timings = wrap_df(df), wrap_df(timings)

if show_plot:
import_optional(
Expand Down

0 comments on commit 4a0d5ec

Please sign in to comment.