Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP/PROTO/RECONFIG: Fix copy header handling in reconfig. #10452

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions src/ucp/proto/proto_reconfig.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

#include "proto_debug.h"
#include "proto_select.h"
#include "proto_am.inl"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Order

#include "proto_common.inl"

#include <ucp/am/ucp_am.inl>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Order

#include <ucp/core/ucp_worker.inl>


Expand All @@ -30,11 +32,21 @@ static ucs_status_t ucp_proto_reconfig_select_progress(uct_pending_req_t *self)
return req->send.uct.func(&req->send.uct);
}

static void ucp_proto_reconfig_abort(ucp_request_t *req, ucs_status_t status)
{
if (ucp_proto_config_is_am(req->send.proto_config)) {
ucp_am_release_user_header(req);
}

ucp_request_complete_send(req, status);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add blank line

}

static ucs_status_t ucp_proto_reconfig_progress(uct_pending_req_t *self)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
ucp_ep_h ep = req->send.ep;
UCS_STRING_BUFFER_ONSTACK(strb, 256);
ucs_status_t status;

/* This protocol should not be selected for valid and connected endpoint */
if (ep->flags & UCP_EP_FLAG_REMOTE_CONNECTED) {
Expand All @@ -47,10 +59,19 @@ static ucs_status_t ucp_proto_reconfig_progress(uct_pending_req_t *self)
ucp_operation_names, &strb);
ucs_error("cannot find remote protocol for: %s",
ucs_string_buffer_cstr(&strb));
ucp_request_complete_send(req, UCS_ERR_CANCELED);
ucp_proto_request_abort(req, UCS_ERR_CANCELED);
return UCS_OK;
}

if (ucp_proto_config_is_am(req->send.proto_config) &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reuse ucp_am_handle_user_header_send_status_or_abort?

Copy link
Contributor Author

@ofirfarjun7 ofirfarjun7 Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will we know whether ucp_am_handle_user_header_send_status_or_abort succeeded or failed? In case of a failure we need to return.
The only way is to compare the status we pass to ucp_am_handle_user_header_send_status_or_abort with its return value, but I think
we discussed it and decided that it's not good practice.

(req->send.msg_proto.am.flags & UCP_AM_SEND_FLAG_COPY_HEADER)) {
status = ucp_proto_am_req_copy_header(req);
if (status != UCS_OK) {
ucp_proto_request_abort(req, status);
return UCS_OK;
}
}

if (ep->cfg_index != req->send.proto_config->ep_cfg_index) {
ucp_trace_req(req,
"ep configuration changed from %d to %d,"
Expand Down Expand Up @@ -96,6 +117,6 @@ ucp_proto_t ucp_reconfig_proto = {
.probe = ucp_proto_reconfig_probe,
.query = ucp_proto_default_query,
.progress = {ucp_proto_reconfig_progress},
.abort = ucp_request_complete_send,
.abort = ucp_proto_reconfig_abort,
.reset = (ucp_request_reset_func_t)ucs_empty_function_return_success
};
20 changes: 14 additions & 6 deletions test/gtest/ucp/test_ucp_sockaddr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2526,6 +2526,7 @@ class test_ucp_sockaddr_protocols : public test_ucp_sockaddr {
std::string sb(size, 'x');
std::string rb(size, 'y');
std::string shdr(hdr_size, 'x');
std::string shdr_copy = shdr;
std::string rhdr(hdr_size, 'y');
ucp_mem_h smemh(NULL);
ucp_mem_h rmemh(NULL);
Expand All @@ -2552,6 +2553,14 @@ class test_ucp_sockaddr_protocols : public test_ucp_sockaddr {
ucs_status_ptr_t sreq = ucp_am_send_nbx(sender().ep(), 0,
&shdr[0], hdr_size,
&sb[0], size, &param);
/* The first message request triggers connection establishment and
* is placed into the pending queue.
* To check UCP_AM_SEND_FLAG_COPY_HEADER we change the AM header
* content while the request is still in the pending queue. */
if (flags & UCP_AM_SEND_FLAG_COPY_HEADER) {
shdr.assign(shdr.size(), 'a');
}

request_wait(sreq);
wait_for_flag(&arg.received);
// wait for receive request completion after 'received' flag set to
Expand All @@ -2560,7 +2569,7 @@ class test_ucp_sockaddr_protocols : public test_ucp_sockaddr {
EXPECT_TRUE(arg.received);

compare_buffers(sb, rb);
compare_buffers(shdr, rhdr);
compare_buffers(shdr_copy, rhdr);

set_am_data_handler(receiver(), 0, NULL, NULL);

Expand Down Expand Up @@ -2794,28 +2803,27 @@ UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_protocols,
test_am_send_recv(64 * UCS_KBYTE, 0, 2, true, true);
}
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_protocols, am_short_reset,
RUNNING_ON_VALGRIND, "PROTO_ENABLE=n", "ZCOPY_THRESH=inf")
RUNNING_ON_VALGRIND, "ZCOPY_THRESH=inf")
{
test_am_send_recv(16, 8, 1, false, false, UCP_AM_SEND_FLAG_COPY_HEADER);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_protocols, am_bcopy_reset,
RUNNING_ON_VALGRIND,
"PROTO_ENABLE=n", "ZCOPY_THRESH=inf")
RUNNING_ON_VALGRIND, "ZCOPY_THRESH=inf")
{
test_am_send_recv(2 * UCS_KBYTE, 8, 1, false, false,
UCP_AM_SEND_FLAG_COPY_HEADER);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_protocols, am_zcopy_reset,
RUNNING_ON_VALGRIND, "PROTO_ENABLE=n")
RUNNING_ON_VALGRIND)
{
test_am_send_recv(16 * UCS_KBYTE, 8, 1, false, false,
UCP_AM_SEND_FLAG_COPY_HEADER);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr_protocols, am_rndv_reset,
RUNNING_ON_VALGRIND, "PROTO_ENABLE=n", "RNDV_THRESH=0")
RUNNING_ON_VALGRIND, "RNDV_THRESH=0")
{
test_am_send_recv(16 * UCS_KBYTE, 8, 1, false, false,
UCP_AM_SEND_FLAG_COPY_HEADER);
Expand Down
Loading