From 64cad6e247f18b8427b8d20b4c953b9093dedcb1 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 20 Nov 2024 16:54:22 +0800 Subject: [PATCH 1/4] fix: remove metric engine's internal column from promql's query Signed-off-by: Ruihang Xia --- src/query/src/promql/planner.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 001e41ca9934..2f5c1059ee87 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -57,6 +57,9 @@ use promql_parser::parser::{ VectorMatchCardinality, VectorSelector, }; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::metric_engine_consts::{ + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, +}; use table::table::adapter::DfTableProviderAdapter; use crate::promql::error::{ @@ -1128,6 +1131,10 @@ impl PromPlanner { .table_info() .meta .row_key_column_names() + .filter(|col| { + // remove metric engine's internal columns + col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME + }) .cloned() .collect(); self.ctx.tag_columns = tags; From 7e11e6b555ff448bb6797ae50475e34ee229623d Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 20 Nov 2024 17:22:00 +0800 Subject: [PATCH 2/4] remove unwrap Signed-off-by: Ruihang Xia --- .../src/extension_plan/series_divide.rs | 45 +++++++++++++------ 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs index b0ce0219ea96..0518cd88c007 100644 --- a/src/promql/src/extension_plan/series_divide.rs +++ b/src/promql/src/extension_plan/series_divide.rs @@ -254,7 +254,10 @@ impl Stream for SeriesDivideStream { let timer = std::time::Instant::now(); loop { if !self.buffer.is_empty() { - let cut_at = self.find_first_diff_row(); + let cut_at = match self.find_first_diff_row() { + Ok(cut_at) => cut_at, + Err(e) => return Poll::Ready(Some(Err(e))), + }; if let Some((batch_index, row_index)) = cut_at { // slice out the first time series and return it. let half_batch_of_first_series = @@ -318,10 +321,10 @@ impl SeriesDivideStream { /// Return the position to cut buffer. /// None implies the current buffer only contains one time series. - fn find_first_diff_row(&mut self) -> Option<(usize, usize)> { + fn find_first_diff_row(&mut self) -> DataFusionResult> { // fast path: no tag columns means all data belongs to the same series. if self.tag_indices.is_empty() { - return None; + return Ok(None); } let mut resumed_batch_index = self.inspect_start; @@ -337,18 +340,26 @@ impl SeriesDivideStream { for index in &self.tag_indices { let current_array = batch.column(*index); let last_array = last_batch.column(*index); - let current_value = current_array + let current_string_array = current_array .as_any() .downcast_ref::() - .unwrap() - .value(0); - let last_value = last_array + .ok_or_else(|| { + datafusion::error::DataFusionError::Internal( + "Failed to downcast tag column to StringArray".to_string(), + ) + })?; + let last_string_array = last_array .as_any() .downcast_ref::() - .unwrap() - .value(last_row); + .ok_or_else(|| { + datafusion::error::DataFusionError::Internal( + "Failed to downcast tag column to StringArray".to_string(), + ) + })?; + let current_value = current_string_array.value(0); + let last_value = last_string_array.value(last_row); if current_value != last_value { - return Some((resumed_batch_index, 0)); + return Ok(Some((resumed_batch_index, 0))); } } } @@ -356,7 +367,15 @@ impl SeriesDivideStream { // check column by column for index in &self.tag_indices { let array = batch.column(*index); - let string_array = array.as_any().downcast_ref::().unwrap(); + let string_array = + array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Internal( + "Failed to downcast tag column to StringArray".to_string(), + ) + })?; // the first row number that not equal to the next row. let mut same_until = 0; while same_until < num_rows - 1 { @@ -372,12 +391,12 @@ impl SeriesDivideStream { // all rows are the same, inspect next batch resumed_batch_index += 1; } else { - return Some((resumed_batch_index, result_index)); + return Ok(Some((resumed_batch_index, result_index))); } } self.inspect_start = resumed_batch_index; - None + Ok(None) } } From ef8334c0de72ff5e1202e99b0476c03f9ca07f8b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 23 Jan 2025 21:46:01 +0800 Subject: [PATCH 3/4] filter out physical table Signed-off-by: Ruihang Xia --- src/servers/src/http/prometheus.rs | 34 +++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index adbf49010bf0..d6f205ee9e62 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -45,7 +45,7 @@ use serde_json::Value; use session::context::{QueryContext, QueryContextRef}; use snafu::{Location, OptionExt, ResultExt}; use store_api::metric_engine_consts::{ - DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, PHYSICAL_TABLE_METADATA_KEY, }; pub use super::result::prometheus_resp::PrometheusJsonResponse; @@ -941,16 +941,30 @@ pub async fn label_values_query( .start_timer(); if label_name == METRIC_NAME_LABEL { - let mut table_names = match handler - .catalog_manager() - .table_names(&catalog, &schema, Some(&query_ctx)) - .await - { - Ok(table_names) => table_names, - Err(e) => { - return PrometheusJsonResponse::error(e.status_code(), e.output_msg()); + let catalog_manager = handler.catalog_manager(); + let mut tables_stream = catalog_manager.tables(&catalog, &schema, Some(&query_ctx)); + let mut table_names = Vec::new(); + while let Some(table) = tables_stream.next().await { + // filter out physical tables + match table { + Ok(table) => { + if table + .table_info() + .meta + .options + .extra_options + .contains_key(PHYSICAL_TABLE_METADATA_KEY) + { + continue; + } + + table_names.push(table.table_info().name.clone()); + } + Err(e) => { + return PrometheusJsonResponse::error(e.status_code(), e.output_msg()); + } } - }; + } table_names.sort_unstable(); return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names)); } else if label_name == FIELD_NAME_LABEL { From b9b46908a296170b31bc7b8c97a09bddc613925d Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 23 Jan 2025 23:53:19 +0800 Subject: [PATCH 4/4] add integration test Signed-off-by: Ruihang Xia --- tests-integration/tests/http.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 714f19a972ca..186d84d1b75f 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -659,6 +659,18 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert!(prom_resp.error_type.is_none()); // query `__name__` without match[] + // create a physical table and a logical table + let res = client + .get("/v1/sql?sql=create table physical_table (`ts` timestamp time index, message string) with ('physical_metric_table' = 'true');") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await); + let res = client + .get("/v1/sql?sql=create table logic_table (`ts` timestamp time index, message string) with ('on_physical_table' = 'physical_table');") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await); + // query `__name__` let res = client .get("/v1/prometheus/api/v1/label/__name__/values") .send() @@ -668,6 +680,15 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert_eq!(prom_resp.status, "success"); assert!(prom_resp.error.is_none()); assert!(prom_resp.error_type.is_none()); + assert_eq!( + prom_resp.data, + PrometheusResponse::Labels(vec![ + "demo".to_string(), + "demo_metrics".to_string(), + "logic_table".to_string(), + "numbers".to_string() + ]) + ); // buildinfo let res = client