Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: isolate session_id and session states of diff users. #16592

Merged
merged 5 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/meta/app/src/principal/client_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,4 @@ use serde::Deserialize;
use serde::Serialize;

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct ClientSession {
pub user_name: String,
}
pub struct ClientSession;
75 changes: 72 additions & 3 deletions src/meta/app/src/principal/client_session_ident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,70 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_meta_kvapi::kvapi::KeyBuilder;
use databend_common_meta_kvapi::kvapi::KeyCodec;
use databend_common_meta_kvapi::kvapi::KeyError;
use databend_common_meta_kvapi::kvapi::KeyParser;

use crate::tenant_key::ident::TIdent;

#[derive(PartialEq, Debug)]
pub struct UserSessionId {
pub session_id: String,
pub user_name: String,
}

impl UserSessionId {
const ESCAPE_CHARS: [u8; 1] = [b':'];

pub fn parse(s: &str) -> Result<Self, KeyError> {
let parts = s.splitn(2, ':').collect::<Vec<&str>>();
if parts.len() != 2 {
return Err(KeyError::WrongNumberOfSegments {
expect: 2,
got: s.to_string(),
});
}

let session_id = KeyParser::unescape_specified(parts[0], &Self::ESCAPE_CHARS)?;

Ok(UserSessionId {
session_id,
user_name: parts[1].to_string(),
})
}

/// Encode the user identity into a string for constructing a meta-service key.
/// note this key is also used as a segment in S3 path.
///
/// Similar to meta server, Opendal handles URL percent_encoding automatically, we do not need
/// to care about it here.
///
/// Since the session ID is expected to be a UUID, only minimal escaping is needed.
/// This preserves readability without altering the UUID, making the result more human-friendly.
pub fn encode(&self) -> String {
format!(
"{}:{}",
KeyBuilder::escape_specified(&self.session_id, &Self::ESCAPE_CHARS),
&self.user_name,
)
}
}

impl KeyCodec for UserSessionId {
fn encode_key(&self, b: KeyBuilder) -> KeyBuilder {
b.push_str(&self.encode())
}

fn decode_key(parser: &mut KeyParser) -> Result<Self, KeyError>
where Self: Sized {
let s = parser.next_str()?;
Self::parse(&s)
}
}

/// Define the meta-service key for a user setting.
pub type ClientSessionIdent = TIdent<Resource>;
pub type ClientSessionIdent = TIdent<Resource, UserSessionId>;

pub use kvapi_impl::Resource;

Expand Down Expand Up @@ -48,13 +108,22 @@ mod tests {
use databend_common_meta_kvapi::kvapi::Key;

use crate::principal::client_session_ident::ClientSessionIdent;
use crate::principal::client_session_ident::UserSessionId;
use crate::tenant::Tenant;

#[test]
fn test_setting_ident() {
let tenant = Tenant::new_literal("tenant1");
let ident = ClientSessionIdent::new(tenant.clone(), "test");
assert_eq!("__fd_session/tenant1/test", ident.to_string_key());
let id = UserSessionId {
session_id: "x:y".to_string(),
user_name: "m:n".to_string(),
};
let ident = ClientSessionIdent::new_generic(tenant.clone(), id);
// encode to x%3a:y:m:n first
assert_eq!(
"__fd_session/tenant1/x%253ay%3am%3an",
ident.to_string_key()
);

let got = ClientSessionIdent::from_str_key(&ident.to_string_key()).unwrap();
assert_eq!(ident, got);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,14 @@ impl FromToProto for mt::ClientSession {
where Self: Sized {
reader_check_msg(p.ver, p.min_reader_ver)?;

let v = Self {
user_name: p.user_name,
};
let v = Self {};
Ok(v)
}

fn to_pb(&self) -> Result<Self::PB, Incompatible> {
let p = pb::ClientSession {
ver: VER,
min_reader_ver: MIN_READER_VER,
user_name: self.user_name.clone(),
};
Ok(p)
}
Expand Down
1 change: 0 additions & 1 deletion src/meta/protos/proto/client_session.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,4 @@ package databend_proto;
message ClientSession {
uint64 ver = 100;
uint64 min_reader_ver = 101;
string user_name = 2;
}
30 changes: 22 additions & 8 deletions src/query/management/src/client_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use databend_common_meta_api::kv_pb_api::KVPbApi;
use databend_common_meta_api::kv_pb_api::UpsertPB;
use databend_common_meta_app::principal::client_session::ClientSession;
use databend_common_meta_app::principal::client_session_ident::ClientSessionIdent;
use databend_common_meta_app::principal::client_session_ident::UserSessionId;
use databend_common_meta_app::principal::user_token::QueryTokenInfo;
use databend_common_meta_app::principal::user_token_ident::TokenIdent;
use databend_common_meta_app::tenant::Tenant;
Expand Down Expand Up @@ -47,8 +48,12 @@ impl ClientSessionMgr {
fn token_ident(&self, token_hash: &str) -> TokenIdent {
TokenIdent::new(self.tenant.clone(), token_hash)
}
fn session_ident(&self, client_session_id: &str) -> ClientSessionIdent {
ClientSessionIdent::new(self.tenant.clone(), client_session_id)
fn session_ident(&self, session_id: &str, user_name: &str) -> ClientSessionIdent {
let id = UserSessionId {
session_id: session_id.to_string(),
user_name: user_name.to_string(),
};
ClientSessionIdent::new_generic(self.tenant.clone(), id)
}
}

Expand Down Expand Up @@ -105,12 +110,14 @@ impl ClientSessionMgr {
pub async fn upsert_client_session_id(
&self,
client_session_id: &str,
value: ClientSession,
user_name: &str,
ttl: Duration,
) -> Result<bool> {
let ident = self.session_ident(client_session_id);
let ident = self.session_ident(client_session_id, user_name);
let seq = MatchSeq::GE(0);
let upsert = UpsertPB::update(ident, value).with(seq).with_ttl(ttl);
let upsert = UpsertPB::update(ident, ClientSession {})
.with(seq)
.with_ttl(ttl);

let res = self.kv_api.upsert_pb(&upsert).await?;

Expand All @@ -122,17 +129,24 @@ impl ClientSessionMgr {
pub async fn get_client_session(
&self,
client_session_id: &str,
user_name: &str,
) -> Result<Option<ClientSession>> {
let ident = self.session_ident(client_session_id);
let ident = self.session_ident(client_session_id, user_name);
let res = self.kv_api.get_pb(&ident).await?;

Ok(res.map(|r| r.data))
}

#[async_backtrace::framed]
#[fastrace::trace]
pub async fn drop_client_session_id(&self, client_session_id: &str) -> Result<()> {
let key = self.session_ident(client_session_id).to_string_key();
pub async fn drop_client_session_id(
&self,
client_session_id: &str,
user_name: &str,
) -> Result<()> {
let key = self
.session_ident(client_session_id, user_name)
.to_string_key();

// simply ignore the result
self.kv_api
Expand Down
3 changes: 2 additions & 1 deletion src/query/service/src/servers/http/v1/query/http_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ impl HttpQuery {
let user_name = session.get_current_user()?.name;

let has_temp_table_before_run = if let Some(cid) = session.get_client_session_id() {
ClientSessionManager::instance().on_query_start(&cid, &session);
ClientSessionManager::instance().on_query_start(&cid, &user_name, &session);
true
} else {
false
Expand Down Expand Up @@ -671,6 +671,7 @@ impl HttpQuery {
*guard = Some(not_empty);
ClientSessionManager::instance().on_query_finish(
cid,
&self.user_name,
session_state.temp_tbl_mgr,
!not_empty,
not_empty != self.has_temp_table_before_run,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use databend_common_cache::LruCache;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::principal::client_session::ClientSession;
use databend_common_meta_app::principal::user_token::QueryTokenInfo;
use databend_common_meta_app::principal::user_token::TokenType;
use databend_common_meta_app::tenant::Tenant;
Expand Down Expand Up @@ -108,6 +107,10 @@ impl ClientSessionManager {
GlobalInstance::get()
}

pub fn state_key(client_session_id: &str, user_name: &str) -> String {
format!("{client_session_id}:{user_name}")
}

#[async_backtrace::framed]
pub async fn init(_cfg: &InnerConfig) -> Result<()> {
let mgr = Arc::new(Self {
Expand Down Expand Up @@ -154,7 +157,7 @@ impl ClientSessionManager {
client_session_api
.upsert_client_session_id(
client_session_id,
ClientSession { user_name },
&user_name,
REFRESH_TOKEN_TTL + TTL_GRACE_PERIOD_META,
)
.await?;
Expand Down Expand Up @@ -214,7 +217,7 @@ impl ClientSessionManager {
client_session_api
.upsert_token(&old, token_info, TOMBSTONE_TTL, true)
.await?;
self.refresh_in_memory_states(&client_session_id);
self.refresh_in_memory_states(&client_session_id, &user);
};
self.refresh_tokens
.write()
Expand Down Expand Up @@ -251,9 +254,7 @@ impl ClientSessionManager {
client_session_api
.upsert_client_session_id(
&client_session_id,
ClientSession {
user_name: claim.user.clone(),
},
&claim.user,
REFRESH_TOKEN_TTL + TTL_GRACE_PERIOD_META,
)
.await?;
Expand Down Expand Up @@ -332,27 +333,30 @@ impl ClientSessionManager {
.await
.ok();
client_session_api
.drop_client_session_id(&claim.session_id)
.drop_client_session_id(&claim.session_id, &claim.user)
.await
.ok();
let state = self.session_state.lock().remove(&claim.session_id);
let state_key = Self::state_key(&claim.session_id, &claim.user);
let state = self.session_state.lock().remove(&state_key);
if let Some(state) = state {
drop_all_temp_tables(&claim.session_id, state.temp_tbl_mgr).await?;
}
};
Ok(())
}

pub fn refresh_in_memory_states(&self, client_session_id: &str) {
pub fn refresh_in_memory_states(&self, client_session_id: &str, user_name: &str) {
let key = Self::state_key(client_session_id, user_name);
let mut guard = self.session_state.lock();
guard.entry(client_session_id.to_string()).and_modify(|e| {
e.query_state = QueryState::InUse;
guard.entry(key).and_modify(|e| {
e.query_state = QueryState::Idle(Instant::now());
});
}

pub fn on_query_start(&self, client_session_id: &str, session: &Arc<Session>) {
pub fn on_query_start(&self, client_session_id: &str, user_name: &str, session: &Arc<Session>) {
let key = Self::state_key(client_session_id, user_name);
let mut guard = self.session_state.lock();
guard.entry(client_session_id.to_string()).and_modify(|e| {
guard.entry(key).and_modify(|e| {
if matches!(e.query_state, QueryState::Idle(_)) {
e.query_state = QueryState::Idle(Instant::now());
session.set_temp_tbl_mgr(e.temp_tbl_mgr.clone());
Expand All @@ -362,13 +366,15 @@ impl ClientSessionManager {
pub fn on_query_finish(
&self,
client_session_id: &str,
user_name: &str,
temp_tbl_mgr: TempTblMgrRef,
is_empty: bool,
just_changed: bool,
) {
let key = Self::state_key(client_session_id, user_name);
if !is_empty || just_changed {
let mut guard = self.session_state.lock();
match guard.entry(client_session_id.to_string()) {
match guard.entry(key) {
Entry::Vacant(e) => {
if !is_empty {
e.insert(SessionState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub async fn refresh_handler(
"JWT session should provide session_id when refresh session",
))
})?;
mgr.refresh_in_memory_states(&session_id);
mgr.refresh_in_memory_states(&session_id, &ctx.user_name);

let tenant = ctx.session.get_current_tenant();
mgr.refresh_session_handle(tenant, ctx.user_name.clone(), &session_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use databend_common_expression::DataBlock;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::SendableDataBlockStream;
use databend_common_io::prelude::FormatSettings;
use databend_common_meta_app::principal::client_session::ClientSession;
use databend_common_meta_app::principal::UserIdentity;
use databend_common_metrics::mysql::*;
use databend_common_users::CertifiedInfo;
Expand Down Expand Up @@ -502,9 +501,7 @@ impl InteractiveWorker {
.client_session_api(&tenant)
.upsert_client_session_id(
&session_id,
ClientSession {
user_name: user_name.clone(),
},
&user_name,
Duration::from_secs(3600 + 600),
)
.await
Expand Down
3 changes: 2 additions & 1 deletion src/query/service/src/servers/mysql/mysql_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ impl MySQLConnection {

let tenant = session.get_current_tenant();
let session_id = session.get_id();
let user = session.get_current_user()?.name;
UserApiProvider::instance()
.client_session_api(&tenant)
.drop_client_session_id(&session_id)
.drop_client_session_id(&session_id, &user)
.await
.ok();
drop_all_temp_tables(&session_id, session.temp_tbl_mgr()).await
Expand Down
Loading
Loading