From 878f8a512a6937043d31e4214a71be3895926e7d Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 31 Jul 2023 14:46:10 -0700 Subject: [PATCH] Introduce mutex in `ucxx::Request` When running with the progress thread, concurrency issues may happen, which a mutex can resolve in `ucxx::Request`. A common case is when `ucxx::Endpoint` is registering an inflight request and `setStatus()` has just begun running, in which case the inflight request will be removed (a no-op as it hasn't been previously registered) and then actually registered, which will cause the `ucxx::Request` to never cleanup. --- cpp/include/ucxx/request.h | 1 + cpp/src/request.cpp | 54 +++++++++++++++++++++++++++----------- 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/cpp/include/ucxx/request.h b/cpp/include/ucxx/request.h index bedf3aa3..40509728 100644 --- a/cpp/include/ucxx/request.h +++ b/cpp/include/ucxx/request.h @@ -39,6 +39,7 @@ class Request : public Component { nullptr}; ///< The submission object that will dispatch the request std::string _operationName{ "request_undefined"}; ///< Human-readable operation name, mostly used for log messages + std::mutex _mutex{}; ///< Mutex to prevent checking status while it's being set bool _enablePythonFuture{true}; ///< Whether Python future is enabled for this request /** diff --git a/cpp/src/request.cpp b/cpp/src/request.cpp index 29f868bc..85fd742f 100644 --- a/cpp/src/request.cpp +++ b/cpp/src/request.cpp @@ -82,19 +82,33 @@ void Request::cancel() } } -ucs_status_t Request::getStatus() { return _status; } +ucs_status_t Request::getStatus() +{ + std::lock_guard lock(_mutex); + return _status; +} -void* Request::getFuture() { return _future ? _future->getHandle() : nullptr; } +void* Request::getFuture() +{ + std::lock_guard lock(_mutex); + return _future ? _future->getHandle() : nullptr; +} void Request::checkError() { + std::lock_guard lock(_mutex); + // Only load the atomic variable once auto status = _status.load(); utils::ucsErrorThrow(status, status == UCS_ERR_MESSAGE_TRUNCATED ? _status_msg : std::string()); } -bool Request::isCompleted() { return _status != UCS_INPROGRESS; } +bool Request::isCompleted() +{ + std::lock_guard lock(_mutex); + return _status != UCS_INPROGRESS; +} void Request::callback(void* request, ucs_status_t status) { @@ -165,24 +179,32 @@ void Request::process() void Request::setStatus(ucs_status_t status) { - if (_endpoint != nullptr) _endpoint->removeInflightRequest(this); - _worker->removeInflightRequest(this); + { + std::lock_guard lock(_mutex); - ucxx_trace_req_f(_ownerString.c_str(), - _request, - _operationName.c_str(), - "callback called with status %d (%s)", - status, - ucs_status_string(status)); + if (_endpoint != nullptr) _endpoint->removeInflightRequest(this); + _worker->removeInflightRequest(this); - if (_status != UCS_INPROGRESS) ucxx_error("setStatus called but the status was already set"); - _status.store(status); + ucxx_trace_req_f(_ownerString.c_str(), + _request, + _operationName.c_str(), + "callback called with status %d (%s)", + status, + ucs_status_string(status)); - if (_enablePythonFuture) { - auto future = std::static_pointer_cast(_future); - future->notify(status); + if (_status != UCS_INPROGRESS) ucxx_error("setStatus called but the status was already set"); + _status.store(status); + + if (_enablePythonFuture) { + auto future = std::static_pointer_cast(_future); + future->notify(status); + } } + /** + * The callback may execute arbitrary user code (e.g., `getStatus()`) and that may require + * the mutex. + */ ucxx_trace_req_f(_ownerString.c_str(), _request, _operationName.c_str(),