diff --git a/src/meta/app/src/principal/client_session.rs b/src/meta/app/src/principal/client_session.rs index c8d3398342d1..ee8ee8e6c388 100644 --- a/src/meta/app/src/principal/client_session.rs +++ b/src/meta/app/src/principal/client_session.rs @@ -16,6 +16,4 @@ use serde::Deserialize; use serde::Serialize; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] -pub struct ClientSession { - pub user_name: String, -} +pub struct ClientSession; diff --git a/src/meta/app/src/principal/client_session_ident.rs b/src/meta/app/src/principal/client_session_ident.rs index f4ffe9b4d119..97fb78fb05f5 100644 --- a/src/meta/app/src/principal/client_session_ident.rs +++ b/src/meta/app/src/principal/client_session_ident.rs @@ -12,10 +12,36 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_meta_kvapi::kvapi::KeyBuilder; +use databend_common_meta_kvapi::kvapi::KeyCodec; +use databend_common_meta_kvapi::kvapi::KeyError; +use databend_common_meta_kvapi::kvapi::KeyParser; + use crate::tenant_key::ident::TIdent; -/// Define the meta-service key for a user setting. -pub type ClientSessionIdent = TIdent; +#[derive(PartialEq, Debug)] +pub struct UserSessionId { + pub user_name: String, + pub session_id: String, +} + +impl KeyCodec for UserSessionId { + fn encode_key(&self, b: KeyBuilder) -> KeyBuilder { + b.push_str(&self.user_name).push_str(&self.session_id) + } + + fn decode_key(parser: &mut KeyParser) -> Result + where Self: Sized { + let user_name = parser.next_str()?; + let session_id = parser.next_str()?; + Ok(Self { + user_name, + session_id, + }) + } +} + +pub type ClientSessionIdent = TIdent; pub use kvapi_impl::Resource; @@ -48,13 +74,19 @@ mod tests { use databend_common_meta_kvapi::kvapi::Key; use crate::principal::client_session_ident::ClientSessionIdent; + use crate::principal::client_session_ident::UserSessionId; use crate::tenant::Tenant; #[test] fn test_setting_ident() { let tenant = Tenant::new_literal("tenant1"); - let ident = ClientSessionIdent::new(tenant.clone(), "test"); - assert_eq!("__fd_session/tenant1/test", ident.to_string_key()); + let id = UserSessionId { + user_name: "m:n".to_string(), + session_id: "x:y".to_string(), + }; + let ident = ClientSessionIdent::new_generic(tenant.clone(), id); + // encode to x%3a:y:m:n first + assert_eq!("__fd_session/tenant1/m%3an/x%3ay", ident.to_string_key()); let got = ClientSessionIdent::from_str_key(&ident.to_string_key()).unwrap(); assert_eq!(ident, got); diff --git a/src/meta/proto-conv/src/client_session_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/client_session_from_to_protobuf_impl.rs index de6e93227511..73f1a147a9fa 100644 --- a/src/meta/proto-conv/src/client_session_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/client_session_from_to_protobuf_impl.rs @@ -35,9 +35,7 @@ impl FromToProto for mt::ClientSession { where Self: Sized { reader_check_msg(p.ver, p.min_reader_ver)?; - let v = Self { - user_name: p.user_name, - }; + let v = Self {}; Ok(v) } @@ -45,7 +43,6 @@ impl FromToProto for mt::ClientSession { let p = pb::ClientSession { ver: VER, min_reader_ver: MIN_READER_VER, - user_name: self.user_name.clone(), }; Ok(p) } diff --git a/src/meta/protos/proto/client_session.proto b/src/meta/protos/proto/client_session.proto index 7445c1c2936f..af934109d28f 100644 --- a/src/meta/protos/proto/client_session.proto +++ b/src/meta/protos/proto/client_session.proto @@ -23,5 +23,7 @@ package databend_proto; message ClientSession { uint64 ver = 100; uint64 min_reader_ver = 101; - string user_name = 2; + + //string user_name = 2; is removed + reserved 2; } diff --git a/src/query/management/src/client_session.rs b/src/query/management/src/client_session.rs index eb82ade59463..684edde84cf2 100644 --- a/src/query/management/src/client_session.rs +++ b/src/query/management/src/client_session.rs @@ -20,6 +20,7 @@ use databend_common_meta_api::kv_pb_api::KVPbApi; use databend_common_meta_api::kv_pb_api::UpsertPB; use databend_common_meta_app::principal::client_session::ClientSession; use databend_common_meta_app::principal::client_session_ident::ClientSessionIdent; +use databend_common_meta_app::principal::client_session_ident::UserSessionId; use databend_common_meta_app::principal::user_token::QueryTokenInfo; use databend_common_meta_app::principal::user_token_ident::TokenIdent; use databend_common_meta_app::tenant::Tenant; @@ -47,8 +48,12 @@ impl ClientSessionMgr { fn token_ident(&self, token_hash: &str) -> TokenIdent { TokenIdent::new(self.tenant.clone(), token_hash) } - fn session_ident(&self, client_session_id: &str) -> ClientSessionIdent { - ClientSessionIdent::new(self.tenant.clone(), client_session_id) + fn session_ident(&self, user_name: &str, session_id: &str) -> ClientSessionIdent { + let id = UserSessionId { + user_name: user_name.to_string(), + session_id: session_id.to_string(), + }; + ClientSessionIdent::new_generic(self.tenant.clone(), id) } } @@ -104,13 +109,15 @@ impl ClientSessionMgr { #[fastrace::trace] pub async fn upsert_client_session_id( &self, + user_name: &str, client_session_id: &str, - value: ClientSession, ttl: Duration, ) -> Result { - let ident = self.session_ident(client_session_id); + let ident = self.session_ident(user_name, client_session_id); let seq = MatchSeq::GE(0); - let upsert = UpsertPB::update(ident, value).with(seq).with_ttl(ttl); + let upsert = UpsertPB::update(ident, ClientSession {}) + .with(seq) + .with_ttl(ttl); let res = self.kv_api.upsert_pb(&upsert).await?; @@ -121,9 +128,10 @@ impl ClientSessionMgr { #[fastrace::trace] pub async fn get_client_session( &self, + user_name: &str, client_session_id: &str, ) -> Result> { - let ident = self.session_ident(client_session_id); + let ident = self.session_ident(user_name, client_session_id); let res = self.kv_api.get_pb(&ident).await?; Ok(res.map(|r| r.data)) @@ -131,8 +139,14 @@ impl ClientSessionMgr { #[async_backtrace::framed] #[fastrace::trace] - pub async fn drop_client_session_id(&self, client_session_id: &str) -> Result<()> { - let key = self.session_ident(client_session_id).to_string_key(); + pub async fn drop_client_session_id( + &self, + user_name: &str, + client_session_id: &str, + ) -> Result<()> { + let key = self + .session_ident(user_name, client_session_id) + .to_string_key(); // simply ignore the result self.kv_api diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs index f714a72b9ba0..c42e654f6d58 100644 --- a/src/query/service/src/servers/http/v1/query/http_query.rs +++ b/src/query/service/src/servers/http/v1/query/http_query.rs @@ -529,7 +529,7 @@ impl HttpQuery { let user_name = session.get_current_user()?.name; let has_temp_table_before_run = if let Some(cid) = session.get_client_session_id() { - ClientSessionManager::instance().on_query_start(&cid, &session); + ClientSessionManager::instance().on_query_start(&cid, &user_name, &session); true } else { false @@ -671,6 +671,7 @@ impl HttpQuery { *guard = Some(not_empty); ClientSessionManager::instance().on_query_finish( cid, + &self.user_name, session_state.temp_tbl_mgr, !not_empty, not_empty != self.has_temp_table_before_run, diff --git a/src/query/service/src/servers/http/v1/session/client_session_manager.rs b/src/query/service/src/servers/http/v1/session/client_session_manager.rs index 294dc8e66f57..e92defa3478f 100644 --- a/src/query/service/src/servers/http/v1/session/client_session_manager.rs +++ b/src/query/service/src/servers/http/v1/session/client_session_manager.rs @@ -23,7 +23,6 @@ use databend_common_cache::LruCache; use databend_common_config::InnerConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; -use databend_common_meta_app::principal::client_session::ClientSession; use databend_common_meta_app::principal::user_token::QueryTokenInfo; use databend_common_meta_app::principal::user_token::TokenType; use databend_common_meta_app::tenant::Tenant; @@ -108,6 +107,10 @@ impl ClientSessionManager { GlobalInstance::get() } + pub fn state_key(client_session_id: &str, user_name: &str) -> String { + format!("{client_session_id}:{user_name}") + } + #[async_backtrace::framed] pub async fn init(_cfg: &InnerConfig) -> Result<()> { let mgr = Arc::new(Self { @@ -154,7 +157,7 @@ impl ClientSessionManager { client_session_api .upsert_client_session_id( client_session_id, - ClientSession { user_name }, + &user_name, REFRESH_TOKEN_TTL + TTL_GRACE_PERIOD_META, ) .await?; @@ -214,7 +217,7 @@ impl ClientSessionManager { client_session_api .upsert_token(&old, token_info, TOMBSTONE_TTL, true) .await?; - self.refresh_in_memory_states(&client_session_id); + self.refresh_in_memory_states(&client_session_id, &user); }; self.refresh_tokens .write() @@ -251,9 +254,7 @@ impl ClientSessionManager { client_session_api .upsert_client_session_id( &client_session_id, - ClientSession { - user_name: claim.user.clone(), - }, + &claim.user, REFRESH_TOKEN_TTL + TTL_GRACE_PERIOD_META, ) .await?; @@ -332,10 +333,11 @@ impl ClientSessionManager { .await .ok(); client_session_api - .drop_client_session_id(&claim.session_id) + .drop_client_session_id(&claim.session_id, &claim.user) .await .ok(); - let state = self.session_state.lock().remove(&claim.session_id); + let state_key = Self::state_key(&claim.session_id, &claim.user); + let state = self.session_state.lock().remove(&state_key); if let Some(state) = state { drop_all_temp_tables(&claim.session_id, state.temp_tbl_mgr).await?; } @@ -343,18 +345,20 @@ impl ClientSessionManager { Ok(()) } - pub fn refresh_in_memory_states(&self, client_session_id: &str) { + pub fn refresh_in_memory_states(&self, client_session_id: &str, user_name: &str) { + let key = Self::state_key(client_session_id, user_name); let mut guard = self.session_state.lock(); - guard.entry(client_session_id.to_string()).and_modify(|e| { - e.query_state = QueryState::InUse; + guard.entry(key).and_modify(|e| { + e.query_state = QueryState::Idle(Instant::now()); }); } - pub fn on_query_start(&self, client_session_id: &str, session: &Arc) { + pub fn on_query_start(&self, client_session_id: &str, user_name: &str, session: &Arc) { + let key = Self::state_key(client_session_id, user_name); let mut guard = self.session_state.lock(); - guard.entry(client_session_id.to_string()).and_modify(|e| { + guard.entry(key).and_modify(|e| { if matches!(e.query_state, QueryState::Idle(_)) { - e.query_state = QueryState::Idle(Instant::now()); + e.query_state = QueryState::InUse; session.set_temp_tbl_mgr(e.temp_tbl_mgr.clone()); } }); @@ -362,13 +366,15 @@ impl ClientSessionManager { pub fn on_query_finish( &self, client_session_id: &str, + user_name: &str, temp_tbl_mgr: TempTblMgrRef, is_empty: bool, just_changed: bool, ) { + let key = Self::state_key(client_session_id, user_name); if !is_empty || just_changed { let mut guard = self.session_state.lock(); - match guard.entry(client_session_id.to_string()) { + match guard.entry(key) { Entry::Vacant(e) => { if !is_empty { e.insert(SessionState { diff --git a/src/query/service/src/servers/http/v1/session/refresh_handler.rs b/src/query/service/src/servers/http/v1/session/refresh_handler.rs index 830330b7c140..dd2d962031c3 100644 --- a/src/query/service/src/servers/http/v1/session/refresh_handler.rs +++ b/src/query/service/src/servers/http/v1/session/refresh_handler.rs @@ -53,7 +53,7 @@ pub async fn refresh_handler( "JWT session should provide session_id when refresh session", )) })?; - mgr.refresh_in_memory_states(&session_id); + mgr.refresh_in_memory_states(&session_id, &ctx.user_name); let tenant = ctx.session.get_current_tenant(); mgr.refresh_session_handle(tenant, ctx.user_name.clone(), &session_id) diff --git a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs index d5beedc846d9..796697c30c3f 100644 --- a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs +++ b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs @@ -29,7 +29,6 @@ use databend_common_expression::DataBlock; use databend_common_expression::DataSchemaRef; use databend_common_expression::SendableDataBlockStream; use databend_common_io::prelude::FormatSettings; -use databend_common_meta_app::principal::client_session::ClientSession; use databend_common_meta_app::principal::UserIdentity; use databend_common_metrics::mysql::*; use databend_common_users::CertifiedInfo; @@ -502,9 +501,7 @@ impl InteractiveWorker { .client_session_api(&tenant) .upsert_client_session_id( &session_id, - ClientSession { - user_name: user_name.clone(), - }, + &user_name, Duration::from_secs(3600 + 600), ) .await diff --git a/src/query/service/src/servers/mysql/mysql_session.rs b/src/query/service/src/servers/mysql/mysql_session.rs index 2ab9f93ee44a..e7ac832cb322 100644 --- a/src/query/service/src/servers/mysql/mysql_session.rs +++ b/src/query/service/src/servers/mysql/mysql_session.rs @@ -95,9 +95,10 @@ impl MySQLConnection { let tenant = session.get_current_tenant(); let session_id = session.get_id(); + let user = session.get_current_user()?.name; UserApiProvider::instance() .client_session_api(&tenant) - .drop_client_session_id(&session_id) + .drop_client_session_id(&session_id, &user) .await .ok(); drop_all_temp_tables(&session_id, session.temp_tbl_mgr()).await diff --git a/src/query/service/src/sessions/session.rs b/src/query/service/src/sessions/session.rs index 25b184afcce4..354d3649d353 100644 --- a/src/query/service/src/sessions/session.rs +++ b/src/query/service/src/sessions/session.rs @@ -380,21 +380,24 @@ impl Session { } pub fn get_temp_table_prefix(&self) -> Result { let typ = self.typ.read().clone(); - match typ { - SessionType::MySQL => Ok(self.id.clone()), + let session_id = match typ { + SessionType::MySQL => self.id.clone(), SessionType::HTTPQuery => { if let Some(id) = self.get_client_session_id() { - Ok(id) + id } else { - Err(ErrorCode::BadArguments( + return Err(ErrorCode::BadArguments( "can not use temp table in http handler if token is not used", - )) + )); } } - t => Err(ErrorCode::BadArguments(format!( - "can not use temp table in session type {t}" - ))), - } + t => { + return Err(ErrorCode::BadArguments(format!( + "can not use temp table in session type {t}" + ))); + } + }; + Ok(format!("{}/{session_id}", self.get_current_user()?.name)) } } diff --git a/src/query/storages/fuse/src/table_functions/fuse_vacuum_temporary_table.rs b/src/query/storages/fuse/src/table_functions/fuse_vacuum_temporary_table.rs index 8d2e90049ee2..708afe46d4ad 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_vacuum_temporary_table.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_vacuum_temporary_table.rs @@ -30,7 +30,6 @@ use databend_common_users::UserApiProvider; use databend_storages_common_table_meta::meta::TEMP_TABLE_STORAGE_PREFIX; use futures_util::TryStreamExt; use log::info; -use uuid::Uuid; use crate::sessions::TableContext; use crate::table_functions::SimpleTableFunc; @@ -62,26 +61,24 @@ impl SimpleTableFunc for FuseVacuumTemporaryTable { .recursive(true) .await?; let client_session_mgr = UserApiProvider::instance().client_session_api(&ctx.get_tenant()); - let mut session_ids = HashSet::new(); + let mut user_session_ids = HashSet::new(); while let Some(entry) = lister.try_next().await? { let path = entry.path(); - if let Some(session_id) = path.split('/').nth(1) { - if session_id.is_empty() { - continue; - } - // check if session_id is a valid uuid - let _ = Uuid::parse_str(session_id) - .map_err(|e| ErrorCode::Internal(format!("Invalid session_id: {}", e)))?; - session_ids.insert(session_id.to_string()); - } + let parts: Vec<_> = path.split('/').collect(); + if parts.len() < 3 { + return Err(ErrorCode::Internal(format!( + "invalid path for temp table: {path}" + ))); + }; + user_session_ids.insert((parts[1].to_string(), parts[2].to_string())); } - for session_id in session_ids { + for (user_name, session_id) in user_session_ids { if client_session_mgr - .get_client_session(&session_id) + .get_client_session(&user_name, &session_id) .await? .is_none() { - let path = format!("{}/{}", TEMP_TABLE_STORAGE_PREFIX, session_id); + let path = format!("{}/{}/{}", TEMP_TABLE_STORAGE_PREFIX, user_name, session_id); info!("Removing temporary table: {}", path); op.remove_all(&path).await?; }