From 4a0d5ec48720b8708447213b0e2df39f31d90420 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Mon, 13 Jan 2025 18:17:15 +0000 Subject: [PATCH] Add GPU support to the LazyFrame profiler --- .../polars-expr/src/state/execution_state.rs | 2 +- crates/polars-expr/src/state/mod.rs | 4 +- crates/polars-expr/src/state/node_timer.rs | 8 ++-- crates/polars-lazy/src/frame/mod.rs | 12 ++++++ .../src/executors/scan/python_scan.rs | 29 ++++++++++++++ crates/polars-python/src/lazyframe/general.rs | 38 ++++++++++++++++++- py-polars/polars/lazyframe/frame.py | 34 ++++++++++++++++- 7 files changed, 116 insertions(+), 11 deletions(-) diff --git a/crates/polars-expr/src/state/execution_state.rs b/crates/polars-expr/src/state/execution_state.rs index 07c571e26653..096e22cb36f2 100644 --- a/crates/polars-expr/src/state/execution_state.rs +++ b/crates/polars-expr/src/state/execution_state.rs @@ -70,7 +70,7 @@ pub struct ExecutionState { pub branch_idx: usize, pub flags: AtomicU8, pub ext_contexts: Arc<Vec<DataFrame>>, - node_timer: Option<NodeTimer>, + pub node_timer: Option<NodeTimer>, stop: Arc<AtomicBool>, } diff --git a/crates/polars-expr/src/state/mod.rs b/crates/polars-expr/src/state/mod.rs index d8f5ca5b8ca0..84b794ad0ff6 100644 --- a/crates/polars-expr/src/state/mod.rs +++ b/crates/polars-expr/src/state/mod.rs @@ -1,5 +1,5 @@ mod execution_state; -mod node_timer; +pub mod node_timer; pub use execution_state::*; -use node_timer::*; +pub use node_timer::*; diff --git a/crates/polars-expr/src/state/node_timer.rs b/crates/polars-expr/src/state/node_timer.rs index c3114d3029cd..85e4f0293876 100644 --- a/crates/polars-expr/src/state/node_timer.rs +++ b/crates/polars-expr/src/state/node_timer.rs @@ -11,20 +11,20 @@ type Nodes = Vec<String>; type Ticks = Vec<(StartInstant, EndInstant)>; #[derive(Clone)] -pub(super) struct NodeTimer { +pub struct NodeTimer { query_start: Instant, data: Arc<Mutex<(Nodes, Ticks)>>, } impl NodeTimer { - pub(super) fn new() -> Self { + pub fn new() -> Self { Self { query_start: Instant::now(), data: 
Arc::new(Mutex::new((Vec::with_capacity(16), Vec::with_capacity(16)))), } } - pub(super) fn store(&self, start: StartInstant, end: EndInstant, name: String) { + pub fn store(&self, start: StartInstant, end: EndInstant, name: String) { let mut data = self.data.lock().unwrap(); let nodes = &mut data.0; nodes.push(name); @@ -32,7 +32,7 @@ impl NodeTimer { ticks.push((start, end)) } - pub(super) fn finish(self) -> PolarsResult<DataFrame> { + pub fn finish(self) -> PolarsResult<DataFrame> { let mut data = self.data.lock().unwrap(); let mut nodes = std::mem::take(&mut data.0); nodes.push("optimization".to_string()); diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index b365e7284566..d48cf354b92b 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -747,6 +747,18 @@ impl LazyFrame { self._collect_post_opt(|_, _, _| Ok(())) } + /// Profile a LazyFrame, but also allow a post-optimization callback (e.g. for GPU). + pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)> + where + P: Fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<()>, + { + let (mut state, mut physical_plan, _) = self.prepare_collect_post_opt(false, post_opt)?; + state.time_nodes(); + let df = physical_plan.execute(&mut state)?; + let timer_df = state.finish_timer()?; + Ok((df, timer_df)) + } + /// Profile a LazyFrame. /// /// This will run the query and return a tuple diff --git a/crates/polars-mem-engine/src/executors/scan/python_scan.rs b/crates/polars-mem-engine/src/executors/scan/python_scan.rs index b8d4ca9e3e7f..df81b7e9bfb8 100644 --- a/crates/polars-mem-engine/src/executors/scan/python_scan.rs +++ b/crates/polars-mem-engine/src/executors/scan/python_scan.rs @@ -4,6 +4,9 @@ use pyo3::exceptions::PyStopIteration; use pyo3::prelude::*; use pyo3::types::{PyBytes, PyNone}; use pyo3::{intern, IntoPyObjectExt, PyTypeInfo}; +use std::time::Instant; +use std::time::Duration; +use polars_expr::state::node_timer::NodeTimer; use super::*; @@ -13,6 +16,30 @@ pub(crate) struct PythonScanExec { pub(crate) predicate_serialized: Option<Vec<u8>>, } +#[pyclass] +pub struct PyNodeTimer { + timer: Option<NodeTimer>, +} + +#[pymethods] +impl PyNodeTimer { + pub fn store(&self, name: &str, start_ns: u64, end_ns: u64) -> PyResult<()> { + if let Some(timer) = &self.timer { + let now = Instant::now(); + let start = now + Duration::from_nanos(start_ns); + let end = now + Duration::from_nanos(end_ns); + timer.store(start, end, name.to_string()) + } + Ok(()) + } +} + +impl PyNodeTimer { + pub fn new(timer: Option<NodeTimer>) -> Self { + PyNodeTimer { timer } + } +} + fn python_df_to_rust(py: Python, df: Bound<PyAny>) -> PolarsResult<DataFrame> { let err = |_| polars_err!(ComputeError: "expected a polars.DataFrame; got {}", df); let pydf = df.getattr(intern!(py, "_df")).map_err(err)?; @@ -66,11 +93,13 @@ impl Executor for PythonScanExec { self.options.python_source, PythonScanSource::Pyarrow | PythonScanSource::Cuda ) { + let py_node_timer = 
PyNodeTimer::new(state.node_timer.clone()); let args = ( python_scan_function, with_columns.map(|x| x.into_iter().map(|x| x.to_string()).collect::<Vec<_>>()), predicate, n_rows, + Py::new(py, py_node_timer).unwrap(), ); callable.call1(args).map_err(to_compute_err) } else { diff --git a/crates/polars-python/src/lazyframe/general.rs b/crates/polars-python/src/lazyframe/general.rs index ea6c39f8a2b9..cf766b5aa7f4 100644 --- a/crates/polars-python/src/lazyframe/general.rs +++ b/crates/polars-python/src/lazyframe/general.rs @@ -602,12 +602,46 @@ impl PyLazyFrame { ldf.cache().into() } - fn profile(&self, py: Python) -> PyResult<(PyDataFrame, PyDataFrame)> { + #[pyo3(signature = (lambda_post_opt=None))] + fn profile(&self, py: Python, lambda_post_opt: Option<PyObject>) -> PyResult<(PyDataFrame, PyDataFrame)> { // if we don't allow threads and we have udfs trying to acquire the gil from different // threads we deadlock. let (df, time_df) = py.allow_threads(|| { let ldf = self.ldf.clone(); - ldf.profile().map_err(PyPolarsErr::from) + if let Some(lambda) = lambda_post_opt { + ldf._profile_post_opt(|root, lp_arena, expr_arena| { + Python::with_gil(|py| { + let nt = NodeTraverser::new( + root, + std::mem::take(lp_arena), + std::mem::take(expr_arena), + ); + + // Get a copy of the arena's. + let arenas = nt.get_arenas(); + + // Pass the node visitor which allows the python callback to replace parts of the query plan. + // Remove "cuda" or specify better once we have multiple post-opt callbacks. + lambda.call1(py, (nt,)).map_err( + |e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e), + )?; + + // ... unwrap the updated arenas, etc ... + // This step ensures that any IR rewriting done by Python + // gets placed back into the polars arenas. + + // Unpack the arena's. + // At this point the `nt` is useless. 
+ std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap()); + std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap()); + + Ok(()) + }) + }) + } else { + ldf.profile() + } + .map_err(PyPolarsErr::from) })?; Ok((df.into(), time_df.into())) } diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 5453daa3995c..7906ed19ba2b 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -1622,6 +1622,8 @@ def profile( figsize: tuple[int, int] = (18, 8), streaming: bool = False, _check_order: bool = True, + engine: EngineType = "cpu", + **_kwargs: Any, ) -> tuple[DataFrame, DataFrame]: """ Profile a LazyFrame. @@ -1722,8 +1724,36 @@ def profile( _check_order=_check_order, new_streaming=False, ) - df, timings = ldf.profile() - (df, timings) = wrap_df(df), wrap_df(timings) + is_config_obj = isinstance(engine, GPUEngine) + is_gpu = (engine == "gpu") or is_config_obj + if streaming and is_gpu: + issue_warning( + "GPU engine does not support streaming; disabling GPU engine.", + category=UserWarning, + ) + is_gpu = False + + callback = None + if is_gpu: + cudf_polars = import_optional( + "cudf_polars", + err_prefix="GPU engine requested, but required package", + install_message=( + "Please install using the command " + "`pip install cudf-polars-cu12` " + "(or `pip install --extra-index-url=https://pypi.nvidia.com cudf-polars-cu11` " + "if your system has a CUDA 11 driver)." + ), + ) + if not is_config_obj: + engine = GPUEngine() # If user only passed engine="gpu" + callback = partial(cudf_polars.execute_with_cudf, config=engine) + + callback = _kwargs.get("post_opt_callback", callback) + + df, timings = ldf.profile(callback) + + df, timings = wrap_df(df), wrap_df(timings) if show_plot: import_optional(