Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Experimental unity catalog client #20798

Merged
merged 5 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test-pyodide.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:

- name: Disable incompatible features
env:
FEATURES: parquet|async|json|extract_jsonpath|cloud|polars_cloud|tokio|clipboard|decompress|new_streaming
FEATURES: parquet|async|json|extract_jsonpath|catalog|cloud|polars_cloud|tokio|clipboard|decompress|new_streaming
run: |
sed -i 's/^ "json",$/ "serde_json",/' crates/polars-python/Cargo.toml
sed -E -i "/^ \"(${FEATURES})\",$/d" crates/polars-python/Cargo.toml py-polars/Cargo.toml
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ serde = { workspace = true, features = ["rc"], optional = true }
serde_json = { version = "1", optional = true }
simd-json = { workspace = true, optional = true }
simdutf8 = { workspace = true, optional = true }
strum_macros = { workspace = true, optional = true }
tokio = { workspace = true, features = ["fs", "net", "rt-multi-thread", "time", "sync"], optional = true }
tokio-util = { workspace = true, features = ["io", "io-util"], optional = true }
url = { workspace = true, optional = true }
Expand All @@ -59,6 +60,7 @@ home = "0.5.4"
tempfile = "3"

[features]
catalog = ["cloud", "serde", "reqwest", "futures", "strum_macros"]
default = ["decompress"]
# support for arrows json parsing
json = [
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-io/src/catalog/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod schema;
pub mod unity;
265 changes: 265 additions & 0 deletions crates/polars-io/src/catalog/schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
use polars_core::prelude::{DataType, Field};
use polars_core::schema::{Schema, SchemaRef};
use polars_error::{polars_bail, polars_err, PolarsResult};
use polars_utils::pl_str::PlSmallStr;

use super::unity::models::TableInfo;

/// Returns `(schema, hive_schema)`
pub fn table_info_to_schemas(
table_info: &TableInfo,
) -> PolarsResult<(Option<SchemaRef>, Option<SchemaRef>)> {
let Some(columns) = table_info.columns.as_deref() else {
return Ok((None, None));
};

let mut schema = Schema::default();
let mut hive_schema = Schema::default();

for (i, col) in columns.iter().enumerate() {
let dtype = parse_type_str(&col.type_text)?;

if let Some(position) = col.position {
if usize::try_from(position).unwrap() != i {
polars_bail!(
ComputeError:
"not yet supported: position was not ordered"
)
}
}

if let Some(i) = col.partition_index {
if usize::try_from(i).unwrap() != hive_schema.len() {
polars_bail!(
ComputeError:
"not yet supported: partition_index was not ordered"
)
}

hive_schema.extend([Field::new(col.name.as_str().into(), dtype)]);
} else {
schema.extend([Field::new(col.name.as_str().into(), dtype)])
}
}

Ok((
Some(schema.into()),
Some(hive_schema)
.filter(|x| !x.is_empty())
.map(|x| x.into()),
))
}

/// Parse a type string from a catalog API response.
///
/// References:
/// * https://spark.apache.org/docs/latest/sql-ref-datatypes.html
/// * https://docs.databricks.com/api/workspace/tables/get
/// * https://docs.databricks.com/en/sql/language-manual/sql-ref-datatypes.html
///
/// Note: `type_precision` and `type_scale` in the API response are defined as supplementary data to
/// the `type_text`, but from testing they aren't actually used - e.g. a decimal type would have a
/// `type_text` of `decimal(18, 2)`
fn parse_type_str(type_text: &str) -> PolarsResult<DataType> {
use DataType::*;

let dtype = match type_text {
"boolean" => Boolean,

"byte" | "tinyint" => Int8,
"short" | "smallint" => Int16,
"int" | "integer" => Int32,
"long" | "bigint" => Int64,

"float" | "real" => Float32,
"double" => Float64,

"date" => Date,
"timestamp" | "timestamp_ltz" | "timestamp_ntz" => {
Datetime(polars_core::prelude::TimeUnit::Nanoseconds, None)
},

"string" => String,
"binary" => Binary,

"null" | "void" => Null,

v => {
if v.starts_with("decimal") {
// e.g. decimal(38,18)
(|| {
let (precision, scale) = v
.get(7..)?
.strip_prefix('(')?
.strip_suffix(')')?
.split_once(',')?;
let precision: usize = precision.parse().ok()?;
let scale: usize = scale.parse().ok()?;

Some(DataType::Decimal(Some(precision), Some(scale)))
})()
.ok_or_else(|| {
polars_err!(
ComputeError:
"type format did not match decimal(int,int): {}",
v
)
})?
} else if v.starts_with("array") {
// e.g. array<int>
DataType::List(Box::new(parse_type_str(extract_angle_brackets_inner(
v, "array",
)?)?))
} else if v.starts_with("struct") {
parse_struct_type_str(v)?
} else if v.starts_with("map") {
// e.g. map<int,string>
let inner = extract_angle_brackets_inner(v, "map")?;
let (key_type_str, value_type_str) = split_comma_nesting_aware(inner);
DataType::List(Box::new(DataType::Struct(vec![
Field::new(
PlSmallStr::from_static("key"),
parse_type_str(key_type_str)?,
),
Field::new(
PlSmallStr::from_static("value"),
parse_type_str(value_type_str)?,
),
])))
} else {
polars_bail!(
ComputeError:
"parse_type_str unknown type name: {}",
v
)
}
},
};

Ok(dtype)
}

/// `array<inner> -> inner`
fn extract_angle_brackets_inner<'a>(value: &'a str, name: &'static str) -> PolarsResult<&'a str> {
let i = value.find('<');
let j = value.rfind('>');

if i.is_none() || j.is_none() || i.unwrap().saturating_add(1) >= j.unwrap() {
polars_bail!(
ComputeError:
"type format did not match {}<...>: {}",
name, value
)
}

let i = i.unwrap();
let j = j.unwrap();

let inner = value[i + 1..j].trim();

Ok(inner)
}

/// `struct<default:decimal(38,18),promotional:struct<default:decimal(38,18)>,effective_list:struct<default:decimal(38,18)>>`
fn parse_struct_type_str(value: &str) -> PolarsResult<DataType> {
let orig_value = value;
let mut value = extract_angle_brackets_inner(value, "struct")?;

let mut fields = vec![];

while !value.is_empty() {
let (field_str, new_value) = split_comma_nesting_aware(value);
value = new_value;

let (name, dtype_str) = field_str.split_once(':').ok_or_else(|| {
polars_err!(
ComputeError:
"type format did not match struct<name:type,..>: {}",
orig_value
)
})?;

let dtype = parse_type_str(dtype_str)?;

fields.push(Field::new(name.into(), dtype));
}

Ok(DataType::Struct(fields))
}

/// `default:decimal(38,18),promotional:struct<default:decimal(38,18)>` ->
/// * 1: `default:decimal(38,18)`
/// * 2: `struct<default:decimal(38,18)>`
///
/// If there are no splits, returns the full string and an empty string.
fn split_comma_nesting_aware(value: &str) -> (&str, &str) {
let mut bracket_level = 0usize;
let mut angle_bracket_level = 0usize;

for (i, b) in value.as_bytes().iter().enumerate() {
match b {
b'(' => bracket_level += 1,
b')' => bracket_level = bracket_level.saturating_sub(1),
b'<' => angle_bracket_level += 1,
b'>' => angle_bracket_level = angle_bracket_level.saturating_sub(1),
b',' if bracket_level == 0 && angle_bracket_level == 0 => {
return (&value[..i], &value[1 + i..])
},
_ => {},
}
}

(value, &value[value.len()..])
}

#[cfg(test)]
mod tests {
#[test]
fn test_parse_type_str_nested_struct() {
use super::{parse_type_str, DataType, Field};

let type_str = "struct<default:decimal(38,18),promotional:struct<default:decimal(38,18)>,effective_list:struct<default:decimal(38,18)>>";
let dtype = parse_type_str(type_str).unwrap();

use DataType::*;

assert_eq!(
dtype,
Struct(vec![
Field::new("default".into(), Decimal(Some(38), Some(18))),
Field::new(
"promotional".into(),
Struct(vec![Field::new(
"default".into(),
Decimal(Some(38), Some(18))
)])
),
Field::new(
"effective_list".into(),
Struct(vec![Field::new(
"default".into(),
Decimal(Some(38), Some(18))
)])
)
])
);
}

#[test]
fn test_parse_type_str_map() {
use super::{parse_type_str, DataType, Field};

let type_str = "map<array<int>,array<decimal(18,2)>>";
let dtype = parse_type_str(type_str).unwrap();

use DataType::*;

assert_eq!(
dtype,
List(Box::new(Struct(vec![
Field::new("key".into(), List(Box::new(Int32))),
Field::new("value".into(), List(Box::new(Decimal(Some(18), Some(2)))))
])))
);
}
}
Loading
Loading