diff --git a/Cargo.lock b/Cargo.lock index 3131f51a9008..80aeebcb0389 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -609,7 +609,7 @@ version = "0.2.6" source = "git+https://github.com/datafuse-extras/async-backtrace.git?rev=dea4553#dea4553c47ff2946684f7efb295a4105d5627fac" dependencies = [ "async-backtrace-attributes", - "dashmap", + "dashmap 5.5.3", "futures", "itertools 0.10.5", "loom", @@ -1481,6 +1481,18 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bb8" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8" +dependencies = [ + "async-trait", + "futures-util", + "parking_lot 0.12.3", + "tokio", +] + [[package]] name = "beef" version = "0.5.2" @@ -3083,6 +3095,20 @@ dependencies = [ "serde", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core 0.9.10", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -3108,7 +3134,7 @@ dependencies = [ "databend-storages-common-table-meta", "limits-rs", "log", - "opendal 0.49.2", + "opendal 0.50.1", "serde", "serde_json", "serfig", @@ -3179,7 +3205,7 @@ dependencies = [ "lz4", "num", "num-traits", - "opendal 0.49.2", + "opendal 0.50.1", "ordered-float 4.2.2", "proptest", "quanta 0.11.1", @@ -3272,6 +3298,8 @@ dependencies = [ "rand 0.8.5", "regex", "replace_with", + "reqwest", + "reqwest-hickory-resolver", "rustix 0.38.37", "semver", "serde", @@ -3313,7 +3341,7 @@ dependencies = [ "async-backtrace", "async-trait", "chrono", - "dashmap", + "dashmap 5.5.3", "databend-common-arrow", "databend-common-ast", "databend-common-base", @@ -3427,7 +3455,7 @@ dependencies = [ "geos", "geozero 0.13.0", "http 1.1.0", - "opendal 0.49.2", + "opendal 0.50.1", "parquet", "paste", "prost 0.12.6", @@ -3457,7 +3485,7 @@ dependencies = [ "chrono", "chrono-tz 0.8.6", "comfy-table 6.2.0", - "dashmap", + "dashmap 5.5.3", "databend-common-arrow", "databend-common-ast", "databend-common-base", @@ -3764,7 +3792,7 @@ dependencies = [ "maplit", "num-derive", "num-traits", - "opendal 0.49.2", + "opendal 0.50.1", "paste", "prost 0.12.6", "serde", @@ -4017,7 +4045,7 @@ dependencies = [ "flate2", "futures", "lz4", - "opendal 0.49.2", + "opendal 0.50.1", "parquet-format-safe", "rand 0.8.5", "seq-macro", @@ -4152,7 +4180,7 @@ version = "0.1.0" dependencies = [ "async-backtrace", "chrono-tz 0.8.6", - "dashmap", + "dashmap 5.5.3", "databend-common-ast", "databend-common-base", "databend-common-config", @@ -4185,7 +4213,7 @@ dependencies = [ "cidr", "cron", "ctor 0.2.8", - "dashmap", + "dashmap 5.5.3", "databend-common-ast", "databend-common-base", "databend-common-catalog", @@ -4219,7 +4247,7 @@ dependencies = [ "log", "num-derive", "num-traits", - "opendal 0.49.2", + "opendal 0.50.1", "parking_lot 0.12.3", "percent-encoding", "prqlc", @@ -4243,7 +4271,7 @@ dependencies = [ "arrow-schema", "async-backtrace", "chrono", - "dashmap", + "dashmap 5.5.3", "databend-common-arrow", "databend-common-auth", "databend-common-base", @@ -4254,12 +4282,10 @@ dependencies = [ "flagset", "futures", "log", - "opendal 0.49.2", + "opendal 0.50.1", "parquet", "prometheus-client", "regex", - 
"reqwest", - "reqwest-hickory-resolver", "serde", "thiserror", ] @@ -4286,6 +4312,8 @@ dependencies = [ "fastrace", "match-template", "object_store_opendal", + "opendal 0.49.2", + "opendal_compat", "parquet", "serde", "serde_json", @@ -4299,7 +4327,7 @@ name = "databend-common-storages-factory" version = "0.1.0" dependencies = [ "async-trait", - "dashmap", + "dashmap 5.5.3", "databend-common-catalog", "databend-common-config", "databend-common-exception", @@ -4366,7 +4394,7 @@ dependencies = [ "itertools 0.10.5", "jsonb", "log", - "opendal 0.49.2", + "opendal 0.50.1", "parquet", "rand 0.8.5", "serde", @@ -4413,7 +4441,7 @@ dependencies = [ "futures", "hive_metastore", "log", - "opendal 0.49.2", + "opendal 0.50.1", "parquet", "recursive", "serde", @@ -4512,7 +4540,7 @@ dependencies = [ "async-trait", "bytes", "chrono", - "dashmap", + "dashmap 5.5.3", "databend-common-base", "databend-common-catalog", "databend-common-exception", @@ -4527,7 +4555,7 @@ dependencies = [ "databend-storages-common-table-meta", "futures-util", "log", - "opendal 0.49.2", + "opendal 0.50.1", "orc-rust", "serde", "serde_json", @@ -4564,7 +4592,7 @@ dependencies = [ "ethnum", "futures", "log", - "opendal 0.49.2", + "opendal 0.50.1", "parquet", "rand 0.8.5", "serde", @@ -4611,7 +4639,7 @@ dependencies = [ "databend-common-storages-parquet", "databend-storages-common-blocks", "databend-storages-common-table-meta", - "opendal 0.49.2", + "opendal 0.50.1", "parquet", "serde", "serde_json", @@ -4652,7 +4680,7 @@ dependencies = [ "enum-as-inner 0.6.0", "futures", "log", - "opendal 0.49.2", + "opendal 0.50.1", "parquet", "serde", "serde_json", @@ -4716,7 +4744,7 @@ dependencies = [ "jsonb", "log", "once_cell", - "opendal 0.49.2", + "opendal 0.50.1", "parking_lot 0.12.3", "regex", "serde", @@ -4927,7 +4955,7 @@ dependencies = [ "aws-sdk-s3", "chrono", "chrono-tz 0.8.6", - "dashmap", + "dashmap 5.5.3", "databend-common-ast", "databend-common-base", "databend-common-building", @@ -4975,7 +5003,7 @@ dependencies = [ "jsonb", "jwt-simple 0.11.9", "log", - "opendal 0.49.2", + "opendal 0.50.1", "tantivy", "tempfile", ] @@ -5158,7 +5186,7 @@ dependencies = [ "config", "criterion", "ctor 0.2.8", - "dashmap", + "dashmap 5.5.3", "databend-common-arrow", "databend-common-ast", "databend-common-base", @@ -5250,7 +5278,7 @@ dependencies = [ "naive-cityhash", "num", "num_cpus", - "opendal 0.49.2", + "opendal 0.50.1", "opensrv-mysql", "opentelemetry", "opentelemetry_sdk", @@ -5453,7 +5481,7 @@ dependencies = [ "fastrace", "futures", "log", - "opendal 0.49.2", + "opendal 0.50.1", ] [[package]] @@ -5597,12 +5625,13 @@ checksum = "da692b8d1080ea3045efaab14434d40468c3d8657e42abddfffca87b428f4c1b" [[package]] name = "delta_kernel" -version = "0.1.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1ddfe35af3696786ab5f23cd995df33a66f6cff272ac1f85e09c1a6316acd4c" +checksum = "fa08a82239f51e6d3d249c38f0f5bf7c8a78b28587e1b466893c9eac84d252d8" dependencies = [ "arrow-arith", "arrow-array", + "arrow-cast", "arrow-json", "arrow-ord", "arrow-schema", @@ -5620,6 +5649,7 @@ dependencies = [ "rustc_version", "serde", "serde_json", + "strum 0.26.3", "thiserror", "tracing", "url", @@ -5630,9 +5660,9 @@ dependencies = [ [[package]] name = "delta_kernel_derive" -version = "0.1.1" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4d2127a34b12919a6bce08225f0ca6fde8a19342a32675370edfc8795e7c38a" +checksum = 
"ec5c4fb5b59b1bd55ed8ebcf941f27a327d600c19a4a4103546846c358be93ff" dependencies = [ "proc-macro2", "quote", @@ -5641,16 +5671,18 @@ dependencies = [ [[package]] name = "deltalake" -version = "0.18.0" -source = "git+https://github.com/delta-io/delta-rs?rev=57795da#57795da9d9cc86a460a5888713adfb3d0584b4cc" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cdaf5eed6cf6be7d94ce89ee9d7325324fc7c6c0b1ca8b911b0a5d95f6b1af5" dependencies = [ "deltalake-core", ] [[package]] name = "deltalake-core" -version = "0.18.0" -source = "git+https://github.com/delta-io/delta-rs?rev=57795da#57795da9d9cc86a460a5888713adfb3d0584b4cc" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ebab21c8c8820f9accb3ee74cc6ab7d930adf44323f39b7a764fd34e34aa7f4" dependencies = [ "arrow", "arrow-arith", @@ -5667,7 +5699,7 @@ dependencies = [ "bytes", "cfg-if", "chrono", - "dashmap", + "dashmap 6.1.0", "delta_kernel", "either", "errno", @@ -5693,10 +5725,12 @@ dependencies = [ "roaring", "serde", "serde_json", + "sqlparser 0.51.0", "thiserror", "tokio", "tracing", "url", + "urlencoding", "uuid", "z85", ] @@ -6194,6 +6228,16 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "env_filter" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab" +dependencies = [ + "log", + "regex", +] + [[package]] name = "env_logger" version = "0.10.2" @@ -7841,7 +7885,7 @@ version = "14.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "006acf5a613e0b5cf095d8e4b3f48c12a60d9062aa2b2dd105afaf8344a5600c" dependencies = [ - "dashmap", + "dashmap 5.5.3", "gix-fs", "libc", "once_cell", @@ -8702,7 +8746,7 @@ dependencies = [ "murmur3", "num-bigint", "once_cell", - "opendal 0.50.0", + "opendal 0.50.1", "ordered-float 4.2.2", "parquet", "paste", @@ -9719,6 +9763,7 @@ dependencies = [ "anyhow", "colored", "crossbeam-channel", + "env_filter", "fastrace", "jiff", "log", @@ -10764,28 +10809,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b04d09b9822c2f75a1d2fc513a2c1279c70e91e7407936fffdf6a6976ec530a" dependencies = [ "anyhow", - "async-backtrace", "async-trait", "backon 0.4.4", "base64 0.22.1", "bytes", "chrono", "crc32c", - "fastrace", "flagset", "futures", "getrandom 0.2.15", - "hdrs", "http 1.1.0", "log", "md-5", "moka", "once_cell", "percent-encoding", - "prometheus-client", "prost 0.13.1", "quick-xml 0.36.1", - "redis", + "redis 0.26.1", "reqsign", "reqwest", "serde", @@ -10797,34 +10838,53 @@ dependencies = [ [[package]] name = "opendal" -version = "0.50.0" +version = "0.50.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36e44fc43be9ffe18dad3e3ef9d61c1ae01991ee6f1c8c026978c35777a711bf" +checksum = "213222b6c86949314d8f51acb26d8241e7c8dd0879b016a79471d49f21ee592f" dependencies = [ "anyhow", + "async-backtrace", "async-trait", "backon 1.2.0", "base64 0.22.1", + "bb8", "bytes", "chrono", "crc32c", + "fastrace", "flagset", "futures", "getrandom 0.2.15", + "hdrs", "http 1.1.0", "log", "md-5", + "moka", "once_cell", "percent-encoding", + "prometheus-client", + "prost 0.13.1", "quick-xml 0.36.1", + "redis 0.27.5", "reqsign", "reqwest", "serde", "serde_json", + "sha2", "tokio", "uuid", ] +[[package]] +name = "opendal_compat" +version = "1.0.0" +source = "git+https://github.com/apache/opendal?rev=f6e60f6#f6e60f621dec82765a49283caf0ddc1ff8b1d260" 
+dependencies = [ + "async-trait", + "opendal 0.49.2", + "opendal 0.50.1", +] + [[package]] name = "openraft" version = "0.10.0" @@ -12648,6 +12708,39 @@ dependencies = [ "url", ] +[[package]] +name = "redis" +version = "0.27.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81cccf17a692ce51b86564334614d72dcae1def0fd5ecebc9f02956da74352b5" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "combine", + "crc16", + "futures", + "futures-util", + "itoa", + "log", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "rand 0.8.5", + "rustls 0.23.12", + "rustls-native-certs 0.7.1", + "rustls-pemfile 2.1.3", + "rustls-pki-types", + "ryu", + "sha1_smol", + "socket2 0.5.7", + "tokio", + "tokio-retry2", + "tokio-rustls 0.26.0", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -14180,6 +14273,15 @@ dependencies = [ "log", ] +[[package]] +name = "sqlparser" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fe11944a61da0da3f592e19a45ebe5ab92dc14a779907ff1f08fbb797bfefc7" +dependencies = [ + "log", +] + [[package]] name = "sqlx" version = "0.8.2" @@ -15129,9 +15231,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.39.2" +version = "1.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" +checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" dependencies = [ "backtrace", "bytes", @@ -15200,6 +15302,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-retry2" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "903934dba1c4c2f2e9cb460ef10b5695e0b0ecad3bf9ee7c8675e540c5e8b2d1" +dependencies = [ + "pin-project", + "rand 0.8.5", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -15950,7 +16063,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71bc0fbf23ebdfc8718d40ba5f31a520376767edd6ebb9fe06c27eb977379796" dependencies = [ "async-broadcast", - "dashmap", + "dashmap 5.5.3", "faststr", "futures", "lazy_static", diff --git a/Cargo.toml b/Cargo.toml index 3a01536656f0..679998b3c7e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -229,7 +229,7 @@ criterion = "0.5" ctor = "0.2" dashmap = "5.4.0" deepsize = { version = "0.2.0" } -deltalake = "0.18" +deltalake = "0.20" derive-visitor = { version = "0.4.0", features = ["std-types-drive"] } derive_more = "0.99.17" enumflags2 = { version = "0.7.7", features = ["serde"] } @@ -256,7 +256,7 @@ mysql_async = { version = "0.34", default-features = false, features = ["native- object_store_opendal = "0.46" once_cell = "1.15.0" openai_api_rust = "0.1" -opendal = { version = "0.49.0", features = [ +opendal = { version = "0.50.1", features = [ "layers-fastrace", "layers-prometheus-client", "layers-async-backtrace", @@ -275,6 +275,7 @@ opendal = { version = "0.49.0", features = [ "services-huggingface", "services-redis", ] } +opendal_compat = { version = "1" } openraft = { git = "https://github.com/drmingdrmer/openraft", tag = "v0.10.0-alpha.6", features = [ "serde", "tracing-log", @@ -413,10 +414,10 @@ backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "7226 "serialize-serde", ] } color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" } -deltalake = { git = "https://github.com/delta-io/delta-rs", rev = 
"57795da" } ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" } jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "ada713c" } openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" } +opendal_compat = { git = "https://github.com/apache/opendal", rev = "f6e60f6" } orc-rust = { git = "https://github.com/datafuse-extras/datafusion-orc", rev = "03372b97" } recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "6af35a1" } sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" } @@ -424,3 +425,7 @@ tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" tantivy-common = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370", package = "tantivy-common" } tantivy-jieba = { git = "https://github.com/datafuse-extras/tantivy-jieba", rev = "0e300e9" } xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", tag = "databend-alpha.4" } + +# This patch makes sure opendal_compat also used the same version of opendal instead of git one. +[patch.'https://github.com/apache/opendal'] +opendal = { version = "0.50.1" } diff --git a/src/common/base/Cargo.toml b/src/common/base/Cargo.toml index ab434f8558db..1bea77d7eaf3 100644 --- a/src/common/base/Cargo.toml +++ b/src/common/base/Cargo.toml @@ -50,6 +50,8 @@ prometheus-parse = "0.2.3" rand = { workspace = true, features = ["serde1"] } regex = { workspace = true } replace_with = "0.1.7" +reqwest = { workspace = true } +reqwest-hickory-resolver = { workspace = true } rustix = "0.38.37" semver = { workspace = true } serde = { workspace = true } diff --git a/src/common/base/src/http_client.rs b/src/common/base/src/http_client.rs new file mode 100644 index 000000000000..182c0a46611d --- /dev/null +++ b/src/common/base/src/http_client.rs @@ -0,0 +1,88 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::env; +use std::sync::Arc; +use std::sync::LazyLock; +use std::time::Duration; + +use reqwest_hickory_resolver::HickoryResolver; + +/// Global shared hickory resolver. +static GLOBAL_HICKORY_RESOLVER: LazyLock> = + LazyLock::new(|| Arc::new(HickoryResolver::default())); + +/// Global shared http client. +/// +/// Please create your own http client if you want dedicated http connection pool. +pub static GLOBAL_HTTP_CLIENT: LazyLock = LazyLock::new(HttpClient::new); + +/// HttpClient that used by databend. +pub struct HttpClient { + client: reqwest::Client, +} + +impl Default for HttpClient { + fn default() -> Self { + Self::new() + } +} + +impl HttpClient { + /// Create a new http client. + /// + /// # Notes + /// + /// This client is optimized for interact with storage services. + /// Please tune the settings if you want to use it for other purposes. + pub fn new() -> Self { + let mut builder = reqwest::ClientBuilder::new(); + + // Disable http2 for better performance. + builder = builder.http1_only(); + + // Set dns resolver. 
+        builder = builder.dns_resolver(GLOBAL_HICKORY_RESOLVER.clone());
+
+        // Pool max idle per host controls connection pool size.
+        // Defaults to no limit; set to `0` to disable it.
+        let pool_max_idle_per_host = env::var("_DATABEND_INTERNAL_POOL_MAX_IDLE_PER_HOST")
+            .ok()
+            .and_then(|v| v.parse::<usize>().ok())
+            .unwrap_or(usize::MAX);
+        builder = builder.pool_max_idle_per_host(pool_max_idle_per_host);
+
+        // Connect timeout defaults to 30s.
+        let connect_timeout = env::var("_DATABEND_INTERNAL_CONNECT_TIMEOUT")
+            .ok()
+            .and_then(|v| v.parse::<u64>().ok())
+            .unwrap_or(30);
+        builder = builder.connect_timeout(Duration::from_secs(connect_timeout));
+
+        // Enable TCP keepalive if set.
+        if let Ok(v) = env::var("_DATABEND_INTERNAL_TCP_KEEPALIVE") {
+            if let Ok(v) = v.parse::<u64>() {
+                builder = builder.tcp_keepalive(Duration::from_secs(v));
+            }
+        }
+
+        let client = builder.build().expect("http client must be created");
+        HttpClient { client }
+    }
+
+    /// Get the inner reqwest client.
+    pub fn inner(&self) -> reqwest::Client {
+        self.client.clone()
+    }
+}
diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs
index 31060daffe9e..b66ab4a8eecd 100644
--- a/src/common/base/src/lib.rs
+++ b/src/common/base/src/lib.rs
@@ -31,6 +31,7 @@ pub mod containers;
 pub mod display;
 pub mod future;
 pub mod headers;
+pub mod http_client;
 pub mod mem_allocator;
 pub mod rangemap;
 pub mod runtime;
diff --git a/src/common/storage/Cargo.toml b/src/common/storage/Cargo.toml
index bacc5c9ecfd6..816392e7f221 100644
--- a/src/common/storage/Cargo.toml
+++ b/src/common/storage/Cargo.toml
@@ -29,8 +29,6 @@ opendal = { workspace = true }
 parquet = { workspace = true }
 prometheus-client = { workspace = true }
 regex = { workspace = true }
-reqwest = { workspace = true }
-reqwest-hickory-resolver = { workspace = true }
 serde = { workspace = true }
 thiserror = { workspace = true }
diff --git a/src/common/storage/src/metrics_layer.rs b/src/common/storage/src/metrics_layer.rs
index a63cb56ca30c..6cfadf8ce5d9 100644
--- a/src/common/storage/src/metrics_layer.rs
+++ b/src/common/storage/src/metrics_layer.rs
@@ -12,565 +12,157 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+use std::fmt;
 use std::fmt::Debug;
-use std::fmt::Formatter;
 use std::sync::Arc;
 use std::sync::LazyLock;
 use std::time::Duration;
-use std::time::Instant;
 
 use databend_common_base::runtime::metrics::register_counter_family;
 use databend_common_base::runtime::metrics::register_histogram_family;
 use databend_common_base::runtime::metrics::FamilyCounter;
 use databend_common_base::runtime::metrics::FamilyHistogram;
-use futures::FutureExt;
-use futures::TryFutureExt;
-use opendal::raw::oio;
+use opendal::layers::observe;
 use opendal::raw::Access;
 use opendal::raw::Layer;
-use opendal::raw::LayeredAccess;
-use opendal::raw::OpBatch;
-use opendal::raw::OpCreateDir;
-use opendal::raw::OpDelete;
-use opendal::raw::OpList;
-use opendal::raw::OpPresign;
-use opendal::raw::OpRead;
-use opendal::raw::OpStat;
-use opendal::raw::OpWrite;
 use opendal::raw::Operation;
-use opendal::raw::RpBatch;
-use opendal::raw::RpCreateDir;
-use opendal::raw::RpDelete;
-use opendal::raw::RpList;
-use opendal::raw::RpPresign;
-use opendal::raw::RpRead;
-use opendal::raw::RpStat;
-use opendal::raw::RpWrite;
-use opendal::Buffer;
 use opendal::ErrorKind;
 use opendal::Scheme;
+use prometheus_client::encoding::EncodeLabel;
+use prometheus_client::encoding::EncodeLabelSet;
+use prometheus_client::encoding::LabelSetEncoder;
 use prometheus_client::metrics::histogram::exponential_buckets;
 
-type OperationLabels = [(&'static str, &'static str); 2];
-type ErrorLabels = [(&'static str, &'static str); 3];
-
 pub static METRICS_LAYER: LazyLock<MetricsLayer> = LazyLock::new(|| MetricsLayer {
-    metrics: Arc::new(MetricsRecorder::new()),
+    metrics: MetricsRecorder::new(),
 });
 
+#[derive(Debug, Clone)]
+pub struct MetricsLayer {
+    metrics: MetricsRecorder,
+}
+
+impl<A: Access> Layer<A> for MetricsLayer {
+    type LayeredAccess = observe::MetricsAccessor<A, MetricsRecorder>;
+
+    fn layer(&self, inner: A) -> Self::LayeredAccess {
+        observe::MetricsLayer::new(self.metrics.clone()).layer(inner)
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct MetricsRecorder {
-    /// Total counter of the specific operation be called.
-    requests_total: FamilyCounter<OperationLabels>,
-    /// Total counter of the errors.
-    errors_total: FamilyCounter<ErrorLabels>,
     /// Latency of the specific operation be called.
-    request_duration_seconds: FamilyHistogram<OperationLabels>,
+    operation_duration_seconds: FamilyHistogram<OperationLabels>,
     /// The histogram of bytes
-    bytes_histogram: FamilyHistogram<OperationLabels>,
-    /// The counter of bytes
-    bytes_total: FamilyCounter<OperationLabels>,
+    operation_bytes: FamilyHistogram<OperationLabels>,
+    /// Total counter of errors for the specific operation.
+    operation_errors_total: FamilyCounter<OperationLabels>,
 }
 
 impl MetricsRecorder {
     pub fn new() -> Self {
         MetricsRecorder {
-            requests_total: register_counter_family("opendal_requests"),
-            errors_total: register_counter_family("opendal_errors"),
-            request_duration_seconds: register_histogram_family(
-                "opendal_request_duration_seconds",
+            operation_duration_seconds: register_histogram_family(
+                &observe::METRIC_OPERATION_DURATION_SECONDS.name(),
                 exponential_buckets(0.01, 2.0, 16),
             ),
-            bytes_histogram: register_histogram_family(
-                "opendal_bytes_histogram",
+            operation_bytes: register_histogram_family(
+                &observe::METRIC_OPERATION_BYTES.name(),
                 exponential_buckets(1.0, 2.0, 16),
             ),
-            bytes_total: register_counter_family("opendal_bytes"),
-        }
-    }
-
-    fn increment_errors_total(&self, scheme: Scheme, op: Operation, err: ErrorKind) {
-        let labels = [
-            ("scheme", scheme.into_static()),
-            ("op", op.into_static()),
-            ("err", err.into_static()),
-        ];
-        self.errors_total.get_or_create(&labels).inc();
-    }
-
-    fn increment_request_total(&self, scheme: Scheme, op: Operation) {
-        let labels = [("scheme", scheme.into_static()), ("op", op.into_static())];
-        self.requests_total.get_or_create(&labels).inc();
-    }
-
-    fn observe_bytes_total(&self, scheme: Scheme, op: Operation, bytes: usize) {
-        let labels = [("scheme", scheme.into_static()), ("op", op.into_static())];
-        self.bytes_histogram
-            .get_or_create(&labels)
-            .observe(bytes as f64);
-        self.bytes_total.get_or_create(&labels).inc_by(bytes as u64);
-    }
-
-    fn observe_request_duration(&self, scheme: Scheme, op: Operation, duration: Duration) {
-        let labels = [("scheme", scheme.into_static()), ("op", op.into_static())];
-        self.request_duration_seconds
-            .get_or_create(&labels)
-            .observe(duration.as_secs_f64());
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct MetricsLayer {
-    metrics: Arc<MetricsRecorder>,
-}
-
-impl<A: Access> Layer<A> for MetricsLayer {
-    type LayeredAccess = MetricsLayerAccessor<A>;
-
-    fn layer(&self, inner: A) -> Self::LayeredAccess {
-        let meta = inner.info();
-        let scheme = meta.scheme();
-
-        MetricsLayerAccessor {
-            inner,
-            metrics: self.metrics.clone(),
-            scheme,
+            operation_errors_total: register_counter_family("opendal_operation_errors"),
         }
     }
 }
 
-#[derive(Clone)]
-pub struct MetricsLayerAccessor<A: Access> {
-    inner: A,
-    scheme: Scheme,
-    metrics: Arc<MetricsRecorder>,
-}
-
-impl<A: Access> Debug for MetricsLayerAccessor<A> {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        f.debug_struct("MetricsAccessor")
-            .field("inner", &self.inner)
-            .finish_non_exhaustive()
-    }
-}
-
-impl<A: Access> LayeredAccess for MetricsLayerAccessor<A> {
-    type Inner = A;
-    type Reader = OperatorMetricsWrapper<A::Reader>;
-    type BlockingReader = OperatorMetricsWrapper<A::BlockingReader>;
-    type Writer = OperatorMetricsWrapper<A::Writer>;
-    type BlockingWriter = OperatorMetricsWrapper<A::BlockingWriter>;
-    type Lister = A::Lister;
-    type BlockingLister = A::BlockingLister;
-
-    fn inner(&self) -> &Self::Inner {
-        &self.inner
-    }
-
-    async fn create_dir(&self, path: &str, args: OpCreateDir) -> opendal::Result<RpCreateDir> {
-        self.metrics
-            .increment_request_total(self.scheme, Operation::CreateDir);
-
-        let start_time = Instant::now();
-        let create_res = self.inner.create_dir(path, args).await;
-
-        self.metrics.observe_request_duration(
-            self.scheme,
-            Operation::CreateDir,
-            start_time.elapsed(),
-        );
-        create_res.inspect_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::CreateDir, e.kind());
-        })
-    }
-
-    async fn read(&self, path: &str, args: OpRead) -> opendal::Result<(RpRead, Self::Reader)> {
-        self.metrics
-            .increment_request_total(self.scheme, Operation::Read);
-
-        let read_res = self
-            .inner
-            .read(path, args)
-            .map(|v| {
-                v.map(|(rp, r)| {
-                    (
-                        rp,
-                        OperatorMetricsWrapper::new(
-                            r,
-                            Operation::Read,
-                            self.metrics.clone(),
-                            self.scheme,
-                        ),
-                    )
-                })
-            })
-            .await;
-        read_res.inspect_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::Read, e.kind());
-        })
-    }
-
-    async fn write(&self, path: &str, args: OpWrite) -> opendal::Result<(RpWrite, Self::Writer)> {
-        self.metrics
-            .increment_request_total(self.scheme, Operation::Write);
-
-        let write_res = self
-            .inner
-            .write(path, args)
-            .map(|v| {
-                v.map(|(rp, r)| {
-                    (
-                        rp,
-                        OperatorMetricsWrapper::new(
-                            r,
-                            Operation::Write,
-                            self.metrics.clone(),
-                            self.scheme,
-                        ),
-                    )
-                })
-            })
-            .await;
-
-        write_res.inspect_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::Write, e.kind());
-        })
-    }
-
-    async fn stat(&self, path: &str, args: OpStat) -> opendal::Result<RpStat> {
-        self.metrics
-            .increment_request_total(self.scheme, Operation::Stat);
-        let start_time = Instant::now();
-
-        let stat_res = self
-            .inner
-            .stat(path, args)
-            .inspect_err(|e| {
-                self.metrics
-                    .increment_errors_total(self.scheme, Operation::Stat, e.kind());
-            })
-            .await;
-
-        self.metrics
-            .observe_request_duration(self.scheme, Operation::Stat, start_time.elapsed());
-        stat_res.inspect_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::Stat, e.kind());
-        })
-    }
-
-    async fn delete(&self, path: &str, args: OpDelete) -> opendal::Result<RpDelete> {
-        self.metrics
-            .increment_request_total(self.scheme, Operation::Delete);
-        let start_time = Instant::now();
-
-        let delete_res = self.inner.delete(path, args).await;
-
-        self.metrics
-            .observe_request_duration(self.scheme, Operation::Delete, start_time.elapsed());
-        delete_res.inspect_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::Delete, e.kind());
-        })
-    }
-
-    async fn list(&self, path: &str, args: OpList) -> opendal::Result<(RpList, Self::Lister)> {
-        self.metrics
-            .increment_request_total(self.scheme, Operation::List);
-        let start_time = Instant::now();
-
-        let list_res = self.inner.list(path, args).await;
-
-        self.metrics
-            .observe_request_duration(self.scheme, Operation::List, start_time.elapsed());
-        list_res.inspect_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::List, e.kind());
-        })
-    }
-
-    async fn batch(&self, args: OpBatch) -> opendal::Result<RpBatch> {
-        self.metrics
-            .increment_request_total(self.scheme, Operation::Batch);
-        let start_time = Instant::now();
-
-        let result = self.inner.batch(args).await;
-
-        self.metrics
-            .observe_request_duration(self.scheme, Operation::Batch, start_time.elapsed());
-        result.inspect_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::Batch, e.kind());
-        })
-    }
-
-    async fn presign(&self, path: &str, args: OpPresign) -> opendal::Result<RpPresign> {
-        self.metrics
-            .increment_request_total(self.scheme, Operation::Presign);
-        let start_time = Instant::now();
-
-        let result = self.inner.presign(path, args).await;
-
-        self.metrics.observe_request_duration(
-            self.scheme,
-            Operation::Presign,
-            start_time.elapsed(),
-        );
-        result.inspect_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::Presign, e.kind());
-        })
-    }
-
-    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> opendal::Result<RpCreateDir> {
-        self.metrics
-            .increment_request_total(self.scheme, Operation::BlockingCreateDir);
-        let start_time = Instant::now();
-
-        let result = self.inner.blocking_create_dir(path, args);
-
-        self.metrics.observe_request_duration(
-            self.scheme,
-            Operation::BlockingCreateDir,
-            start_time.elapsed(),
-        );
-        result.inspect_err(|e| {
-            self.metrics.increment_errors_total(
-                self.scheme,
-                Operation::BlockingCreateDir,
-                e.kind(),
-            );
-        })
-    }
-
-    fn blocking_read(
+impl observe::MetricsIntercept for MetricsRecorder {
+    fn observe_operation_duration_seconds(
         &self,
-        path: &str,
-        args: OpRead,
-    ) -> opendal::Result<(RpRead, Self::BlockingReader)> {
-        self.metrics
-            .increment_request_total(self.scheme, Operation::BlockingRead);
-
-        let result = self.inner.blocking_read(path, args).map(|(rp, r)| {
-            (
-                rp,
-                OperatorMetricsWrapper::new(
-                    r,
-                    Operation::BlockingRead,
-                    self.metrics.clone(),
-                    self.scheme,
-                ),
-            )
-        });
-
-        result.inspect_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::BlockingRead, e.kind());
-        })
+        scheme: Scheme,
+        namespace: Arc<String>,
+        root: Arc<String>,
+        _path: &str,
+        op: Operation,
+        duration: Duration,
+    ) {
+        self.operation_duration_seconds
+            .get_or_create(&OperationLabels {
+                scheme,
+                namespace,
+                root,
+                operation: op,
+                path: None,
+                error: None,
+            })
+            .observe(duration.as_secs_f64())
     }
 
-    fn blocking_write(
+    fn observe_operation_bytes(
         &self,
-        path: &str,
-        args: OpWrite,
-    ) -> opendal::Result<(RpWrite, Self::BlockingWriter)> {
-        self.metrics
-            .increment_request_total(self.scheme, Operation::BlockingWrite);
-
-        let result = self.inner.blocking_write(path, args).map(|(rp, r)| {
-            (
-                rp,
-                OperatorMetricsWrapper::new(
-                    r,
-                    Operation::BlockingWrite,
-                    self.metrics.clone(),
-                    self.scheme,
-                ),
-            )
-        });
-
-        result.inspect_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::BlockingWrite, e.kind());
-        })
-    }
-
-    fn blocking_stat(&self, path: &str, args: OpStat) -> opendal::Result<RpStat> {
-        self.metrics
-            .increment_request_total(self.scheme, Operation::BlockingStat);
-        let start_time = Instant::now();
-
-        let result = self.inner.blocking_stat(path, args);
-        self.metrics.observe_request_duration(
-            self.scheme,
-            Operation::BlockingStat,
-            start_time.elapsed(),
-        );
-
-        result.inspect_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::BlockingStat, e.kind());
-        })
-    }
-
-    fn blocking_delete(&self, path: &str, args: OpDelete) -> opendal::Result<RpDelete> {
-        self.metrics
-            .increment_request_total(self.scheme, Operation::BlockingDelete);
-        let start_time = Instant::now();
-
-        let result = self.inner.blocking_delete(path, args);
-
-        self.metrics.observe_request_duration(
-            self.scheme,
-            Operation::BlockingDelete,
-            start_time.elapsed(),
-        );
-        result.inspect_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::BlockingDelete, e.kind());
-        })
+        scheme: Scheme,
+        namespace: Arc<String>,
+        root: Arc<String>,
+        _path: &str,
+        op: Operation,
+        bytes: usize,
+    ) {
+        self.operation_bytes
+            .get_or_create(&OperationLabels {
+                scheme,
+                namespace,
+                root,
+                operation: op,
+                path: None,
+                error: None,
+            })
+            .observe(bytes as f64)
     }
 
-    fn blocking_list(
+    fn observe_operation_errors_total(
         &self,
-        path: &str,
-        args: OpList,
-    ) -> opendal::Result<(RpList, Self::BlockingLister)> {
-        self.metrics
-            .increment_request_total(self.scheme, Operation::BlockingList);
-        let start_time = Instant::now();
-
-        let result = self.inner.blocking_list(path, args);
-
-        self.metrics.observe_request_duration(
-            self.scheme,
-            Operation::BlockingList,
-            start_time.elapsed(),
-        );
-        result.inspect_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::BlockingList, e.kind());
-        })
+        scheme: Scheme,
+        namespace: Arc<String>,
+        root: Arc<String>,
+        _path: &str,
+        op: Operation,
+        error: ErrorKind,
+    ) {
+        self.operation_errors_total
+            .get_or_create(&OperationLabels {
+                scheme,
+                namespace,
+                root,
+                operation: op,
+                path: None,
+                error: Some(error.into_static()),
+            })
+            .inc();
     }
 }
 
-pub struct OperatorMetricsWrapper<R> {
-    inner: R,
-
-    op: Operation,
-    metrics: Arc<MetricsRecorder>,
+#[derive(Clone, Debug, Eq, PartialEq, Hash)]
+struct OperationLabels {
     scheme: Scheme,
+    namespace: Arc<String>,
+    root: Arc<String>,
+    operation: Operation,
+    path: Option<String>,
+    error: Option<&'static str>,
 }
 
-impl<R> OperatorMetricsWrapper<R> {
-    fn new(inner: R, op: Operation, metrics: Arc<MetricsRecorder>, scheme: Scheme) -> Self {
-        Self {
-            inner,
-            op,
-            metrics,
-            scheme,
+impl EncodeLabelSet for OperationLabels {
+    fn encode(&self, mut encoder: LabelSetEncoder) -> Result<(), fmt::Error> {
+        (observe::LABEL_SCHEME, self.scheme.into_static()).encode(encoder.encode_label())?;
+        (observe::LABEL_NAMESPACE, self.namespace.as_str()).encode(encoder.encode_label())?;
+        (observe::LABEL_ROOT, self.root.as_str()).encode(encoder.encode_label())?;
+        (observe::LABEL_OPERATION, self.operation.into_static()).encode(encoder.encode_label())?;
+        if let Some(path) = &self.path {
+            (observe::LABEL_PATH, path.as_str()).encode(encoder.encode_label())?;
         }
-    }
-}
-
-impl<R: oio::Read> oio::Read for OperatorMetricsWrapper<R> {
-    async fn read(&mut self) -> opendal::Result<Buffer> {
-        let start = Instant::now();
-
-        self.inner
-            .read()
-            .await
-            .inspect(|res| {
-                self.metrics
-                    .observe_bytes_total(self.scheme, self.op, res.len());
-                self.metrics
-                    .observe_request_duration(self.scheme, self.op, start.elapsed());
-            })
-            .inspect_err(|err| {
-                self.metrics
-                    .increment_errors_total(self.scheme, self.op, err.kind());
-            })
-    }
-}
-
-impl<R: oio::BlockingRead> oio::BlockingRead for OperatorMetricsWrapper<R> {
-    fn read(&mut self) -> opendal::Result<Buffer> {
-        let start = Instant::now();
-
-        self.inner
-            .read()
-            .inspect(|res| {
-                self.metrics
-                    .observe_bytes_total(self.scheme, self.op, res.len());
-                self.metrics
-                    .observe_request_duration(self.scheme, self.op, start.elapsed());
-            })
-            .inspect_err(|err| {
-                self.metrics
-                    .increment_errors_total(self.scheme, self.op, err.kind());
-            })
-    }
-}
-
-impl<R: oio::Write> oio::Write for OperatorMetricsWrapper<R> {
-    async fn write(&mut self, bs: Buffer) -> opendal::Result<()> {
-        let start = Instant::now();
-        let size = bs.len();
-
-        self.inner
-            .write(bs)
-            .await
-            .inspect(|_| {
-                self.metrics.observe_bytes_total(self.scheme, self.op, size);
-                self.metrics
-                    .observe_request_duration(self.scheme, self.op, start.elapsed());
-            })
-            .inspect_err(|err| {
-                self.metrics
-                    .increment_errors_total(self.scheme, self.op, err.kind());
-            })
-    }
-
-    async fn close(&mut self) -> opendal::Result<()> {
-        self.inner.close().await.inspect_err(|err| {
-            self.metrics
-                .increment_errors_total(self.scheme, self.op, err.kind());
-        })
-    }
-
-    async fn abort(&mut self) -> opendal::Result<()> {
-        self.inner.abort().await.inspect_err(|err| {
-            self.metrics
-                .increment_errors_total(self.scheme, self.op, err.kind());
-        })
-    }
-}
-
-impl<R: oio::BlockingWrite> oio::BlockingWrite for OperatorMetricsWrapper<R> {
-    fn write(&mut self, bs: Buffer) -> opendal::Result<()> {
-        let start = Instant::now();
-        let size = bs.len();
-
-        self.inner
-            .write(bs)
-            .inspect(|_| {
-                self.metrics.observe_bytes_total(self.scheme, self.op, size);
-                self.metrics
-                    .observe_request_duration(self.scheme, self.op, start.elapsed());
-            })
-            .inspect_err(|err| {
-                self.metrics
-                    .increment_errors_total(self.scheme, self.op, err.kind());
-            })
-    }
-
-    fn close(&mut self) -> opendal::Result<()> {
-        self.inner.close().inspect_err(|err| {
-            self.metrics
-                .increment_errors_total(self.scheme, self.op, err.kind());
-        })
+        if let Some(error) = self.error {
+            (observe::LABEL_ERROR, error).encode(encoder.encode_label())?;
+        }
+        Ok(())
     }
 }
diff --git a/src/common/storage/src/operator.rs b/src/common/storage/src/operator.rs
index 392343187731..6cf73e49d124 100644
--- a/src/common/storage/src/operator.rs
+++ b/src/common/storage/src/operator.rs
@@ -16,12 +16,11 @@ use std::env;
 use std::io::Error;
 use std::io::ErrorKind;
 use std::io::Result;
-use std::sync::Arc;
-use std::sync::LazyLock;
 use std::time::Duration;
 
 use anyhow::anyhow;
 use databend_common_base::base::GlobalInstance;
+use databend_common_base::http_client::GLOBAL_HTTP_CLIENT;
 use databend_common_base::runtime::GlobalIORuntime;
 use databend_common_base::runtime::TrySpawn;
 use databend_common_exception::ErrorCode;
@@ -54,22 +53,11 @@ use opendal::raw::HttpClient;
 use opendal::services;
 use opendal::Builder;
 use opendal::Operator;
-use reqwest_hickory_resolver::HickoryResolver;
 
 use crate::metrics_layer::METRICS_LAYER;
 use crate::runtime_layer::RuntimeLayer;
 use crate::StorageConfig;
 
-/// The global dns resolver for opendal.
-static GLOBAL_HICKORY_RESOLVER: LazyLock<Arc<HickoryResolver>> =
-    LazyLock::new(|| Arc::new(HickoryResolver::default()));
-
-static GLOBAL_HTTP_CLIENT: LazyLock<HttpClient> = LazyLock::new(|| {
-    new_storage_http_client().unwrap_or_else(|err| {
-        panic!("http client must be created successfully, but failed for {err}")
-    })
-});
-
 /// init_operator will init an opendal operator based on storage config.
 pub fn init_operator(cfg: &StorageParams) -> Result<Operator> {
     let op = match &cfg {
@@ -102,16 +90,35 @@ pub fn init_operator(cfg: &StorageParams) -> Result<Operator> {
     Ok(op)
 }
 
+/// Please take care with the timing of calling opendal's `finish`.
+///
+/// Layers added before `finish` will use static dispatch, and layers added after `finish`
+/// will use dynamic dispatch. Adding too many layers via static dispatch will increase
+/// the compile time of rustc or even result in a compile error.
+///
+/// ```txt
+/// error[E0275]: overflow evaluating the requirement `http::response::Response<()>: std::marker::Send`
+///    |
+///    = help: consider increasing the recursion limit by adding a `#![recursion_limit = "256"]` attribute to your crate (`databend_common_storage`)
+/// note: required because it appears within the type `h2::proto::peer::PollMessage`
+///   --> /home/xuanwo/.cargo/registry/src/index.crates.io-6f17d22bba15001f/h2-0.4.5/src/proto/peer.rs:43:10
+///    |
+/// 43 | pub enum PollMessage {
+///    |          ^^^^^^^^^^^
+/// ```
+///
+/// Please balance performance against compile time.
 pub fn build_operator<B: Builder>(builder: B) -> Result<Operator> {
-    let ob = Operator::new(builder)?;
-
-    let op = ob
+    let ob = Operator::new(builder)?
         // NOTE
         //
         // Magic happens here. We will add a layer upon original
         // storage operator so that all underlying storage operations
         // will send to storage runtime.
         .layer(RuntimeLayer::new(GlobalIORuntime::instance()))
+        .finish();
+
+    let mut op = ob
         .layer({
             let retry_timeout = env::var("_DATABEND_INTERNAL_RETRY_TIMEOUT")
                 .ok()
                 .and_then(|v| v.parse::<u64>().ok())
@@ -154,11 +161,11 @@ pub fn build_operator<B: Builder>(builder: B) -> Result<Operator> {
 
     if let Ok(permits) = env::var("_DATABEND_INTERNAL_MAX_CONCURRENT_IO_REQUEST") {
         if let Ok(permits) = permits.parse::<usize>() {
-            return Ok(op.layer(ConcurrentLimitLayer::new(permits)).finish());
+            op = op.layer(ConcurrentLimitLayer::new(permits));
         }
     }
 
-    Ok(op.finish())
+    Ok(op)
 }
 
 /// init_azblob_operator will init an opendal azblob operator.
@@ -173,7 +180,7 @@ pub fn init_azblob_operator(cfg: &StorageAzblobConfig) -> Result<impl Builder> {
         // Credential
         .account_name(&cfg.account_name)
         .account_key(&cfg.account_key)
-        .http_client(GLOBAL_HTTP_CLIENT.clone());
+        .http_client(HttpClient::with(GLOBAL_HTTP_CLIENT.inner()));
 
     Ok(builder)
 }
@@ -198,7 +205,7 @@ fn init_gcs_operator(cfg: &StorageGcsConfig) -> Result<impl Builder> {
         .bucket(&cfg.bucket)
         .root(&cfg.root)
         .credential(&cfg.credential)
-        .http_client(GLOBAL_HTTP_CLIENT.clone());
+        .http_client(HttpClient::with(GLOBAL_HTTP_CLIENT.inner()));
 
     Ok(builder)
 }
@@ -295,7 +302,7 @@ fn init_s3_operator(cfg: &StorageS3Config) -> Result<impl Builder> {
         builder = builder.enable_virtual_host_style();
     }
 
-    builder = builder.http_client(GLOBAL_HTTP_CLIENT.clone());
+    builder = builder.http_client(HttpClient::with(GLOBAL_HTTP_CLIENT.inner()));
 
     Ok(builder)
 }
@@ -312,7 +319,7 @@ fn init_obs_operator(cfg: &StorageObsConfig) -> Result<impl Builder> {
         // Credential
         .access_key_id(&cfg.access_key_id)
         .secret_access_key(&cfg.secret_access_key)
-        .http_client(GLOBAL_HTTP_CLIENT.clone());
+        .http_client(HttpClient::with(GLOBAL_HTTP_CLIENT.inner()));
 
     Ok(builder)
 }
@@ -328,7 +335,7 @@ fn init_oss_operator(cfg: &StorageOssConfig) -> Result<impl Builder> {
         .root(&cfg.root)
         .server_side_encryption(&cfg.server_side_encryption)
        .server_side_encryption_key_id(&cfg.server_side_encryption_key_id)
-        .http_client(GLOBAL_HTTP_CLIENT.clone());
+        .http_client(HttpClient::with(GLOBAL_HTTP_CLIENT.inner()));
 
     Ok(builder)
 }
@@ -361,7 +368,7 @@ fn init_cos_operator(cfg: &StorageCosConfig) -> Result<impl Builder> {
         .secret_key(&cfg.secret_key)
         .bucket(&cfg.bucket)
         .root(&cfg.root)
-        .http_client(GLOBAL_HTTP_CLIENT.clone());
+        .http_client(HttpClient::with(GLOBAL_HTTP_CLIENT.inner()));
 
     Ok(builder)
 }
@@ -378,41 +385,6 @@ fn init_huggingface_operator(cfg: &StorageHuggingfaceConfig) -> Result<impl Builder> {
-fn new_storage_http_client() -> Result<HttpClient> {
-    let mut builder = reqwest::ClientBuilder::new();
-
-    // Disable http2 for better performance.
-    builder = builder.http1_only();
-
-    // Set dns resolver.
-    builder = builder.dns_resolver(GLOBAL_HICKORY_RESOLVER.clone());
-
-    // Pool max idle per host controls connection pool size.
-    // Default to no limit, set to `0` for disable it.
-    let pool_max_idle_per_host = env::var("_DATABEND_INTERNAL_POOL_MAX_IDLE_PER_HOST")
-        .ok()
-        .and_then(|v| v.parse::<usize>().ok())
-        .unwrap_or(usize::MAX);
-    builder = builder.pool_max_idle_per_host(pool_max_idle_per_host);
-
-    // Connect timeout default to 30s.
-    let connect_timeout = env::var("_DATABEND_INTERNAL_CONNECT_TIMEOUT")
-        .ok()
-        .and_then(|v| v.parse::<u64>().ok())
-        .unwrap_or(30);
-    builder = builder.connect_timeout(Duration::from_secs(connect_timeout));
-
-    // Enable TCP keepalive if set.
-    if let Ok(v) = env::var("_DATABEND_INTERNAL_TCP_KEEPALIVE") {
-        if let Ok(v) = v.parse::<u64>() {
-            builder = builder.tcp_keepalive(Duration::from_secs(v));
-        }
-    }
-
-    Ok(HttpClient::build(builder)?)
-}
-
 pub struct DatabendRetryInterceptor;
 
 impl RetryInterceptor for DatabendRetryInterceptor {
diff --git a/src/common/storage/src/runtime_layer.rs b/src/common/storage/src/runtime_layer.rs
index 69da2b471b2e..fc662a83504e 100644
--- a/src/common/storage/src/runtime_layer.rs
+++ b/src/common/storage/src/runtime_layer.rs
@@ -98,7 +98,6 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
         &self.inner
     }
 
-    #[async_backtrace::framed]
     async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
         let op = self.inner.clone();
         let path = path.to_string();
@@ -108,7 +107,6 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
             .expect("join must success")
     }
 
-    #[async_backtrace::framed]
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
         let op = self.inner.clone();
         let path = path.to_string();
@@ -123,7 +121,6 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
             })
     }
 
-    #[async_backtrace::framed]
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         let op = self.inner.clone();
         let path = path.to_string();
@@ -133,7 +130,6 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
             .expect("join must success")
     }
 
-    #[async_backtrace::framed]
     async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
         let op = self.inner.clone();
         let path = path.to_string();
@@ -143,7 +139,6 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
             .expect("join must success")
     }
 
-    #[async_backtrace::framed]
     async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
         let op = self.inner.clone();
         let path = path.to_string();
@@ -153,7 +148,6 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
             .expect("join must success")
     }
 
-    #[async_backtrace::framed]
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
         let op = self.inner.clone();
         let path = path.to_string();
diff --git a/src/common/tracing/Cargo.toml b/src/common/tracing/Cargo.toml
index 822411af6cac..bb3b6c050903 100644
--- a/src/common/tracing/Cargo.toml
+++ b/src/common/tracing/Cargo.toml
@@ -28,6 +28,7 @@ logforth = { version = "0.12", features = [
     'rolling_file',
     'opentelemetry',
     'fastrace',
+    "env-filter",
 ] }
 opentelemetry = { workspace = true }
 opentelemetry-otlp = { workspace = true }
diff --git a/src/common/tracing/src/init.rs b/src/common/tracing/src/init.rs
index 93b5793dfdea..1b66f0c51939 100644
--- a/src/common/tracing/src/init.rs
+++ b/src/common/tracing/src/init.rs
@@ -22,7 +22,9 @@ use databend_common_base::runtime::Thread;
 use fastrace::prelude::*;
 use log::LevelFilter;
 use log::Metadata;
+use logforth::filter::env::EnvFilterBuilder;
 use logforth::filter::CustomFilter;
+use logforth::filter::EnvFilter;
 use logforth::filter::FilterResult;
 use logforth::filter::TargetFilter;
 use logforth::Dispatch;
@@ -213,7 +215,9 @@ pub fn init_logging(
             "databend::log::structlog",
             LevelFilter::Off,
         ))
-        .filter(cfg.stderr.level.parse().unwrap_or(LevelFilter::Info))
+        .filter(EnvFilter::new(
+            EnvFilterBuilder::new().parse(&cfg.stderr.level),
+        ))
         .layout(get_layout(&cfg.stderr.format))
         .append(logforth::append::Stderr);
     logger = logger.dispatch(dispatch);
diff --git a/src/meta/app/src/storage/storage_params.rs b/src/meta/app/src/storage/storage_params.rs
index 32af7a146b62..73e7e8bd2209 100644
--- a/src/meta/app/src/storage/storage_params.rs
+++ b/src/meta/app/src/storage/storage_params.rs
@@ -15,9 +15,7 @@
 use std::fmt::Debug;
 use std::fmt::Display;
 use std::fmt::Formatter;
-use std::time::Duration;
 
-use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use serde::Deserialize;
 use serde::Serialize;
@@ -123,7 +121,6 @@ impl StorageParams {
     pub async fn auto_detect(self) -> Result<Self> {
         let sp = match self {
             StorageParams::S3(mut s3) if s3.region.is_empty() => {
-                // TODO: endpoint related logic should be moved out from opendal as a new API.
                 // Remove the possible trailing `/` in endpoint.
                 let endpoint = s3.endpoint_url.trim_end_matches('/');
@@ -135,23 +132,7 @@ impl StorageParams {
                     format!("https://{}", endpoint)
                 };
 
-                // We should not return error if client create failed, just ignore it.
-                if let Ok(client) = opendal::raw::HttpClient::new() {
-                    // The response itself doesn't important.
-                    let _ = client
-                        .client()
-                        .get(&endpoint)
-                        .timeout(Duration::from_secs(10))
-                        .send()
-                        .await
-                        .map_err(|err| {
-                            ErrorCode::InvalidConfig(format!(
-                                "s3 endpoint_url {} is invalid or incomplete: {err:?}",
-                                s3.endpoint_url
-                            ))
-                        })?;
-                }
-                s3.region = opendal::services::S3::detect_region(&s3.endpoint_url, &s3.bucket)
+                s3.region = opendal::services::S3::detect_region(&endpoint, &s3.bucket)
                     .await
                     .unwrap_or_default();
                 StorageParams::S3(s3)
diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs b/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs
index a5045f786716..81ef70142bb1 100644
--- a/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs
+++ b/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs
@@ -47,7 +47,7 @@ pub async fn do_vacuum_temporary_files(
 
     let operator = DataOperator::instance().operator();
 
-    let temporary_dir = format!("{}/", temporary_dir);
+    let temporary_dir = format!("{}/", temporary_dir.trim_end_matches('/'));
 
     let mut ds = operator
         .lister_with(&temporary_dir)
@@ -66,12 +66,16 @@ pub async fn do_vacuum_temporary_files(
     let mut batch_size = 0;
 
     while let Some(de) = ds.try_next().await? {
+        if de.path() == temporary_dir {
+            continue;
+        }
+
         let meta = de.metadata();
 
         match meta.mode() {
             EntryMode::DIR => {
                 let life_mills =
-                    match operator.is_exist(&format!("{}finished", de.path())).await? {
+                    match operator.exists(&format!("{}finished", de.path())).await? {
                         true => 0,
                         false => expire_time,
                     };
@@ -159,7 +163,7 @@ async fn vacuum_finished_query(
     removed_temp_files: &mut usize,
     total_cleaned_size: &mut usize,
     batch_size: &mut usize,
-    de: &Entry,
+    parent: &Entry,
     limit: usize,
     timestamp: i64,
     life_mills: i64,
@@ -168,7 +172,7 @@ async fn vacuum_finished_query(
     let mut all_files_removed = true;
 
     let mut ds = operator
-        .lister_with(de.path())
+        .lister_with(parent.path())
         .metakey(Metakey::Mode | Metakey::LastModified)
         .await?;
 
@@ -180,6 +184,10 @@ async fn vacuum_finished_query(
         let mut remove_temp_files_path = Vec::with_capacity(1001);
 
        while let Some(de) = ds.try_next().await? {
+            if de.path() == parent.path() {
+                continue;
+            }
+
             let meta = de.metadata();
             if meta.is_file() {
                 if de.name() == "finished" {
@@ -218,7 +226,7 @@ async fn vacuum_finished_query(
                 info!(
                     "vacuum removed {} temp files in {:?}(elapsed: {} seconds), batch size: {} bytes",
                     cur_removed,
-                    de.path(),
+                    parent.path(),
                     instant.elapsed().as_secs(),
                     *batch_size
                 );
@@ -238,8 +246,10 @@ async fn vacuum_finished_query(
     }
 
     if all_files_removed {
-        operator.delete(&format!("{}finished", de.path())).await?;
-        operator.delete(de.path()).await?;
+        operator
+            .delete(&format!("{}finished", parent.path()))
+            .await?;
+        operator.delete(parent.path()).await?;
     }
 
     Ok(())
diff --git a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
index 2efa93ca5f41..5eefbb43a1cc 100644
--- a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
+++ b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
@@ -112,6 +112,7 @@ async fn test_fuse_do_vacuum_drop_tables() -> Result<()> {
     Ok(())
 }
 
+
 #[tokio::test(flavor = "multi_thread")]
 async fn test_do_vacuum_temporary_files() -> Result<()> {
     let _fixture = TestFixture::setup().await?;
@@ -121,15 +122,14 @@ async fn test_do_vacuum_temporary_files() -> Result<()> {
     operator.write("test_dir/test2", vec![1, 2]).await?;
     operator.write("test_dir/test3", vec![1, 2]).await?;
 
-    assert_eq!(
-        3,
-        operator.list_with("test_dir/").recursive(true).await?.len()
-    );
+    let size = operator.list_with("test_dir/").recursive(true).await?.len();
+    assert!((3..=4).contains(&size));
 
     tokio::time::sleep(Duration::from_secs(2)).await;
     do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(2)), 1).await?;
 
-    assert_eq!(2, operator.list("test_dir/").await?.len());
+    let size = operator.list("test_dir/").await?.len();
+    assert!((2..=3).contains(&size));
 
     operator.write("test_dir/test4/test4", vec![1, 2]).await?;
     operator.write("test_dir/test5/test5", vec![1, 2]).await?;
@@ -138,11 +138,16 @@ async fn test_do_vacuum_temporary_files() -> Result<()> {
         .await?;
 
     do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(2)), 2).await?;
-    assert_eq!(operator.list("test_dir/").await?.len(), 2);
+    let size = operator.list("test_dir/").await?.len();
+    assert!((2..=3).contains(&size));
 
     tokio::time::sleep(Duration::from_secs(3)).await;
     do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(3)), 1000).await?;
-    assert!(operator.list_with("test_dir/").await?.is_empty());
+
+    dbg!(operator.list_with("test_dir/").await?);
+
+    let size = operator.list("test_dir/").await?.len();
+    assert!((0..=1).contains(&size));
 
     Ok(())
 }
@@ -155,8 +160,10 @@ mod test_accessor {
     use opendal::raw::oio;
     use opendal::raw::oio::Entry;
     use opendal::raw::MaybeSend;
+    use opendal::raw::OpBatch;
     use opendal::raw::OpDelete;
     use opendal::raw::OpList;
+    use opendal::raw::RpBatch;
     use opendal::raw::RpDelete;
     use opendal::raw::RpList;
 
@@ -266,6 +273,18 @@ mod test_accessor {
         }
    }
 
+    async fn batch(&self, _args: OpBatch) -> opendal::Result<RpBatch> {
+        self.hit_delete.store(true, Ordering::Release);
+        if self.inject_delete_faulty {
+            Err(opendal::Error::new(
+                opendal::ErrorKind::Unexpected,
+                "does not matter (delete)",
+            ))
+        } else {
+            Ok(RpBatch::new(vec![]))
+        }
+    }
+
     async fn list(&self, path: &str, _args: OpList) -> opendal::Result<(RpList, Self::Lister)> {
         if self.inject_delete_faulty {
             // While injecting faulty for delete operation, return an empty list;
diff --git a/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs b/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs
index 5ed79b727779..97770a1c91c7 100644
--- a/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs
+++ b/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs
@@ -101,7 +101,7 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> {
     for block_meta in block_metas {
         let virtual_loc =
             TableMetaLocationGenerator::gen_virtual_block_location(&block_meta.location.0);
-        assert!(dal.is_exist(&virtual_loc).await?);
+        assert!(dal.exists(&virtual_loc).await?);
 
         let schema = match storage_format {
             FuseStorageFormat::Parquet => read_parquet_schema_async_rs(dal, &virtual_loc, None)
diff --git a/src/query/storages/delta/Cargo.toml b/src/query/storages/delta/Cargo.toml
index bd98a9a61050..4a3f5f4d4b5a 100644
--- a/src/query/storages/delta/Cargo.toml
+++ b/src/query/storages/delta/Cargo.toml
@@ -25,6 +25,24 @@ deltalake = { workspace = true }
 fastrace = { workspace = true }
 match-template = "0.0.1"
 object_store_opendal = { workspace = true }
+# enable features to make delta happy.
+opendal = { version = "0.49.2", features = [
+    "services-s3",
+    "services-fs",
+    "services-gcs",
+    "services-cos",
+    "services-obs",
+    "services-oss",
+    "services-azblob",
+    "services-azdls",
+    "services-ipfs",
+    "services-http",
+    "services-moka",
+    "services-webhdfs",
+    "services-huggingface",
+    "services-redis",
+] }
+opendal_compat = { workspace = true, features = ["v0_50_to_v0_49"] }
 parquet = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
@@ -36,4 +54,4 @@ url = "2.4.1"
 workspace = true
 
 [package.metadata.cargo-machete]
-ignored = ["match-template"]
+ignored = ["match-template", "opendal"]
diff --git a/src/query/storages/delta/src/table.rs b/src/query/storages/delta/src/table.rs
index 2bff4c9b76d2..053d86559a19 100644
--- a/src/query/storages/delta/src/table.rs
+++ b/src/query/storages/delta/src/table.rs
@@ -159,6 +159,7 @@ impl DeltaTable {
     #[async_backtrace::framed]
     pub async fn load(sp: &StorageParams) -> Result<DeltaTable> {
         let op = init_operator(sp)?;
+        let op = opendal_compat::v0_50_to_v0_49(op);
         let opendal_store = Arc::new(OpendalStore::new(op));
 
         let mut table = DeltaTableBuilder::from_uri(Url::from_directory_path("/").unwrap())
diff --git a/src/query/storages/fuse/src/pruning/bloom_pruner.rs b/src/query/storages/fuse/src/pruning/bloom_pruner.rs
index 24cbd30f5961..06ee5cefd1d0 100644
--- a/src/query/storages/fuse/src/pruning/bloom_pruner.rs
+++ b/src/query/storages/fuse/src/pruning/bloom_pruner.rs
@@ -218,7 +218,7 @@ impl BloomPrunerCreator {
             return Ok(None);
         };
 
-        if self.dal.is_exist(bloom_index_location.0.as_str()).await? {
+        if self.dal.exists(bloom_index_location.0.as_str()).await? {
             info!("bloom index exists, ignore");
             return Ok(None);
         }
diff --git a/src/query/storages/hive/hive/src/hive_table.rs b/src/query/storages/hive/hive/src/hive_table.rs
index 12fe4e01274b..fa728e47c226 100644
--- a/src/query/storages/hive/hive/src/hive_table.rs
+++ b/src/query/storages/hive/hive/src/hive_table.rs
@@ -614,6 +614,10 @@ async fn do_list_files_from_dir(
         if path[file_offset..].starts_with('.') || path[file_offset..].starts_with('_') {
             continue;
         }
+        // Ignore the location itself
+        if path.trim_matches('/') == location.trim_matches('/') {
+            continue;
+        }
 
         match meta.mode() {
             EntryMode::FILE => {
diff --git a/tests/sqllogictests/src/mock_source/redis_source.rs b/tests/sqllogictests/src/mock_source/redis_source.rs
index 905de7cbf603..e502b55a2909 100644
--- a/tests/sqllogictests/src/mock_source/redis_source.rs
+++ b/tests/sqllogictests/src/mock_source/redis_source.rs
@@ -46,19 +46,27 @@ async fn process(stream: TcpStream) {
         let request = String::from_utf8(buf.clone()).unwrap();
         let cmds = parse_resp(request);
         for cmd in cmds {
-            if let Command::Get(key) = cmd {
-                // Return a value if the first character of the key is ASCII alphanumeric,
-                // otherwise treat it as the key does not exist.
-                let ret_value = if key.starts_with(|c: char| c.is_ascii_alphanumeric()) {
-                    let v = format!("{}_value", key);
-                    format!("${}\r\n{}\r\n", v.len(), v)
-                } else {
-                    "$-1\r\n".to_string()
-                };
-                ret_values.push_back(ret_value);
-            } else {
-                let ret_value = "+OK\r\n".to_string();
-                ret_values.push_back(ret_value);
+            match cmd {
+                Command::Get(key) => {
+                    // Return a value if the first character of the key is ASCII alphanumeric,
+                    // otherwise treat it as the key does not exist.
+                    let ret_value = if key.starts_with(|c: char| c.is_ascii_alphanumeric())
+                    {
+                        let v = format!("{}_value", key);
+                        format!("${}\r\n{}\r\n", v.len(), v)
+                    } else {
+                        "$-1\r\n".to_string()
+                    };
+                    ret_values.push_back(ret_value);
+                }
+                Command::Ping => {
+                    let ret_value = "+PONG\r\n".to_string();
+                    ret_values.push_back(ret_value);
+                }
+                _ => {
+                    let ret_value = "+OK\r\n".to_string();
+                    ret_values.push_back(ret_value);
+                }
             }
         }
     }
@@ -91,6 +99,7 @@ async fn process(stream: TcpStream) {
 // Redis command, only support get, other commands are ignored.
 enum Command {
     Get(String),
+    Ping,
     Invalid,
     Other,
 }
@@ -115,6 +124,8 @@ fn parse_resp(request: String) -> Vec<Command> {
         if lines[2] == "GET" {
             let cmd = Command::Get(lines[4].to_string());
             cmds.push(cmd);
+        } else if lines[2] == "PING" {
+            cmds.push(Command::Ping)
         } else {
             cmds.push(Command::Other);
        }
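For a quick manual check of the new PING handling, a round-trip like the following can be run against the mock (a hypothetical sketch, not part of this patch; `addr` stands for whatever address the sqllogictests harness binds the mock source to):

use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

// Hypothetical smoke test for the mock's PING support.
async fn ping_mock(addr: &str) -> std::io::Result<()> {
    let mut stream = TcpStream::connect(addr).await?;
    // RESP encoding of `PING`: an array of one bulk string. parse_resp above
    // reads the command name from the third CRLF-separated token ("*1", "$4", "PING").
    stream.write_all(b"*1\r\n$4\r\nPING\r\n").await?;
    let mut buf = [0u8; 64];
    let n = stream.read(&mut buf).await?;
    // The mock now answers the simple string `+PONG\r\n` instead of the
    // generic `+OK\r\n` it returns for other non-GET commands.
    assert_eq!(&buf[..n], b"+PONG\r\n");
    Ok(())
}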