#38 Building from config, preparing, validating now returns LazyFrames only

This is to allow the streaming feature on the driver. If streaming is not configured, the LazyFrame will be collected.
AnatolyBuga committed Nov 17, 2022
1 parent c4dbed6 commit c7333df
Showing 18 changed files with 416 additions and 274 deletions.
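To make the commit message concrete, here is a minimal sketch of the driver-side decision this enables. The `streaming` flag, the `load` function, and the import paths are placeholders, not part of this commit; the types and methods are the ones introduced below:

    use base_engine::{DataSet, DataSetBase, DataSourceConfig}; // assumed re-exports
    use polars::prelude::PolarsResult;

    // Hypothetical driver flow; `streaming` would come from the driver's own config.
    fn load(conf: DataSourceConfig, streaming: bool) -> PolarsResult<DataSetBase> {
        let mut ds = DataSetBase::from_config(conf); // builds a LazyFrame; executes nothing yet
        ds.prepare();                                // common pre-computations, still lazy
        if streaming {
            Ok(ds)        // stay lazy so Polars can run the streaming engine per query
        } else {
            ds.collect()  // eager mode: materialise once, up front
        }
    }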
2 changes: 1 addition & 1 deletion base_engine/Cargo.toml
@@ -22,6 +22,6 @@ futures = { version = "0.3", optional=true }
dashmap = {workspace = true, optional=true}

[features]
default = ["aws_s3"]
default = []
aws_s3 = ["dep:aws-config", "dep:aws-sdk-s3", "dep:tokio", "dep:futures"]
cache = ["dep:dashmap"]
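Because `aws_s3` has been dropped from the default feature set, downstream users reading from S3 must now opt in explicitly, with a dependency spec along these lines (path illustrative):

    [dependencies]
    base_engine = { path = "../base_engine", features = ["aws_s3"] }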
11 changes: 2 additions & 9 deletions base_engine/src/api/execute_agg.rs
@@ -19,14 +19,7 @@ pub fn execute_aggregation(
// Assuming Front End knows which columns can be in groupby, agg etc

// Step 0.1
let f1 = data.frame();
//let tmp = f1.clone().lazy().filter(col("RiskClass").eq(lit("DRC_SecNonCTP"))).collect()?;
//dbg!(&tmp["SensWeights"]);
//let f1_cols = f1.get_column_names();

// Polars DataFrame clone is cheap:
// https://stackoverflow.com/questions/72320911/how-to-avoid-deep-copy-when-using-groupby-in-polars-rust
let mut f1 = f1.clone().lazy();
let mut f1 = data.lazy_frame().clone();

// Step 1.0 Applying FILTERS:
// TODO check if column is present in DF - ( is this "second line of defence" even needed?)
@@ -91,7 +84,7 @@ pub fn execute_aggregation(
let groups_fill_nulls: Vec<Expr> = groups
.clone()
.into_iter()
.map(|e| e.fill_null(lit("EMPTY")))
.map(|e| e.fill_null(lit(" ")))
.collect();

// Step 2.5 Apply GroupBy and Agg
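The hunk above changes the placeholder used to fill null group-by keys. A self-contained sketch of the pattern (toy data; column names invented):

    use polars::prelude::*;

    fn main() -> PolarsResult<()> {
        let df = df![
            "Desk" => [Some("FX"), None, Some("FX")],
            "PnL"  => [1.0, 2.0, 3.0],
        ]?;
        // Fill null keys so rows without a group label still land in a visible bucket.
        let groups = vec![col("Desk")];
        let groups_fill_nulls: Vec<Expr> = groups
            .into_iter()
            .map(|e| e.fill_null(lit(" ")))
            .collect();
        let out = df
            .lazy()
            .groupby(groups_fill_nulls)
            .agg([col("PnL").sum()])
            .collect()?;
        println!("{out}");
        Ok(())
    }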
142 changes: 92 additions & 50 deletions base_engine/src/dataset.rs
@@ -7,13 +7,12 @@ use crate::{derive_measure_map, DataSourceConfig, MeasuresMap};

/// This is the default struct which implements Dataset
/// Usually a client/user would overwrite it with their own DataSet
#[derive(Debug, Default, Serialize)]
#[derive(Default)]
pub struct DataSetBase {
pub frame: DataFrame,
pub frame: LazyFrame,
pub measures: MeasuresMap,
/// build_params are used in .prepare()
pub build_params: HashMap<String, String>,
pub calc_params: Vec<CalcParameter>,
}

/// This struct is purely for DataSet descriptive purposes.
@@ -30,23 +29,47 @@ pub struct CalcParameter {
///
/// If you have your own DataSet, implement this
pub trait DataSet: Send + Sync {
fn frame(&self) -> &DataFrame;
/// Polars DataFrame clone is cheap:
/// https://stackoverflow.com/questions/72320911/how-to-avoid-deep-copy-when-using-groupby-in-polars-rust
fn lazy_frame(&self) -> &LazyFrame;

fn measures(&self) -> &MeasuresMap;
fn build(conf: DataSourceConfig) -> Self

// Cannot be defined since returns Self which is a Struct
// TODO potentially remove to keep things simple
fn from_config(conf: DataSourceConfig) -> Self
where
Self: Sized;

/// See [DataSetBase] and [CalcParameter] for description of the parameters
fn new(frame: LazyFrame, mm: MeasuresMap, build_params: HashMap<String, String>) -> Self
where
Self: Sized;

fn collect(self) -> PolarsResult<Self>
where
Self: Sized,
{
Ok(self)
}

// These methods could be overwritten.

/// Prepare runs ONCE before server starts.
/// Any computations which are common to most queries could go in here.
/// Clones
fn frame(&self) -> PolarsResult<DataFrame> {
self.lazy_frame().clone().collect()
}

/// Prepare runs BEFORE any calculations. In eager mode it runs ONCE
/// Any pre-computations which are common to all queries could go in here.
fn prepare(&mut self) {}

fn calc_params(&self) -> Vec<CalcParameter> {
vec![]
}

fn overridable_columns(&self) -> Vec<String> {
overrides_columns(self.frame())
overrides_columns(self.lazy_frame())
}
/// Validate DataSet
/// Runs once, making sure all the required columns, their contents, types etc are valid
@@ -57,27 +80,35 @@ pub trait DataSet: Send + Sync {
}

impl DataSet for DataSetBase {
fn frame(&self) -> &DataFrame {
/// Polars DataFrame clone is cheap:
/// https://stackoverflow.com/questions/72320911/how-to-avoid-deep-copy-when-using-groupby-in-polars-rust
fn lazy_frame(&self) -> &LazyFrame {
&self.frame
}
fn measures(&self) -> &MeasuresMap {
&self.measures
}
/// It's ok to clone. Function is only called upon serialisation, so very rarely
fn calc_params(&self) -> Vec<CalcParameter> {
self.calc_params.clone()
}

fn build(conf: DataSourceConfig) -> Self {
let (frames, measure_cols, build_params) = conf.build();
fn from_config(conf: DataSourceConfig) -> Self {
let (frame, measure_cols, build_params) = conf.build();
let mm: MeasuresMap = derive_measure_map(measure_cols);
Self {
frame: frames,
frame,
measures: mm,
build_params,
calc_params: vec![],
}
}
fn new(frame: LazyFrame, mm: MeasuresMap, build_params: HashMap<String, String>) -> Self {
Self {
frame,
measures: mm,
build_params,
}
}
fn collect(self) -> PolarsResult<Self> {
let lf = self.frame.collect()?.lazy();
Ok(Self { frame: lf, ..self })
}

// /// Validate Dataset contains columns
// /// files_join_attributes and attributes_join_hierarchy
@@ -89,43 +120,54 @@ impl DataSet for DataSetBase {
// fn validate(&self) {}
}

pub(crate) fn numeric_columns(df: &DataFrame) -> Vec<String> {
let mut res = vec![];
for c in df.get_columns() {
if c.dtype().is_numeric() {
res.push(c.name().to_string())
}
}
res
// TODO return Result
pub(crate) fn numeric_columns(lf: &LazyFrame) -> Vec<String> {
lf.schema().map_or_else(
|_| vec![],
|schema| {
schema
.iter_fields()
.filter(|f| f.data_type().is_numeric())
.map(|f| f.name)
.collect::<Vec<String>>()
},
)
}

pub(crate) fn utf8_columns(df: &DataFrame) -> Vec<String> {
let mut res = vec![];
for c in df.get_columns() {
if let DataType::Utf8 = c.dtype() {
res.push(c.name().to_string())
}
}
res
// TODO return Result
pub(crate) fn utf8_columns(lf: &LazyFrame) -> Vec<String> {
lf.schema().map_or_else(
|_| vec![],
|schema| {
schema
.iter_fields()
.filter(|field| matches!(field.data_type(), DataType::Utf8))
.map(|field| field.name)
.collect::<Vec<String>>()
},
)
}

/// DataTypes supported for overrides are defined in [overrides::string_to_lit]
pub(crate) fn overrides_columns(df: &DataFrame) -> Vec<String> {
let mut res = vec![];
for c in df.get_columns() {
match c.dtype() {
DataType::Utf8 | DataType::Boolean | DataType::Float64 => {
res.push(c.name().to_string())
}
DataType::List(x) => {
if let DataType::Float64 = x.as_ref() {
res.push(c.name().to_string())
}
}
_ => (),
}
}
res
pub(crate) fn overrides_columns(lf: &LazyFrame) -> Vec<String> {
//let mut res = vec![];
lf.schema().map_or_else(
|_| vec![],
|schema| {
let res = schema
.iter_fields()
.filter(|c| match c.data_type() {
DataType::Utf8 | DataType::Boolean | DataType::Float64 => true,
DataType::List(x) => {
matches!(x.as_ref(), DataType::Float64)
}
_ => false,
})
.map(|c| c.name)
.collect::<Vec<String>>();
res
},
)
}

impl Serialize for dyn DataSet {
@@ -140,7 +182,7 @@ impl Serialize for dyn DataSet {
.collect::<HashMap<&String, Option<&str>>>();

let ordered_measures: BTreeMap<_, _> = measures.iter().collect();
let utf8_cols = utf8_columns(self.frame());
let utf8_cols = utf8_columns(self.lazy_frame());
let calc_params = self.calc_params();

let mut seq = serializer.serialize_map(Some(4))?;
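Putting the reworked trait together: a custom implementation now only has to hand out a `&LazyFrame` plus the two constructors; `frame`, `prepare`, `calc_params`, `overridable_columns`, and `validate` all have defaults. A sketch mirroring `DataSetBase` (`MyDataSet` is hypothetical, and the import paths assume these items are exported from the crate root):

    use std::collections::HashMap;
    use polars::prelude::*;
    use base_engine::{derive_measure_map, DataSet, DataSourceConfig, MeasuresMap};

    struct MyDataSet {
        frame: LazyFrame,
        measures: MeasuresMap,
        build_params: HashMap<String, String>,
    }

    impl DataSet for MyDataSet {
        fn lazy_frame(&self) -> &LazyFrame {
            &self.frame
        }
        fn measures(&self) -> &MeasuresMap {
            &self.measures
        }
        fn from_config(conf: DataSourceConfig) -> Self {
            let (frame, measure_cols, build_params) = conf.build();
            Self {
                frame,
                measures: derive_measure_map(measure_cols),
                build_params,
            }
        }
        fn new(frame: LazyFrame, mm: MeasuresMap, build_params: HashMap<String, String>) -> Self {
            Self { frame, measures: mm, build_params }
        }
        // The default `collect` is a pass-through; override it (as DataSetBase does)
        // if eager mode should actually materialise the frame.
        fn collect(self) -> PolarsResult<Self> {
            let frame = self.frame.collect()?.lazy();
            Ok(Self { frame, ..self })
        }
    }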
103 changes: 83 additions & 20 deletions base_engine/src/datasource/helpers.rs
@@ -2,7 +2,8 @@ use std::collections::HashMap;

use polars::{
prelude::{
col, DataFrame, DataType, Expr, Field, IntoLazy, JoinType, LazyCsvReader, NamedFrom, Schema,
col, concat, DataFrame, DataType, Expr, Field, JoinType, LazyCsvReader, LazyFrame, Literal,
NamedFrom, PolarsResult, Schema, NULL,
},
series::Series,
};
@@ -20,7 +21,7 @@ pub fn empty_frame(with_columns: &[String]) -> DataFrame {
}

/// reads DataFrame from path, casts cols to str and numeric cols to f64
pub fn path_to_df(path: &str, cast_to_str: &[String], cast_to_f64: &[String]) -> DataFrame {
pub fn path_to_df(path: &str, cast_to_str: &[String], cast_to_f64: &[String]) -> LazyFrame {
let mut vc = Vec::with_capacity(cast_to_str.len() + cast_to_f64.len());
for str_col in cast_to_str {
vc.push(Field::new(str_col, DataType::Utf8))
@@ -33,51 +34,57 @@ pub fn path_to_df(path: &str, cast_to_str: &[String], cast_to_f64: &[String]) ->

// if path provided, then we expect it to be of the correct format
// unrecoverable. Panic if failed to read file
let df = LazyCsvReader::new(path)
let lf = LazyCsvReader::new(path)
.has_header(true)
.with_parse_dates(true)
.with_dtype_overwrite(Some(&schema))
//.with_ignore_parser_errors(ignore)
.finish()
.and_then(|lf| lf.collect())
.unwrap_or_else(|_| panic!("Error reading file: {path}"));

df
lf
}
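For illustration, a hypothetical call (path and column names invented). Since `path_to_df` now returns a LazyFrame, nothing is read into memory here; the data materialises only when the frame is eventually collected:

    // Scan lazily, casting "Book" to Utf8 and "Notional" to f64.
    let trades = path_to_df(
        "./data/trades.csv",
        &["Book".to_string()],
        &["Notional".to_string()],
    );
    // `trades.collect()?` (or a streaming query) triggers the actual read.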

pub fn finish(
a2h: Vec<String>,
f2a: Vec<String>,
measures: Vec<String>,
mut df_attr: DataFrame,
df_hms: DataFrame,
mut concatinated_frame: DataFrame,
mut df_attr: LazyFrame,
df_hms: LazyFrame,
mut concatinated_frame: LazyFrame,
build_params: HashMap<String, String>,
) -> (DataFrame, Vec<Measure>, HashMap<String, String>) {
) -> (LazyFrame, Vec<Measure>, HashMap<String, String>) {
// join with hms if a2h was provided
if !a2h.is_empty() {
let a2h_expr = a2h.iter().map(|c| col(c)).collect::<Vec<Expr>>();
df_attr = df_attr.lazy()
.join(df_hms.lazy(), a2h_expr.clone(), a2h_expr, JoinType::Left)
.collect()
.expect("Could not join attributes to hms. Review attributes_join_hierarchy field in the setup");
df_attr = df_attr.join(df_hms, a2h_expr.clone(), a2h_expr, JoinType::Left)
//.collect()
//.expect("Could not join attributes to hms. Review attributes_join_hierarchy field in the setup");
}
// if files to attributes was provided
if !f2a.is_empty() {
let f2a_expr = f2a.iter().map(|c| col(c)).collect::<Vec<Expr>>();
concatinated_frame = concatinated_frame.lazy()
.join(df_attr.lazy(), f2a_expr.clone(), f2a_expr, JoinType::Outer)
.collect()
.expect("Could not join files with attributes-hms. Review files_join_attributes field in the setup");
concatinated_frame =
concatinated_frame.join(df_attr, f2a_expr.clone(), f2a_expr, JoinType::Outer)
//.collect()
//.expect("Could not join files with attributes-hms. Review files_join_attributes field in the setup");
}

// if measures were provided
let measures = if !measures.is_empty() {
let schema = concatinated_frame
.schema()
.expect("Could not extract Schema");
let fields = schema
.iter_fields()
.map(|f| f.name)
.collect::<Vec<String>>();

// Checking if each measure is present in DF
measures.iter().for_each(|col| {
concatinated_frame
.column(col)
.unwrap_or_else(|_| panic!("Column {} not found", col));
if !fields.contains(col) {
panic!("Measure: {}, is not part of the fields: {:?}", col, fields)
}
});
derive_basic_measures_vec(measures)
}
@@ -89,3 +96,59 @@ pub fn finish(

(concatinated_frame, measures, build_params)
}

/// TODO contribute to Polars
/// Concat [LazyFrame]s diagonally.
/// Calls [concat] internally.
pub fn diag_concat_lf<L: AsRef<[LazyFrame]>>(
lfs: L,
rechunk: bool,
parallel: bool,
) -> PolarsResult<LazyFrame> {
let lfs = lfs.as_ref().to_vec();
let upper_bound_width = lfs
.iter()
.map(|lf| Ok(lf.schema()?.len()))
.collect::<PolarsResult<Vec<_>>>()?
.iter()
.sum();
// Use Vec instead of a HashSet to preserve order
let mut column_names = Vec::with_capacity(upper_bound_width);
let mut total_schema = Vec::with_capacity(upper_bound_width);

for lf in lfs.iter() {
lf.schema()?.iter().for_each(|(name, dtype)| {
if !column_names.contains(name) {
column_names.push(name.clone());
total_schema.push((name.clone(), dtype.clone()))
}
});
}

let dfs = lfs
.into_iter()
.map(|mut lf| {
// Get current frame's Schema
let lf_schema = lf.schema()?;

for (name, dtype) in total_schema.iter() {
// If a name from Total Schema is not present - append
if lf_schema.get_field(name).is_none() {
lf = lf.with_column(NULL.lit().cast(dtype.clone()).alias(name))
}
}

// Now, reorder to match schema
let reordered_lf = lf.select(
column_names
.iter()
.map(|col_name| col(col_name))
.collect::<Vec<Expr>>(),
);

Ok(reordered_lf)
})
.collect::<PolarsResult<Vec<_>>>()?;

concat(dfs, rechunk, parallel)
}
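A usage sketch for `diag_concat_lf` with two frames that only partially share columns (toy data):

    use polars::prelude::*;

    fn main() -> PolarsResult<()> {
        let a = df!["x" => [1i64, 2], "y" => ["a", "b"]]?.lazy();
        let b = df!["y" => ["c"], "z" => [5.5f64]]?.lazy();
        // Output columns are x, y, z in first-seen order; cells absent from an
        // input are filled with typed nulls before the vertical concat.
        let out = diag_concat_lf([a, b], true, true)?.collect()?;
        println!("{out}");
        Ok(())
    }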