Skip to content

Commit

Permalink
feat: expose cid_string udf in query and flight sql
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Oct 14, 2024
1 parent 6f98819 commit b8170e8
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 141 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions arrow-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,4 @@ publish = false

[dependencies]
datafusion.workspace = true
arrow.workspace = true
cid.workspace = true
ceramic-pipeline.workspace = true
139 changes: 4 additions & 135 deletions arrow-test/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,147 +1,16 @@
//! Common utilities for testing APIs using Arrow [`RecordBatch`]es and related data structures.
#![warn(missing_docs)]

use std::{any::Any, sync::Arc};
use std::sync::Arc;

use arrow::{
array::{ArrayIter, ListBuilder, StringBuilder},
datatypes::DataType,
record_batch::RecordBatch,
};
use cid::Cid;
use ceramic_pipeline::cid_string::{CidString, CidStringList};
use datafusion::{
common::{
cast::{as_binary_array, as_list_array},
exec_datafusion_err,
},
arrow::{datatypes::DataType, record_batch::RecordBatch},
dataframe::DataFrame,
execution::context::SessionContext,
logical_expr::{
col, expr::ScalarFunction, Cast, ColumnarValue, Expr, ScalarUDF, ScalarUDFImpl, Signature,
TypeSignature, Volatility,
},
logical_expr::{col, expr::ScalarFunction, Cast, Expr, ScalarUDF},
};

/// ScalarUDF to convert a binary CID into a string for easier inspection.
#[derive(Debug)]
pub struct CidString {
signature: Signature,
}

impl Default for CidString {
fn default() -> Self {
Self::new()
}
}

impl CidString {
/// Construct new instance
pub fn new() -> Self {
Self {
signature: Signature::new(
TypeSignature::Exact(vec![DataType::Binary]),
Volatility::Immutable,
),
}
}
}

impl ScalarUDFImpl for CidString {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"cid_string"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _args: &[DataType]) -> datafusion::common::Result<DataType> {
Ok(DataType::Utf8)
}
fn invoke(&self, args: &[ColumnarValue]) -> datafusion::common::Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
let cids = as_binary_array(&args[0])?;
let mut strs = StringBuilder::new();
for cid in cids {
if let Some(cid) = cid {
strs.append_value(
Cid::read_bytes(cid)
.map_err(|err| exec_datafusion_err!("Error {err}"))?
.to_string(),
);
} else {
strs.append_null()
}
}
Ok(ColumnarValue::Array(Arc::new(strs.finish())))
}
}

/// ScalarUDF to convert a binary CID into a string for easier inspection.
#[derive(Debug)]
pub struct CidStringList {
signature: Signature,
}

impl Default for CidStringList {
fn default() -> Self {
Self::new()
}
}

impl CidStringList {
/// Construct new instance
pub fn new() -> Self {
Self {
signature: Signature::new(
TypeSignature::Exact(vec![DataType::new_list(DataType::Binary, true)]),
Volatility::Immutable,
),
}
}
}

impl ScalarUDFImpl for CidStringList {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"array_cid_string"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _args: &[DataType]) -> datafusion::common::Result<DataType> {
Ok(DataType::new_list(DataType::Utf8, true))
}
fn invoke(&self, args: &[ColumnarValue]) -> datafusion::common::Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
let all_cids = as_list_array(&args[0])?;
let mut strs = ListBuilder::new(StringBuilder::new());
for cids in ArrayIter::new(all_cids) {
if let Some(cids) = cids {
let cids = as_binary_array(&cids)?;
for cid in cids {
if let Some(cid) = cid {
strs.values().append_value(
Cid::read_bytes(cid)
.map_err(|err| exec_datafusion_err!("Error {err}"))?
.to_string(),
);
} else {
strs.values().append_null()
}
}
strs.append(true)
} else {
strs.append_null()
}
}
Ok(ColumnarValue::Array(Arc::new(strs.finish())))
}
}

/// Applies various transformations on a record batch of conclusion_feed data to make it easier to
/// read.
/// Useful in conjuction with expect_test.
Expand Down
1 change: 1 addition & 0 deletions flight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ async-stream.workspace = true
async-trait.workspace = true
ceramic-event.workspace = true
ceramic-core.workspace = true
ceramic-pipeline.workspace = true
cid.workspace = true
datafusion-flight-sql-server.workspace = true
datafusion.workspace = true
Expand Down
5 changes: 4 additions & 1 deletion flight/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ use std::sync::Arc;
use arrow_flight::flight_service_server::FlightServiceServer;
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use async_stream::try_stream;
use ceramic_pipeline::cid_string::{CidString, CidStringList};
use datafusion::catalog::{Session, TableProvider};
use datafusion::common::exec_datafusion_err;
use datafusion::datasource::TableType;
use datafusion::execution::config::SessionConfig;
use datafusion::execution::context::{SQLOptions, SessionContext};
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::logical_expr::{Expr, ScalarUDF, TableProviderFilterPushDown};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{ExecutionMode, ExecutionPlan, PlanProperties};
Expand Down Expand Up @@ -42,6 +43,8 @@ pub fn new_server(
SessionConfig::new().with_default_catalog_and_schema("ceramic", "v0"),
);
ctx.register_table("conclusion_feed", Arc::new(FeedTable::new(feed)))?;
ctx.register_udf(ScalarUDF::new_from_impl(CidString::new()));
ctx.register_udf(ScalarUDF::new_from_impl(CidStringList::new()));
let svc = FlightServiceServer::new(
FlightSqlService::new(ctx.state()).with_sql_options(Some(
// Disable all access except read only queries.
Expand Down
1 change: 1 addition & 0 deletions pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ repository.workspace = true
[dependencies]
anyhow.workspace = true
arrow-flight.workspace = true
cid.workspace = true
datafusion-federation.workspace = true
datafusion-flight-sql-table-provider.workspace = true
datafusion.workspace = true
Expand Down
134 changes: 134 additions & 0 deletions pipeline/src/cid_string.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use std::{any::Any, sync::Arc};

use cid::Cid;
use datafusion::{
arrow::{
array::{ArrayIter, ListBuilder, StringBuilder},
datatypes::DataType,
},
common::{
cast::{as_binary_array, as_list_array},
exec_datafusion_err,
},
logical_expr::{ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility},
};

/// ScalarUDF to convert a binary CID into a string for easier inspection.
#[derive(Debug)]
pub struct CidString {
signature: Signature,
}

impl Default for CidString {
fn default() -> Self {
Self::new()
}
}

impl CidString {
/// Construct new instance
pub fn new() -> Self {
Self {
signature: Signature::new(
TypeSignature::Exact(vec![DataType::Binary]),
Volatility::Immutable,
),
}
}
}

impl ScalarUDFImpl for CidString {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"cid_string"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _args: &[DataType]) -> datafusion::common::Result<DataType> {
Ok(DataType::Utf8)
}
fn invoke(&self, args: &[ColumnarValue]) -> datafusion::common::Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
let cids = as_binary_array(&args[0])?;
let mut strs = StringBuilder::new();
for cid in cids {
if let Some(cid) = cid {
strs.append_value(
Cid::read_bytes(cid)
.map_err(|err| exec_datafusion_err!("Error {err}"))?
.to_string(),
);
} else {
strs.append_null()
}
}
Ok(ColumnarValue::Array(Arc::new(strs.finish())))
}
}

/// ScalarUDF to convert a binary CID into a string for easier inspection.
#[derive(Debug)]
pub struct CidStringList {
signature: Signature,
}

impl Default for CidStringList {
fn default() -> Self {
Self::new()
}
}

impl CidStringList {
/// Construct new instance
pub fn new() -> Self {
Self {
signature: Signature::new(
TypeSignature::Exact(vec![DataType::new_list(DataType::Binary, true)]),
Volatility::Immutable,
),
}
}
}

impl ScalarUDFImpl for CidStringList {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"array_cid_string"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _args: &[DataType]) -> datafusion::common::Result<DataType> {
Ok(DataType::new_list(DataType::Utf8, true))
}
fn invoke(&self, args: &[ColumnarValue]) -> datafusion::common::Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
let all_cids = as_list_array(&args[0])?;
let mut strs = ListBuilder::new(StringBuilder::new());
for cids in ArrayIter::new(all_cids) {
if let Some(cids) = cids {
let cids = as_binary_array(&cids)?;
for cid in cids {
if let Some(cid) = cid {
strs.values().append_value(
Cid::read_bytes(cid)
.map_err(|err| exec_datafusion_err!("Error {err}"))?
.to_string(),
);
} else {
strs.values().append_null()
}
}
strs.append(true)
} else {
strs.append_null()
}
}
Ok(ColumnarValue::Array(Arc::new(strs.finish())))
}
}
10 changes: 9 additions & 1 deletion pipeline/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Pipeline provides a set of tables of Ceramic events and transformations between them.

pub mod cid_string;
#[warn(missing_docs)]
mod config;
pub mod schemas;
Expand All @@ -8,12 +9,14 @@ use std::{any::Any, sync::Arc};

use anyhow::{anyhow, Result};
use arrow_flight::sql::client::FlightSqlServiceClient;
use cid_string::{CidString, CidStringList};
use datafusion::{
catalog::{CatalogProvider, SchemaProvider},
datasource::{file_format::parquet::ParquetFormat, listing::ListingOptions},
error::DataFusionError,
execution::context::SessionContext,
logical_expr::col,
functions_aggregate::first_last::LastValue,
logical_expr::{col, AggregateUDF, ScalarUDF},
};
use datafusion_federation::sql::{SQLFederationProvider, SQLSchemaProvider};
use datafusion_flight_sql_table_provider::FlightSQLExecutor;
Expand All @@ -39,6 +42,11 @@ pub async fn session_from_config(config: impl Into<Config>) -> Result<SessionCon
// Create datafusion context
let ctx = SessionContext::new_with_state(state);

// Register various UDxFs
ctx.register_udaf(AggregateUDF::new_from_impl(LastValue::default()));
ctx.register_udf(ScalarUDF::new_from_impl(CidString::new()));
ctx.register_udf(ScalarUDF::new_from_impl(CidStringList::new()));

// Register s3 object store
let bucket = config
.aws_s3_builder
Expand Down

0 comments on commit b8170e8

Please sign in to comment.