Skip to content

Commit

Permalink
fix case of f(scalar, array) invocation
Browse files Browse the repository at this point in the history
  • Loading branch information
davidhewitt committed Jan 9, 2025
1 parent 4956e2b commit 7fc419a
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 115 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ paste = "1"
log = "0.4"

[dev-dependencies]
datafusion = { version = "44", default-features = false, features = ["nested_expressions"] }
codspeed-criterion-compat = "2.6"
criterion = "0.5.1"
clap = "4"
Expand Down
278 changes: 163 additions & 115 deletions src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,46 @@ impl From<i64> for JsonPath<'_> {
}
}

impl<'s> JsonPath<'s> {
pub fn extract_path(args: &'s [ColumnarValue]) -> Vec<Self> {
args[1..]
enum JsonPathArgs<'a> {
Array(&'a ArrayRef),
Scalars(Vec<JsonPath<'a>>),
}

impl<'s> JsonPathArgs<'s> {
fn extract_path(path_args: &'s [ColumnarValue]) -> DataFusionResult<Self> {
// If there is a single argument as an array, we know how to handle it
if let Some((ColumnarValue::Array(array), &[])) = path_args.split_first() {
return Ok(Self::Array(array));
}

path_args
.iter()
.map(|arg| match arg {
ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s))) => Self::Key(s),
ColumnarValue::Scalar(ScalarValue::UInt64(Some(i))) => (*i).into(),
ColumnarValue::Scalar(ScalarValue::Int64(Some(i))) => (*i).into(),
_ => Self::None,
.enumerate()
.map(|(pos, arg)| match arg {
ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s))) => {
Ok(JsonPath::Key(s))
}
ColumnarValue::Scalar(ScalarValue::UInt64(Some(i))) => Ok((*i).into()),
ColumnarValue::Scalar(ScalarValue::Int64(Some(i))) => Ok((*i).into()),
ColumnarValue::Scalar(
ScalarValue::Null
| ScalarValue::Utf8(None)
| ScalarValue::LargeUtf8(None)
| ScalarValue::UInt64(None)
| ScalarValue::Int64(None),
) => Ok(JsonPath::None),
ColumnarValue::Array(_) => {
// if there was a single arg, which is an array, handled above in the
// split_first case. So this is multiple args of which one is an array
exec_err!("More than 1 path element is not supported when querying JSON using an array.")
}
ColumnarValue::Scalar(arg) => exec_err!(
"Unexpected argument type at position {}, expected string or int, got {arg:?}.",
pos + 1
),
})
.collect()
.collect::<DataFusionResult<_>>()
.map(JsonPathArgs::Scalars)
}
}

Expand All @@ -116,154 +145,173 @@ pub fn invoke<C: FromIterator<Option<I>> + 'static, I>(
to_scalar: impl Fn(Option<I>) -> ScalarValue,
return_dict: bool,
) -> DataFusionResult<ColumnarValue> {
let Some(first_arg) = args.first() else {
// I think this can't happen, but I assumed the same about args[1] and I was wrong, so better to be safe
let Some((json_arg, path_args)) = args.split_first() else {
return exec_err!("expected at least one argument");
};
match first_arg {
ColumnarValue::Array(json_array) => {
let array = match args.get(1) {
Some(ColumnarValue::Array(a)) => {
if args.len() > 2 {
// TODO perhaps we could support this by zipping the arrays, but it's not trivial, #23
exec_err!("More than 1 path element is not supported when querying JSON using an array.")
} else {
invoke_array(json_array, a, to_array, jiter_find, return_dict)
}
}
Some(ColumnarValue::Scalar(_)) => scalar_apply(
json_array,
&JsonPath::extract_path(args),
to_array,
jiter_find,
return_dict,
),
None => scalar_apply(json_array, &[], to_array, jiter_find, return_dict),
};
array.map(ColumnarValue::from)

let path = JsonPathArgs::extract_path(path_args)?;
match (json_arg, path) {
(ColumnarValue::Array(json_array), JsonPathArgs::Array(path_array)) => {
invoke_array_array(json_array, path_array, to_array, jiter_find, return_dict).map(ColumnarValue::Array)
}
(ColumnarValue::Array(json_array), JsonPathArgs::Scalars(path)) => {
invoke_array_scalars(json_array, &path, to_array, jiter_find, return_dict).map(ColumnarValue::Array)
}
(ColumnarValue::Scalar(s), JsonPathArgs::Array(path_array)) => {
invoke_scalar_array(s, path_array, jiter_find, to_array)
}
(ColumnarValue::Scalar(s), JsonPathArgs::Scalars(path)) => {
invoke_scalar_scalars(s, &path, jiter_find, to_scalar)
}
ColumnarValue::Scalar(s) => invoke_scalar(s, args, jiter_find, to_scalar),
}
}

fn invoke_array<C: FromIterator<Option<I>> + 'static, I>(
fn invoke_array_array<C: FromIterator<Option<I>> + 'static, I>(
json_array: &ArrayRef,
needle_array: &ArrayRef,
path_array: &ArrayRef,
to_array: impl Fn(C) -> DataFusionResult<ArrayRef>,
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
return_dict: bool,
) -> DataFusionResult<ArrayRef> {
downcast_dictionary_array!(
needle_array => match needle_array.values().data_type() {
DataType::Utf8 => zip_apply(json_array, needle_array.downcast_dict::<StringArray>().unwrap(), to_array, jiter_find, true, return_dict),
DataType::LargeUtf8 => zip_apply(json_array, needle_array.downcast_dict::<LargeStringArray>().unwrap(), to_array, jiter_find, true, return_dict),
DataType::Utf8View => zip_apply(json_array, needle_array.downcast_dict::<StringViewArray>().unwrap(), to_array, jiter_find, true, return_dict),
DataType::Int64 => zip_apply(json_array, needle_array.downcast_dict::<Int64Array>().unwrap(), to_array, jiter_find, false, return_dict),
DataType::UInt64 => zip_apply(json_array, needle_array.downcast_dict::<UInt64Array>().unwrap(), to_array, jiter_find, false, return_dict),
other => exec_err!("unexpected second argument type, expected string or int array, got {:?}", other),
},
DataType::Utf8 => zip_apply(json_array, needle_array.as_string::<i32>(), to_array, jiter_find, true, return_dict),
DataType::LargeUtf8 => zip_apply(json_array, needle_array.as_string::<i64>(), to_array, jiter_find, true, return_dict),
DataType::Utf8View => zip_apply(json_array, needle_array.as_string_view(), to_array, jiter_find, true, return_dict),
DataType::Int64 => zip_apply(json_array, needle_array.as_primitive::<Int64Type>(), to_array, jiter_find, false, return_dict),
DataType::UInt64 => zip_apply(json_array, needle_array.as_primitive::<UInt64Type>(), to_array, jiter_find, false, return_dict),
other => exec_err!("unexpected second argument type, expected string or int array, got {:?}", other)
json_array => {
let values = invoke_array_array(json_array.values(), path_array, to_array, jiter_find, return_dict)?;
post_process_dict(json_array, values, return_dict)
}
DataType::Utf8 => zip_apply(json_array.as_string::<i32>().iter(), path_array, to_array, jiter_find),
DataType::LargeUtf8 => zip_apply(json_array.as_string::<i64>().iter(), path_array, to_array, jiter_find),
DataType::Utf8View => zip_apply(json_array.as_string_view().iter(), path_array, to_array, jiter_find),
other => if let Some(string_array) = nested_json_array(json_array, is_object_lookup_array(path_array.data_type())) {
zip_apply(string_array.iter(), path_array, to_array, jiter_find)
} else {
exec_err!("unexpected json array type {:?}", other)
}
)
}

fn zip_apply<'a, P: Into<JsonPath<'a>>, C: FromIterator<Option<I>> + 'static, I>(
fn invoke_array_scalars<C: FromIterator<Option<I>>, I>(
json_array: &ArrayRef,
path_array: impl ArrayAccessor<Item = P>,
path: &[JsonPath],
to_array: impl Fn(C) -> DataFusionResult<ArrayRef>,
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
object_lookup: bool,
return_dict: bool,
) -> DataFusionResult<ArrayRef> {
fn inner<'j, C: FromIterator<Option<I>>, I>(
json_iter: impl IntoIterator<Item = Option<&'j str>>,
path: &[JsonPath],
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
) -> C {
json_iter
.into_iter()
.map(|opt_json| jiter_find(opt_json, path).ok())
.collect::<C>()
}

let c = downcast_dictionary_array!(
json_array => {
let values = zip_apply(json_array.values(), path_array, to_array, jiter_find, object_lookup, false)?;
let values = invoke_array_scalars(json_array.values(), path, to_array, jiter_find, false)?;
return post_process_dict(json_array, values, return_dict);
}
DataType::Utf8 => zip_apply_iter(json_array.as_string::<i32>().iter(), path_array, jiter_find),
DataType::LargeUtf8 => zip_apply_iter(json_array.as_string::<i64>().iter(), path_array, jiter_find),
DataType::Utf8View => zip_apply_iter(json_array.as_string_view().iter(), path_array, jiter_find),
other => if let Some(string_array) = nested_json_array(json_array, object_lookup) {
zip_apply_iter(string_array.iter(), path_array, jiter_find)
DataType::Utf8 => inner(json_array.as_string::<i32>(), path, jiter_find),
DataType::LargeUtf8 => inner(json_array.as_string::<i64>(), path, jiter_find),
DataType::Utf8View => inner(json_array.as_string_view(), path, jiter_find),
other => if let Some(string_array) = nested_json_array(json_array, is_object_lookup(path)) {
inner(string_array, path, jiter_find)
} else {
return exec_err!("unexpected json array type {:?}", other);
}
);

to_array(c)
}

#[allow(clippy::needless_pass_by_value)] // ArrayAccessor is implemented on references
fn zip_apply_iter<'a, 'j, P: Into<JsonPath<'a>>, C: FromIterator<Option<I>> + 'static, I>(
json_iter: impl Iterator<Item = Option<&'j str>>,
path_array: impl ArrayAccessor<Item = P>,
fn invoke_scalar_array<C: FromIterator<Option<I>> + 'static, I>(
scalar: &ScalarValue,
path_array: &ArrayRef,
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
) -> C {
json_iter
.enumerate()
.map(|(i, opt_json)| {
if path_array.is_null(i) {
None
} else {
let path = path_array.value(i).into();
jiter_find(opt_json, &[path]).ok()
}
})
.collect::<C>()
to_array: impl Fn(C) -> DataFusionResult<ArrayRef>,
) -> DataFusionResult<ColumnarValue> {
let s = extract_json_scalar(scalar)?;
// TODO: possible optimization here if path_array is a dictionary; can apply against the
// dictionary values directly for less work
zip_apply(
std::iter::repeat(s).take(path_array.len()),
path_array,
to_array,
jiter_find,
)
.map(ColumnarValue::Array)
}

fn invoke_scalar<I>(
fn invoke_scalar_scalars<I>(
scalar: &ScalarValue,
args: &[ColumnarValue],
path: &[JsonPath],
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
to_scalar: impl Fn(Option<I>) -> ScalarValue,
) -> DataFusionResult<ColumnarValue> {
match scalar {
ScalarValue::Dictionary(_, b) => invoke_scalar(b.as_ref(), args, jiter_find, to_scalar),
ScalarValue::Utf8(s) | ScalarValue::Utf8View(s) | ScalarValue::LargeUtf8(s) => {
let path = JsonPath::extract_path(args);
let v = jiter_find(s.as_ref().map(String::as_str), &path).ok();
Ok(ColumnarValue::Scalar(to_scalar(v)))
}
ScalarValue::Union(type_id_value, union_fields, _) => {
let opt_json = json_from_union_scalar(type_id_value.as_ref(), union_fields);
let v = jiter_find(opt_json, &JsonPath::extract_path(args)).ok();
Ok(ColumnarValue::Scalar(to_scalar(v)))
}
_ => {
exec_err!("unexpected first argument type, expected string or JSON union")
}
}
let s = extract_json_scalar(scalar)?;
let v = jiter_find(s, path).ok();
Ok(ColumnarValue::Scalar(to_scalar(v)))
}

fn scalar_apply<C: FromIterator<Option<I>>, I>(
json_array: &ArrayRef,
path: &[JsonPath],
fn zip_apply<'a, C: FromIterator<Option<I>> + 'static, I>(
json_array: impl IntoIterator<Item = Option<&'a str>>,
path_array: &ArrayRef,
to_array: impl Fn(C) -> DataFusionResult<ArrayRef>,
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
return_dict: bool,
) -> DataFusionResult<ArrayRef> {
#[allow(clippy::needless_pass_by_value)] // ArrayAccessor is implemented on references
fn inner<'a, 'j, P: Into<JsonPath<'a>>, C: FromIterator<Option<I>> + 'static, I>(
json_iter: impl IntoIterator<Item = Option<&'j str>>,
path_array: impl ArrayAccessor<Item = P>,
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
) -> C {
json_iter
.into_iter()
.enumerate()
.map(|(i, opt_json)| {
if path_array.is_null(i) {
None
} else {
let path = path_array.value(i).into();
jiter_find(opt_json, &[path]).ok()
}
})
.collect::<C>()
}

let c = downcast_dictionary_array!(
json_array => {
let values = scalar_apply(json_array.values(), path, to_array, jiter_find, false)?;
return post_process_dict(json_array, values, return_dict);
}
DataType::Utf8 => scalar_apply_iter(json_array.as_string::<i32>().iter(), path, jiter_find),
DataType::LargeUtf8 => scalar_apply_iter(json_array.as_string::<i64>().iter(), path, jiter_find),
DataType::Utf8View => scalar_apply_iter(json_array.as_string_view().iter(), path, jiter_find),
other => if let Some(string_array) = nested_json_array(json_array, is_object_lookup(path)) {
scalar_apply_iter(string_array.iter(), path, jiter_find)
} else {
return exec_err!("unexpected json array type {:?}", other);
}
path_array => match path_array.values().data_type() {
DataType::Utf8 => inner(json_array, path_array.downcast_dict::<StringArray>().unwrap(), jiter_find),
DataType::LargeUtf8 => inner(json_array, path_array.downcast_dict::<LargeStringArray>().unwrap(), jiter_find),
DataType::Utf8View => inner(json_array, path_array.downcast_dict::<StringViewArray>().unwrap(), jiter_find),
DataType::Int64 => inner(json_array, path_array.downcast_dict::<Int64Array>().unwrap(), jiter_find),
DataType::UInt64 => inner(json_array, path_array.downcast_dict::<UInt64Array>().unwrap(), jiter_find),
other => return exec_err!("unexpected second argument type, expected string or int array, got {:?}", other),
},
DataType::Utf8 => inner(json_array, path_array.as_string::<i32>(), jiter_find),
DataType::LargeUtf8 => inner(json_array, path_array.as_string::<i64>(), jiter_find),
DataType::Utf8View => inner(json_array, path_array.as_string_view(), jiter_find),
DataType::Int64 => inner(json_array, path_array.as_primitive::<Int64Type>(), jiter_find),
DataType::UInt64 => inner(json_array, path_array.as_primitive::<UInt64Type>(), jiter_find),
other => return exec_err!("unexpected second argument type, expected string or int array, got {:?}", other)
);

to_array(c)
}

fn extract_json_scalar(scalar: &ScalarValue) -> DataFusionResult<Option<&str>> {
match scalar {
ScalarValue::Dictionary(_, b) => extract_json_scalar(b.as_ref()),
ScalarValue::Utf8(s) | ScalarValue::Utf8View(s) | ScalarValue::LargeUtf8(s) => Ok(s.as_deref()),
ScalarValue::Union(type_id_value, union_fields, _) => {
Ok(json_from_union_scalar(type_id_value.as_ref(), union_fields))
}
_ => {
exec_err!("unexpected first argument type, expected string or JSON union")
}
}
}

/// Take a dictionary array of JSON data and an array of result values and combine them.
fn post_process_dict<T: ArrowDictionaryKeyType>(
dict_array: &DictionaryArray<T>,
Expand Down Expand Up @@ -295,12 +343,12 @@ fn is_object_lookup(path: &[JsonPath]) -> bool {
}
}

fn scalar_apply_iter<'j, C: FromIterator<Option<I>>, I>(
json_iter: impl Iterator<Item = Option<&'j str>>,
path: &[JsonPath],
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
) -> C {
json_iter.map(|opt_json| jiter_find(opt_json, path).ok()).collect::<C>()
fn is_object_lookup_array(data_type: &DataType) -> bool {
match data_type {
DataType::Dictionary(_, value_type) => is_object_lookup_array(value_type),
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => true,
_ => false,
}
}

pub fn jiter_json_find<'j>(opt_json: Option<&'j str>, path: &[JsonPath]) -> Option<(Jiter<'j>, Peek)> {
Expand Down
Loading

0 comments on commit 7fc419a

Please sign in to comment.