Skip to content

Commit

Permalink
Introduce mutex in ucxx::Request
Browse files Browse the repository at this point in the history
When running with the progress thread, concurrency issues may happen,
which a mutex can resolve in `ucxx::Request`. A common case is when
`ucxx::Endpoint` is registering an inflight request and `setStatus()`
has just begun running, in which case the inflight request will be
removed (a no-op as it hasn't been previously registered) and then
actually registered, which will cause the `ucxx::Request` to never
cleanup.
  • Loading branch information
pentschev committed Jul 31, 2023
1 parent aaba6f4 commit 878f8a5
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 16 deletions.
1 change: 1 addition & 0 deletions cpp/include/ucxx/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class Request : public Component {
nullptr}; ///< The submission object that will dispatch the request
std::string _operationName{
"request_undefined"}; ///< Human-readable operation name, mostly used for log messages
std::mutex _mutex{}; ///< Mutex to prevent checking status while it's being set
bool _enablePythonFuture{true}; ///< Whether Python future is enabled for this request

/**
Expand Down
54 changes: 38 additions & 16 deletions cpp/src/request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,33 @@ void Request::cancel()
}
}

ucs_status_t Request::getStatus() { return _status; }
ucs_status_t Request::getStatus()
{
std::lock_guard<std::mutex> lock(_mutex);
return _status;
}

void* Request::getFuture() { return _future ? _future->getHandle() : nullptr; }
void* Request::getFuture()
{
std::lock_guard<std::mutex> lock(_mutex);
return _future ? _future->getHandle() : nullptr;
}

void Request::checkError()
{
std::lock_guard<std::mutex> lock(_mutex);

// Only load the atomic variable once
auto status = _status.load();

utils::ucsErrorThrow(status, status == UCS_ERR_MESSAGE_TRUNCATED ? _status_msg : std::string());
}

bool Request::isCompleted() { return _status != UCS_INPROGRESS; }
bool Request::isCompleted()
{
std::lock_guard<std::mutex> lock(_mutex);
return _status != UCS_INPROGRESS;
}

void Request::callback(void* request, ucs_status_t status)
{
Expand Down Expand Up @@ -165,24 +179,32 @@ void Request::process()

void Request::setStatus(ucs_status_t status)
{
if (_endpoint != nullptr) _endpoint->removeInflightRequest(this);
_worker->removeInflightRequest(this);
{
std::lock_guard<std::mutex> lock(_mutex);

ucxx_trace_req_f(_ownerString.c_str(),
_request,
_operationName.c_str(),
"callback called with status %d (%s)",
status,
ucs_status_string(status));
if (_endpoint != nullptr) _endpoint->removeInflightRequest(this);
_worker->removeInflightRequest(this);

if (_status != UCS_INPROGRESS) ucxx_error("setStatus called but the status was already set");
_status.store(status);
ucxx_trace_req_f(_ownerString.c_str(),
_request,
_operationName.c_str(),
"callback called with status %d (%s)",
status,
ucs_status_string(status));

if (_enablePythonFuture) {
auto future = std::static_pointer_cast<ucxx::Future>(_future);
future->notify(status);
if (_status != UCS_INPROGRESS) ucxx_error("setStatus called but the status was already set");
_status.store(status);

if (_enablePythonFuture) {
auto future = std::static_pointer_cast<ucxx::Future>(_future);
future->notify(status);
}
}

/**
* The callback may execute arbitrary user code (e.g., `getStatus()`) and that may require
* the mutex.
*/
ucxx_trace_req_f(_ownerString.c_str(),
_request,
_operationName.c_str(),
Expand Down

0 comments on commit 878f8a5

Please sign in to comment.