Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support sleep function #5448

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/common/function/src/function_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::scalars::expression::ExpressionFunction;
use crate::scalars::json::JsonFunction;
use crate::scalars::matches::MatchesFunction;
use crate::scalars::math::MathFunction;
use crate::scalars::sleep::SleepFunction;
use crate::scalars::timestamp::TimestampFunction;
use crate::scalars::vector::VectorFunction;
use crate::system::SystemFunction;
Expand Down Expand Up @@ -122,6 +123,9 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
// Vector related functions
VectorFunction::register(&function_registry);

// Sleep function
SleepFunction::register(&function_registry);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hesitant about whether to provide a sleep function. We may only provide a sleep function under the admin statement if we really need this.

admin sleep(60);

If our final goal is to support kill query, the sleep function may not be the first thing we need to implement. We can implement a way to cancel a query first.


// Geo functions
#[cfg(feature = "geo")]
crate::scalars::geo::GeoFunctions::register(&function_registry);
Expand Down
1 change: 1 addition & 0 deletions src/common/function/src/scalars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod geo;
pub mod json;
pub mod matches;
pub mod math;
pub mod sleep;
pub mod vector;

#[cfg(test)]
Expand Down
170 changes: 170 additions & 0 deletions src/common/function/src/scalars/sleep.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;
use std::{fmt, thread};

use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use datatypes::vectors::{Int64Vector, VectorRef};
use snafu::ensure;

use crate::function::{Function, FunctionContext};
use crate::function_registry::FunctionRegistry;

/// Sleep function that pauses execution for specified seconds
#[derive(Clone, Debug, Default)]
pub(crate) struct SleepFunction;

impl SleepFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(SleepFunction));
}
}

const NAME: &str = "sleep";

impl Function for SleepFunction {
fn name(&self) -> &str {
NAME
}

fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::int64_datatype())
}

fn signature(&self) -> Signature {
// Accept int32, int64 and float64 types
Signature::uniform(
1,
vec![
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::float64_datatype(),
],
Volatility::Volatile,
)
}

fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
columns.len()
),
}
);

let vector = &columns[0];
let mut result = Vec::with_capacity(vector.len());

for i in 0..vector.len() {
let secs = match vector.get(i) {
Value::Int64(x) => x as f64,
Value::Int32(x) => x as f64,
Value::Float64(x) => x.into_inner(),
_ => {
result.push(None);
continue;
}
};
// Sleep for the specified seconds TODO: use tokio::time::sleep when the scalars are async
thread::sleep(Duration::from_secs_f64(secs));
Comment on lines +87 to +88
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest not to sleep in a scalar function. This pauses the runtime thread which is risky.

result.push(Some(secs as i64));
}

Ok(Arc::new(Int64Vector::from(result)))
}
}

impl fmt::Display for SleepFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "SLEEP")
}
}

#[cfg(test)]
mod tests {
use std::time::Instant;

use datatypes::value::Value;
use datatypes::vectors::{Float64Vector, Int32Vector};

use super::*;

#[test]
fn test_sleep() {
let f = SleepFunction;
assert_eq!("sleep", f.name());
assert_eq!(
ConcreteDataType::int64_datatype(),
f.return_type(&[]).unwrap()
);

let times = vec![Some(1_i64), None, Some(2_i64)];
let args: Vec<VectorRef> = vec![Arc::new(Int64Vector::from(times.clone()))];

let start = Instant::now();
let vector = f.eval(FunctionContext::default(), &args).unwrap();
let elapsed = start.elapsed();

assert_eq!(3, vector.len());
assert!(elapsed.as_secs() >= 3); // Should sleep for total of 3 seconds

assert_eq!(vector.get(0), Value::Int64(1));
assert_eq!(vector.get(1), Value::Null);
assert_eq!(vector.get(2), Value::Int64(2));
}

#[test]
fn test_sleep_float64() {
let f = SleepFunction;
let times = vec![Some(0.5_f64), None, Some(1.5_f64)];
let args: Vec<VectorRef> = vec![Arc::new(Float64Vector::from(times))];

let start = Instant::now();
let vector = f.eval(FunctionContext::default(), &args).unwrap();
let elapsed = start.elapsed();

assert_eq!(3, vector.len());
assert!(elapsed.as_secs_f64() >= 2.0);

assert_eq!(vector.get(0), Value::Int64(0));
assert_eq!(vector.get(1), Value::Null);
assert_eq!(vector.get(2), Value::Int64(1));
}

#[test]
fn test_sleep_int32() {
let f = SleepFunction;
let times = vec![Some(1_i32), None, Some(2_i32)];
let args: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(times))];

let start = Instant::now();
let vector = f.eval(FunctionContext::default(), &args).unwrap();
let elapsed = start.elapsed();

assert_eq!(3, vector.len());
assert!(elapsed.as_secs() >= 3);

assert_eq!(vector.get(0), Value::Int64(1));
assert_eq!(vector.get(1), Value::Null);
assert_eq!(vector.get(2), Value::Int64(2));
}
}
8 changes: 8 additions & 0 deletions src/servers/src/postgres/fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,17 @@ pub(crate) fn process<'a>(query: &str, query_ctx: QueryContextRef) -> Option<Vec

static LIMIT_CAST_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new("(?i)(LIMIT\\s+\\d+)::bigint").unwrap());

static PG_SLEEP_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new("(?i)pg_sleep\\s*\\((.*?)\\)").unwrap());

pub(crate) fn rewrite_sql(query: &str) -> Cow<'_, str> {
//TODO(sunng87): remove this when we upgraded datafusion to 43 or newer
let query = LIMIT_CAST_PATTERN.replace_all(query, "$1");

// tricky way to support both sleep in mysql and pg_sleep in postgres
let query = PG_SLEEP_PATTERN.replace_all(&query, "sleep($1)");

// DBeaver tricky replacement for datafusion not support sql
// TODO: add more here
query
Expand Down
40 changes: 40 additions & 0 deletions tests/cases/standalone/common/function/sleep.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
select sleep(0.1);

+---------------------+
| sleep(Float64(0.1)) |
+---------------------+
| 0 |
+---------------------+

select sleep(1) as a;

+---+
| a |
+---+
| 1 |
+---+

-- should fail it is for postgres
select pg_sleep(0.1);

Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Invalid function 'pg_sleep'.
Did you mean 'sleep'?

-- SQLNESS PROTOCOL POSTGRES
select pg_sleep(0.5);

+---------------------+
| sleep(Float64(0.5)) |
+---------------------+
| 0 |
+---------------------+

-- SQLNESS PROTOCOL POSTGRES
select pg_sleep(2) as b;

+---+
| b |
+---+
| 2 |
+---+

12 changes: 12 additions & 0 deletions tests/cases/standalone/common/function/sleep.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
select sleep(0.1);

select sleep(1) as a;

-- should fail it is for postgres
select pg_sleep(0.1);

-- SQLNESS PROTOCOL POSTGRES
select pg_sleep(0.5);

-- SQLNESS PROTOCOL POSTGRES
select pg_sleep(2) as b;
Loading