Skip to content

Commit

Permalink
feat(query): Support access Redis data from dictionaries via the `dic…
Browse files Browse the repository at this point in the history
…t_get` function. (#16389)

* feat: dict_get to redis.

* fix

* fix: resolve_dict_get

* fix

* fix

* fix

* fix and fmt

* feat: add transform_dict_get

* feat: add lazy_static

* feat: add key's type Number and prepare to update transform more.

* update: bind_create_dictionary

* update: transform default expr

* fix: transform and binder and type_check

* update: operator & cache.

* fix

* fix: operator

* fix: transform--operators

* update: transform&resolve&argument

* fix

* update: binder & test.

* feat: redis-server & test.

* fmt

* update: test.

* fix: binder & test.

* fix: binder & test.

* update: errorcode & test

* fix

* fix

* fix

* fix

* update: binder.

* fix

* fix: schema & transform

* merge

* update:test

* fix

* fix

* fix.

* update: binder & transform.

* fix: dict_get test

* update.

* update
  • Loading branch information
Winnie-Hong0927 authored Sep 9, 2024
1 parent a0940db commit a7e4173
Show file tree
Hide file tree
Showing 20 changed files with 926 additions and 67 deletions.
55 changes: 55 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ opendal = { version = "0.49.0", features = [
"services-moka",
"services-webhdfs",
"services-huggingface",
"services-redis",
] }
openraft = { git = "https://github.com/drmingdrmer/openraft", tag = "v0.10.0-alpha.6", features = [
"serde",
Expand Down
7 changes: 1 addition & 6 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,7 @@ build_exceptions! {
// dictionary
DictionaryAlreadyExists(3113),
UnknownDictionary(3114),
UnknownDictionaryId(3115),
UnsupportedDictionaryOption(3116),
UnsupportedDictionarySource(3117),
MissingDictionaryOption(3118),
WrongDictionaryFieldExpr(3119),

DictionarySourceError(3115),
// Procedure
UnknownProcedure(3130),
ProcedureAlreadyExists(3131),
Expand Down
1 change: 1 addition & 0 deletions src/common/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub use config::ShareTableConfig;
pub use config::StorageConfig;

mod operator;
pub use operator::build_operator;
pub use operator::init_operator;
pub use operator::DataOperator;

Expand Down
43 changes: 43 additions & 0 deletions src/meta/app/src/schema/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use std::sync::Arc;

use chrono::DateTime;
use chrono::Utc;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::TableSchema;

use super::dictionary_name_ident::DictionaryNameIdent;
Expand Down Expand Up @@ -77,6 +79,47 @@ impl Default for DictionaryMeta {
}
}

impl DictionaryMeta {
pub fn build_sql_connection_url(&self) -> Result<String> {
let username = self
.options
.get("username")
.ok_or_else(|| ErrorCode::BadArguments("Miss option `username`"))?;
let password = self
.options
.get("password")
.ok_or_else(|| ErrorCode::BadArguments("Miss option `password`"))?;
let host = self
.options
.get("host")
.ok_or_else(|| ErrorCode::BadArguments("Miss option `host`"))?;
let port = self
.options
.get("port")
.ok_or_else(|| ErrorCode::BadArguments("Miss option `port`"))?;
let db = self
.options
.get("db")
.ok_or_else(|| ErrorCode::BadArguments("Miss option `db`"))?;
Ok(format!(
"mysql://{}:{}@{}:{}/{}",
username, password, host, port, db
))
}

pub fn build_redis_connection_url(&self) -> Result<String> {
let host = self
.options
.get("host")
.ok_or_else(|| ErrorCode::BadArguments("Miss option `host`"))?;
let port = self
.options
.get("port")
.ok_or_else(|| ErrorCode::BadArguments("Miss option `port`"))?;
Ok(format!("tcp://{}:{}", host, port))
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CreateDictionaryReq {
pub dictionary_ident: DictionaryNameIdent,
Expand Down
2 changes: 1 addition & 1 deletion src/query/functions/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn is_builtin_function(name: &str) -> bool {
#[ctor]
pub static BUILTIN_FUNCTIONS: FunctionRegistry = builtin_functions();

pub const ASYNC_FUNCTIONS: [&str; 1] = ["nextval"];
pub const ASYNC_FUNCTIONS: [&str; 2] = ["nextval", "dict_get"];

pub const GENERAL_WINDOW_FUNCTIONS: [&str; 13] = [
"row_number",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ impl PipelineBuilder {
pub(crate) fn build_async_function(&mut self, async_function: &AsyncFunction) -> Result<()> {
self.build_pipeline(&async_function.input)?;

let operators = TransformAsyncFunction::init_operators(&async_function.async_func_descs)?;
self.main_pipeline.add_async_transformer(|| {
TransformAsyncFunction::new(self.ctx.clone(), async_function.async_func_descs.clone())
TransformAsyncFunction::new(
self.ctx.clone(),
async_function.async_func_descs.clone(),
operators.clone(),
)
});

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod transform_block_compact_no_split;
mod transform_cache_scan;
mod transform_cast_schema;
mod transform_create_sets;
mod transform_dictionary;
mod transform_expression_scan;
mod transform_filter;
mod transform_limit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use databend_common_exception::Result;
Expand All @@ -25,21 +26,29 @@ use databend_common_meta_app::schema::GetSequenceNextValueReq;
use databend_common_meta_app::schema::SequenceIdent;
use databend_common_pipeline_transforms::processors::AsyncTransform;
use databend_common_storages_fuse::TableContext;
use opendal::Operator;

use crate::sessions::QueryContext;
use crate::sql::executor::physical_plans::AsyncFunctionDesc;
use crate::sql::plans::AsyncFunctionArgument;

pub struct TransformAsyncFunction {
ctx: Arc<QueryContext>,
// key is the index of async_func_desc
pub(crate) operators: BTreeMap<usize, Arc<Operator>>,
async_func_descs: Vec<AsyncFunctionDesc>,
}

impl TransformAsyncFunction {
pub fn new(ctx: Arc<QueryContext>, async_func_descs: Vec<AsyncFunctionDesc>) -> Self {
pub fn new(
ctx: Arc<QueryContext>,
async_func_descs: Vec<AsyncFunctionDesc>,
operators: BTreeMap<usize, Arc<Operator>>,
) -> Self {
Self {
ctx,
async_func_descs,
operators,
}
}

Expand Down Expand Up @@ -80,7 +89,7 @@ impl AsyncTransform for TransformAsyncFunction {

#[async_backtrace::framed]
async fn transform(&mut self, mut data_block: DataBlock) -> Result<DataBlock> {
for async_func_desc in &self.async_func_descs {
for (i, async_func_desc) in self.async_func_descs.iter().enumerate() {
match &async_func_desc.func_arg {
AsyncFunctionArgument::SequenceFunction(sequence_name) => {
self.transform_sequence(
Expand All @@ -90,9 +99,18 @@ impl AsyncTransform for TransformAsyncFunction {
)
.await?;
}
AsyncFunctionArgument::DictGetFunction(dict_arg) => {
self.transform_dict_get(
i,
&mut data_block,
dict_arg,
&async_func_desc.arg_indices,
&async_func_desc.data_type,
)
.await?;
}
}
}

Ok(data_block)
}
}
Loading

0 comments on commit a7e4173

Please sign in to comment.