Skip to content

Commit

Permalink
feat: introduce WriteHint to RegionPutRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jan 21, 2025
1 parent 8c88843 commit 657542e
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion src/metric-engine/src/engine/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use api::v1::Rows;
use common_telemetry::{error, info};
use snafu::{ensure, OptionExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::region_engine::WriteHint;
use store_api::region_request::{AffectedRows, RegionPutRequest};
use store_api::storage::{RegionId, TableId};

Expand Down Expand Up @@ -85,6 +86,9 @@ impl MetricEngineInner {
&mut request.rows,
encoding,
)?;
if encoding == PrimaryKeyEncoding::Sparse {
request.hint = WriteHint::PRIMARY_KEY_ENCODED | WriteHint::SPARSE_KEY_ENCODING;
}
self.data_region.write_data(data_region_id, request).await
}

Expand Down Expand Up @@ -162,7 +166,7 @@ impl MetricEngineInner {
#[cfg(test)]
mod tests {
use common_recordbatch::RecordBatches;
use store_api::region_engine::RegionEngine;
use store_api::region_engine::{RegionEngine, WriteHint};
use store_api::region_request::RegionRequest;
use store_api::storage::ScanRequest;

Expand All @@ -179,6 +183,7 @@ mod tests {
let rows = test_util::build_rows(1, 5);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: WriteHint::empty(),
});

// write data
Expand Down Expand Up @@ -252,6 +257,7 @@ mod tests {
let rows = test_util::build_rows(3, 100);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: WriteHint::empty(),
});

// write data
Expand All @@ -273,6 +279,7 @@ mod tests {
let rows = test_util::build_rows(1, 100);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: WriteHint::empty(),
});

engine
Expand All @@ -292,6 +299,7 @@ mod tests {
let rows = test_util::build_rows(1, 100);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: WriteHint::empty(),
});

engine
Expand Down
7 changes: 5 additions & 2 deletions src/metric-engine/src/metadata_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use store_api::metric_engine_consts::{
METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
METADATA_SCHEMA_VALUE_COLUMN_NAME,
};
use store_api::region_engine::RegionEngine;
use store_api::region_engine::{RegionEngine, WriteHint};
use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
Expand Down Expand Up @@ -512,7 +512,10 @@ impl MetadataRegion {
}],
};

RegionPutRequest { rows }
RegionPutRequest {
rows,
hint: WriteHint::empty(),
}
}

fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
Expand Down
9 changes: 8 additions & 1 deletion src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use datatypes::schema::ColumnSchema;
use rstest::rstest;
use rstest_reuse::{self, apply};
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::WriteHint;
use store_api::region_request::{RegionCreateRequest, RegionOpenRequest, RegionPutRequest};
use store_api::storage::RegionId;

Expand Down Expand Up @@ -530,7 +531,13 @@ async fn test_absent_and_invalid_columns() {
rows,
};
let err = engine
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
.handle_request(
region_id,
RegionRequest::Put(RegionPutRequest {
rows,
hint: WriteHint::empty(),
}),
)
.await
.unwrap_err();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
Expand Down
7 changes: 5 additions & 2 deletions src/mito2/src/engine/open_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_engine::{RegionEngine, RegionRole, WriteHint};
use store_api::region_request::{
RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest,
};
Expand Down Expand Up @@ -129,7 +129,10 @@ async fn test_engine_open_readonly() {
let err = engine
.handle_request(
region_id,
RegionRequest::Put(RegionPutRequest { rows: rows.clone() }),
RegionRequest::Put(RegionPutRequest {
rows: rows.clone(),
hint: WriteHint::empty(),
}),
)
.await
.unwrap_err();
Expand Down
15 changes: 12 additions & 3 deletions src/mito2/src/engine/set_role_state_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::{
RegionEngine, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState,
RegionEngine, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState, WriteHint,
};
use store_api::region_request::{RegionPutRequest, RegionRequest};
use store_api::storage::RegionId;
Expand Down Expand Up @@ -74,7 +74,10 @@ async fn test_set_role_state_gracefully() {
let error = engine
.handle_request(
region_id,
RegionRequest::Put(RegionPutRequest { rows: rows.clone() }),
RegionRequest::Put(RegionPutRequest {
rows: rows.clone(),
hint: WriteHint::empty(),
}),
)
.await
.unwrap_err();
Expand Down Expand Up @@ -152,7 +155,13 @@ async fn test_write_downgrading_region() {
rows: build_rows(0, 42),
};
let err = engine
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
.handle_request(
region_id,
RegionRequest::Put(RegionPutRequest {
rows: rows.clone(),
hint: WriteHint::empty(),
}),
)
.await
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::RegionNotReady)
Expand Down
10 changes: 8 additions & 2 deletions src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use rskafka::client::{Client, ClientBuilder};
use rskafka::record::Record;
use rstest_reuse::template;
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_engine::{RegionEngine, RegionRole, WriteHint};
use store_api::region_request::{
RegionCloseRequest, RegionCreateRequest, RegionDeleteRequest, RegionFlushRequest,
RegionOpenRequest, RegionPutRequest, RegionRequest,
Expand Down Expand Up @@ -1049,7 +1049,13 @@ pub fn delete_rows_schema(request: &RegionCreateRequest) -> Vec<api::v1::ColumnS
pub async fn put_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) {
let num_rows = rows.rows.len();
let result = engine
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
.handle_request(
region_id,
RegionRequest::Put(RegionPutRequest {
rows,
hint: WriteHint::empty(),
}),
)
.await
.unwrap();
assert_eq!(num_rows, result.affected_rows);
Expand Down
1 change: 1 addition & 0 deletions src/store-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ workspace = true
api.workspace = true
aquamarine.workspace = true
async-trait.workspace = true
bitflags.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
Expand Down
9 changes: 9 additions & 0 deletions src/store-api/src/region_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::sync::{Arc, Mutex};
use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
use api::region::RegionResponse;
use async_trait::async_trait;
use bitflags::bitflags;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_recordbatch::SendableRecordBatchStream;
Expand All @@ -36,6 +37,14 @@ use crate::metadata::RegionMetadataRef;
use crate::region_request::{RegionOpenRequest, RegionRequest};
use crate::storage::{RegionId, ScanRequest};

bitflags! {
#[derive(Debug, Clone, Copy)]
pub struct WriteHint: u8 {
const PRIMARY_KEY_ENCODED = 1;
const SPARSE_KEY_ENCODING = 1 << 1;
}
}

/// The settable region role state.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SettableRegionRoleState {
Expand Down
14 changes: 12 additions & 2 deletions src/store-api/src/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::mito_engine_options::{
TWCS_TIME_WINDOW,
};
use crate::path_utils::region_dir;
use crate::region_engine::WriteHint;
use crate::storage::{ColumnId, RegionId, ScanRequest};

#[derive(Debug, IntoStaticStr)]
Expand Down Expand Up @@ -95,8 +96,15 @@ fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequ
.into_iter()
.filter_map(|r| {
let region_id = r.region_id.into();
r.rows
.map(|rows| (region_id, RegionRequest::Put(RegionPutRequest { rows })))
r.rows.map(|rows| {
(
region_id,
RegionRequest::Put(RegionPutRequest {
rows,
hint: WriteHint::empty(),
}),
)
})
})
.collect();
Ok(requests)
Expand Down Expand Up @@ -232,6 +240,8 @@ fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, Regi
pub struct RegionPutRequest {
/// Rows to put.
pub rows: Rows,
/// Write hint.
pub hint: WriteHint,
}

#[derive(Debug)]
Expand Down

0 comments on commit 657542e

Please sign in to comment.