Skip to content

Commit

Permalink
Merge branch 'feature_4_3_2_bp1_fix_obkv_bug' into '4_3_2_release'
Browse files Browse the repository at this point in the history
Co-authored-by: zhiyunZDW <[email protected]>
  • Loading branch information
2 people authored and ob-robot committed Nov 28, 2024
1 parent ec1b875 commit 220bf9d
Show file tree
Hide file tree
Showing 19 changed files with 329 additions and 152 deletions.
4 changes: 2 additions & 2 deletions src/obproxy/obkv/table/ob_rpc_struct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ int ObRpcRequest::init_rowkey_info(int64_t sub_req_count)
int ret = OB_SUCCESS;
if (request_info_inited_) {
ret = OB_ERR_UNEXPECTED;
LOG_WDIAG("rpc request already inited", K(ret));
LOG_WDIAG("rpc request already inited", K(sub_req_count), K(ret));
} else if (sub_req_count == 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WDIAG("empty request to init", K(sub_req_count), K(ret));
Expand Down Expand Up @@ -507,7 +507,7 @@ int ObRpcRequest::calc_partition_id_by_sub_rowkey(ObArenaAllocator &allocator,
if (OB_FAIL(ObRpcExprCalcTool::do_partition_id_calc_for_obkv(resolve_result, part_info, allocator, partition_ids,
ls_ids))) {
LOG_WDIAG("fail to calc partition id for table", K(ret));
} else if (partition_ids.count() != 1 || ls_ids.count() != 1) {
} else if (partition_ids.count() != 1 || ls_ids.count() > 1) {
// client_info.
// TODO RPC need update it is a shard request
ret = OB_ERR_UNEXPECTED;
Expand Down
3 changes: 2 additions & 1 deletion src/obproxy/obkv/table/ob_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ ObTableEntity::~ObTableEntity()
void ObTableEntity::reset()
{
rowkey_.reset();
rowkey_names_.reset();
properties_names_.reset();
properties_values_.reset();
reset_properties_buf();
Expand Down Expand Up @@ -2354,7 +2355,7 @@ ODP_DEF_DESERIALIZE_PAYLOAD(ObTableTabletOp)
if (OB_SUCC(ret)) {
if (OB_FAIL(rpc_request->init_rowkey_info(single_op_size))) {
LOG_WDIAG("fail to init rpc request", K(ret));
} if (OB_FAIL(single_ops_.prepare_allocate(single_op_size))) {
} else if (OB_FAIL(single_ops_.prepare_allocate(single_op_size))) {
LOG_WDIAG("fail to prepare allocatate single ops", K(ret), K(single_op_size));
}
for (int64_t i = 0; OB_SUCC(ret) && i < single_op_size; ++i) {
Expand Down
6 changes: 5 additions & 1 deletion src/obproxy/obkv/table/ob_table_rpc_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,11 @@ class ObRpcTableLSOperationRequest : public ObRpcRequest
{
public:
ObRpcTableLSOperationRequest();
~ObRpcTableLSOperationRequest() {}
~ObRpcTableLSOperationRequest()
{
tablet_id_index_map_.destroy();
ls_id_tablet_id_map_.destroy();
}
const ObTableLSOp &get_operation() const {return ls_request_.ls_op_;}
ObTableLSOp &get_operation() {return ls_request_.ls_op_;}

Expand Down
8 changes: 4 additions & 4 deletions src/obproxy/obutils/ob_proxy_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ class ObProxyConfig : public common::ObCommonConfig
DEF_INT(rpc_request_timeout, "5000000", "[0,)", "rpc resquest timeout, unit is us, default 5000000us, 0 means no timeout", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS, CFG_MULTI_LEVEL_GLOBAL);
DEF_INT(rpc_net_timeout_base, "5", "[0,)", "rpc network timeout base, controls the overall network timeout time, defaults to 1 bytes/5ms, and the timeout time for 400 bytes of data is 2 seconds", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS, CFG_MULTI_LEVEL_GLOBAL);
DEF_BOOL(rpc_enable_reroute, "true", "rpc request enable handle the move response for rpc request reroute", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER, CFG_MULTI_LEVEL_GLOBAL);
DEF_BOOL(rpc_enable_congestion, "true", "rpc request enable congestion feature or not for write request", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER, CFG_MULTI_LEVEL_GLOBAL);
DEF_BOOL(rpc_enable_congestion, "false", "rpc request enable congestion feature or not for write request", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER, CFG_MULTI_LEVEL_GLOBAL);
DEF_INT(rpc_server_net_invalid_time_us, "10000000", "rpc server net invalid time us", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER, CFG_MULTI_LEVEL_GLOBAL);
DEF_INT(rpc_server_net_max_pending_request, "10", "rpc server net max pending request", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER, CFG_MULTI_LEVEL_GLOBAL);
DEF_BOOL(rpc_enable_global_index, "true", "rpc request enable global index route for query request", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER, CFG_MULTI_LEVEL_GLOBAL);
Expand All @@ -553,14 +553,14 @@ class ObProxyConfig : public common::ObCommonConfig
DEF_INT(rpc_async_pull_batch_max_size, "10", "[2,50]", "max batch size for async pull, [2, 50]", CFG_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER, CFG_MULTI_LEVEL_GLOBAL);
DEF_INT(rpc_async_pull_batch_max_times, "0", "[0,)", "max batch fetch times for single entry cont", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS, CFG_MULTI_LEVEL_GLOBAL);
DEF_TIME(rpc_async_pull_batch_wait_interval, "10ms", "[1ms, 1s]", "wait interval for async batch pull, [1ms, 1s]", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER, CFG_MULTI_LEVEL_GLOBAL);
DEF_INT(rpc_sub_req_max_retries, "10", "[0, 50]", "rpc sub request max retry times when sub request failed", CFG_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS, CFG_MULTI_LEVEL_GLOBAL);
DEF_INT(rpc_sub_req_max_retries, "0", "[0, 50]", "rpc sub request max retry times when sub request failed", CFG_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS, CFG_MULTI_LEVEL_GLOBAL);
DEF_BOOL(enable_rpc_throttle, "false", "if enabled, will be able to limit rpc req", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER, CFG_MULTI_LEVEL_GLOBAL);
DEF_INT(rpc_throttle_trigger_percentage, "50", "[0, 100)", "begin throttle when reach the percentage of mem occupied by rpc req", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER, CFG_MULTI_LEVEL_GLOBAL);
DEF_INT(rpc_throttle_limit_qps_qa, "0", "[0,)", "rpc req limit qps when throttle trigger", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS, CFG_MULTI_LEVEL_GLOBAL);
DEF_TIME(rpc_request_throttle_waiting_time, "1ms", "[0ms,10s]", "rpc request throttle waiting time, [0ms, 10s]", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER, CFG_MULTI_LEVEL_GLOBAL);
DEF_INT(rpc_async_task_thread_num, "8", "[0,128]", "proxy rpc task thread num, default is 8, if 0, use (real work thread num/2), [0, 128]", CFG_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER, CFG_MULTI_LEVEL_GLOBAL);
DEF_BOOL(rpc_enable_async_analyze, "true", "if enabled, will async analyze large packet", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS, CFG_MULTI_LEVEL_GLOBAL);
DEF_INT(rpc_sub_request_isolation_mode, "2", "[0,2]", "rpc sub req handle mode, 0: not isolate sub request, 1: isolate all sub req to async thread, 2: isolate sub req to part of async thread", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS, CFG_MULTI_LEVEL_GLOBAL);
DEF_BOOL(rpc_enable_async_analyze, "false", "if enabled, will async analyze large packet", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS, CFG_MULTI_LEVEL_GLOBAL);
DEF_INT(rpc_sub_request_isolation_mode, "0", "[0,2]", "rpc sub req handle mode, 0: not isolate sub request, 1: isolate all sub req to async thread, 2: isolate sub req to part of async thread", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS, CFG_MULTI_LEVEL_GLOBAL);
DEF_INT(rpc_sub_request_weight, "10", "[0,]", "rpc sub request weight, recommended range [0, 100]", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS, CFG_MULTI_LEVEL_GLOBAL);
DEF_INT(rpc_service_mode, "3", "[1, 3]","rpc service mode flag, option flag out, bit 1: OBKV service, bit 2: OB-Redis servcie", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER, CFG_MULTI_LEVEL_GLOBAL);
DEF_STR(rpc_redis_default_database_name, "obkv_redis", "obkv-redis default database name", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER, CFG_MULTI_LEVEL_GLOBAL);
Expand Down
4 changes: 2 additions & 2 deletions src/obproxy/proxy/route/ob_route_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ int ObRouteUtils::fetch_part_info(ObResultSetFetcher &rs_fetcher, ObProxyPartInf
} else {
memcpy(buf, part_range_type.ptr(), part_range_type.length());
part_expr.assign_ptr(buf, part_range_type.length());
part_info.set_part_range_type(part_range_type);
part_info.set_part_range_type(part_expr);
}

PROXY_EXTRACT_VARCHAR_FIELD_MYSQL(rs_fetcher, "sub_part_range_type", sub_part_range_type);
Expand All @@ -773,7 +773,7 @@ int ObRouteUtils::fetch_part_info(ObResultSetFetcher &rs_fetcher, ObProxyPartInf
} else {
memcpy(buf, sub_part_range_type.ptr(), sub_part_range_type.length());
part_expr.assign_ptr(buf, sub_part_range_type.length());
part_info.set_sub_part_range_type(sub_part_range_type);
part_info.set_sub_part_range_type(part_expr);
}
// get part expr
PROXY_EXTRACT_VARCHAR_FIELD_MYSQL(rs_fetcher, "part_expr", part_expr);
Expand Down
2 changes: 1 addition & 1 deletion src/obproxy/proxy/route/obproxy_expr_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ int ObRpcExprCalcTool::calculate_partition_id_with_rowkey(common::ObArenaAllocat
if (OB_FAIL(ObRpcExprCalcTool::do_partition_id_calc_for_obkv(resolve_result, part_info, allocator, partition_ids,
ls_ids))) {
LOG_WDIAG("fail to calc partition id for table", K(ret));
} else if (partition_ids.count() != 1 || ls_ids.count() != 1) {
} else if (partition_ids.count() != 1 || ls_ids.count() > 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WDIAG("obkv single rowkey get part ids/ log stream ids is not one", K(partition_ids), K(ls_ids),
K(ret));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ int ObProxyRpcReqTableScanOp::handle_shard_rpc_obkv_batch_request(proxy::ObRpcRe
cont_index_++;
param.partition_id_ = partition_id; //not have any used
param.request_ = sub_rpc_req;
request_sm->set_execute_thread(NULL); //not set root ethread to root request, for scheduled error when cleanup
LOG_DEBUG("handle_shard_rpc_obkv_batch_request ", K(sub_rpc_req), "table_id", sub_obkv_info.get_table_id(), "partition_id", sub_obkv_info.get_partition_id(), K_(rpc_trace_id));
parallel_param.push_back(param);
rpc_reqs.push_back(sub_rpc_req);
Expand Down Expand Up @@ -246,6 +247,7 @@ int ObProxyRpcReqTableScanOp::handle_shard_rpc_ls_request(
cont_index_++;
param.partition_id_ = tablet_id; //not have any used
param.request_ = sub_rpc_req;
request_sm->set_execute_thread(NULL); //not set root ethread to root request, for scheduled error when cleanup
LOG_DEBUG("handle_shard_rpc_obkv_ls_request ", K(*sub_ls_req), K(sub_rpc_req), "table_id", sub_obkv_info.get_table_id(),
"partition_id", sub_obkv_info.get_partition_id(), K_(rpc_trace_id));
parallel_param.push_back(param);
Expand Down Expand Up @@ -335,6 +337,7 @@ int ObProxyRpcReqTableScanOp::handle_shard_rpc_obkv_query_request(
executor::ObProxyRpcParallelParam param;
param.partition_id_ = partition_id;
param.request_ = sub_rpc_req;
request_sm->set_execute_thread(NULL); //not set root ethread to root request, for scheduled error when cleanup

LOG_DEBUG("sub rpc_req init done", KPC(rpc_req), KPC(sub_rpc_req), K_(rpc_trace_id));
cont_index_++;
Expand Down Expand Up @@ -405,6 +408,11 @@ int ObProxyRpcReqTableScanOp::execute_rpc_request()
if (OB_NOT_NULL(parallel_param_.at(i).request_)) {
LOG_DEBUG("init sub req", "req", parallel_param_.at(i).request_, K_(rpc_trace_id));
parallel_param_.at(i).request_->set_sub_req_inited(true);
/* Prevent the sub request and its request_sm from being cleaned up while the
 * ObProxyRpcReqParallelExecuteCont is scheduled in the queue but has not been called yet.
 * snet_state is not rewritten when the root request cleans up, whereas cnet_state / sm_state
 * are reset when the root request cleans up the sub request. So snet_state is used to forbid
 * that cleanup, and the guard is removed once ObProxyRpcReqParallelExecuteCont is scheduled. */
parallel_param_.at(i).request_->set_snet_state(proxy::ObRpcReq::ServerNetState::RPC_REQ_SERVER_SHARDING_REQUEST_HANDLING_IDEL);
}
LOG_DEBUG("sub_rpc before send", K(i), "rpc request", parallel_param_.at(i).request_,
"info", parallel_param_.at(i).request_->get_rpc_request()->get_packet_meta(), K_(timeout_ms), K_(rpc_trace_id));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,38 @@ int ObProxyRpcReqParallelExecuteCont::init_task()
// init inner request sm
if (OB_FAIL(request_sm->init_inner_request(this, mutex_))) {
LOG_WDIAG("fail to init inner rpc request", K(ret));
} else {
}
// ignore any error other than OB_ALLOCATE_MEMORY_FAILED and keep going
if (ret != common::OB_ALLOCATE_MEMORY_FAILED) {
/* restore the snet state now that ObProxyRpcReqParallelExecuteCont has been called to execute */
rpc_request_->set_snet_state(proxy::ObRpcReq::ServerNetState::RPC_REQ_SERVER_INIT);

if (OB_FAIL(request_sm->setup_rpc_get_cluster())) { //skip parser rpc request
// TODO: run setup_rpc_get_cluster in an async task to avoid ObProxyRpcReqParallelExecuteCont init failure.
LOG_WDIAG("ObProxyRpcReqParallelExecuteCont invalid to handle rpc request");
LOG_WDIAG("ObProxyRpcReqParallelExecuteCont invalid to handle rpc request", K(ret));
} else {
LOG_DEBUG("ObProxyRpcReqParallelExecuteCont success to handle the inner rpc request", K(rpc_request_), K(request_sm));
}
}
if (OB_FAIL(ret)) {

if (OB_FAIL(ret)) {
// request_sm->set_inner_cont(NULL);
// //in some situations the main thread may already have scheduled a DONE/DESTROY event for this cont_, but it
// //couldn't be dispatched because the cont destroys itself when it meets an error; so avoid a NULL
// //handler being executed when the thread is scheduled.
// request_sm->cancel_child_callback_action(); //request_sm inner_cont has set to be NULL, could be scheduled again, just cancel scheduled
LOG_WDIAG("ObProxyRpcReqParallelExecuteCont init all schdule cont return failed, to ignore it in init_task", K(ret),
K(this), K(rpc_request_), KPC(rpc_request_));
// destroy ObProxyRpcReqParallelExecuteCont via the request_sm callback
ret = OB_SUCCESS;
}
} else {
//not set inner_cont any more, so need delete at here
request_sm->set_inner_cont(NULL);
request_sm->cancel_child_callback_action(); //request_sm inner_cont has set to be NULL, could be scheduled again, just cancel scheduled
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WDIAG("invalid rpc_request to handle");
LOG_WDIAG("invalid rpc_request to handle", K(ret), K_(rpc_request));
}

return ret;
Expand All @@ -84,6 +102,7 @@ int ObProxyRpcReqParallelExecuteCont::finish_task(void *data)
proxy::ObRpcReq *resp = rpc_request_;
resp->set_cont_index(cont_index_); //set final cont_index
LOG_DEBUG("ObProxyRpcReqParallelExecuteCont finish task", K(data), K(resp));
rpc_request_ = NULL;
}

return ret;
Expand All @@ -97,6 +116,18 @@ void ObProxyRpcReqParallelExecuteCont::destroy()
cancel_pending_action();
cancel_inform_out_action();

// the sub rpc request was never scheduled to execute init, but it still needs to be released
if (OB_NOT_NULL(rpc_request_)
&& rpc_request_->get_snet_state() == proxy::ObRpcReq::ServerNetState::RPC_REQ_SERVER_SHARDING_REQUEST_HANDLING_IDEL
&& OB_NOT_NULL(rpc_request_->get_request_sm())) {
// rpc_request_->get_request_sm()->set_execute_thread()
rpc_request_->get_request_sm()->init_inner_request_simple(NULL, mutex_);
rpc_request_->server_net_cancel_request();
LOG_WDIAG("has not inited task but need to destroy", K(this), K_(rpc_request));
ObRpcReq::ObRpcReqCleanupParams cleanup_params(ObRpcReq::ServerNetState::RPC_REQ_SERVER_CANCLED);
rpc_request_->cleanup(cleanup_params);
}

// The parent class's destroy ends by calling delete, but this Cont is allocated with op_alloc,
// so the parent's destroy logic has to be copied here instead.
cb_cont_ = NULL;
Expand Down
Loading

0 comments on commit 220bf9d

Please sign in to comment.