Make amPutDone non-blocking (#26123)
Modify amPutDone so that the PUT of the "done" indicator is
non-blocking. This is invoked by an active message handler and notifies
the sender that the active message has completed. There is no need for
the active message handler to wait for the PUT to complete because, by
definition, the active message has completed so there is nothing to be
performed after the PUT. One complication is that the transmit context
must be progressed; normally it would be progressed while waiting for
the PUT to complete. Instead, the transmit context is progressed either
when it is freed or during shutdown, whichever comes first, unless it is
the transmit context for the active message handler thread, in which case
it is progressed continually by that thread.
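
A self-contained toy sketch of this deferred-completion scheme (not the
runtime code: txCtx_t, ensureProgress, putDoneNonBlocking, and drainAllTxns
below are hypothetical stand-ins; only the numTxnsOut drain loop mirrors the
waitForAllTxnsComplete helper added in the diff):

    #include <sched.h>
    #include <stdatomic.h>
    #include <stdio.h>

    // Hypothetical stand-in for a transmit context: just a count of
    // initiated-but-not-yet-completed transmits.
    typedef struct {
      atomic_int numTxnsOut;
    } txCtx_t;

    // Stand-in for provider progress; here each call "completes" one transmit.
    static void ensureProgress(txCtx_t* tcip) {
      if (atomic_load(&tcip->numTxnsOut) > 0) {
        atomic_fetch_sub(&tcip->numTxnsOut, 1);
      }
    }

    // Fire-and-forget "done" notification: initiate the PUT and return without
    // waiting, since nothing happens after it in the AM handler.
    static void putDoneNonBlocking(txCtx_t* tcip) {
      atomic_fetch_add(&tcip->numTxnsOut, 1);
    }

    // Drain outstanding transmits; invoked when the transmit context is freed
    // or at shutdown, whichever comes first.
    static void drainAllTxns(txCtx_t* tcip) {
      while (atomic_load(&tcip->numTxnsOut) > 0) {
        sched_yield();
        ensureProgress(tcip);
      }
    }

    int main(void) {
      txCtx_t tci;
      atomic_init(&tci.numTxnsOut, 0);
      for (int i = 0; i < 3; i++) {
        putDoneNonBlocking(&tci);   // AM handler finishes each request here
      }
      drainAllTxns(&tci);           // e.g. from tciFree() or shutdown
      printf("outstanding transmits: %d\n", atomic_load(&tci.numTxnsOut));
      return 0;
    }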

As a consequence of these changes, RMA PUT injection has been removed
and CHPL_RT_COMM_OFI_INJECT_RMA is now obsolete. The benefit of
injection is that it makes the source buffer available for reuse once
the operation has been initiated, but 1) this isn't advantageous for
blocking PUTs because the task is going to block until the PUT completes
anyway, 2) the semantics of non-blocking PUTs (via chpl_comm_put_nb)
require that the source buffer not be modified until the PUT completes,
and 3) `amPutDone` uses a persistent source buffer containing a constant
value, so re-use is not an issue.
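
For context, a hedged sketch of what the removed injection path did at the
libfabric level (put_with_optional_inject is a hypothetical helper, not a
function in comm-ofi.c; endpoint setup, message construction, and error
handling are omitted):

    #include <stddef.h>
    #include <stdint.h>
    #include <rdma/fi_rma.h>

    // With FI_INJECT set, fi_writemsg() copies the source data during
    // initiation, so the source buffer is reusable as soon as the call
    // returns. The cost is an extra copy and a provider-specific size limit
    // (ofi_info->tx_attr->inject_size in comm-ofi.c). For amPutDone's
    // persistent, constant source buffer, that early reuse buys nothing.
    static ssize_t put_with_optional_inject(struct fid_ep* txCtx,
                                            const struct fi_msg_rma* msg,
                                            size_t size, size_t inject_size) {
      uint64_t flags = 0;
      if (size <= inject_size) {
        flags |= FI_INJECT;   // source buffer reusable immediately on return
      }
      return fi_writemsg(txCtx, msg, flags);
    }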

[Reviewed by @jabraham17, thank you.]
jhh67 authored Oct 23, 2024
2 parents 6d270ac + f1f3d31 commit 69e5713
Showing 1 changed file with 47 additions and 51 deletions: runtime/src/comm/ofi/comm-ofi.c
@@ -419,6 +419,7 @@ static void checkTxCmplsCntr(struct perTxCtxInfo_t*);
static size_t readCQ(struct fid_cq*, void*, size_t);
static void reportCQError(struct fid_cq*);
static void waitForTxnComplete(struct perTxCtxInfo_t*, void* ctx);
static void waitForAllTxnsComplete(struct perTxCtxInfo_t*);
static void forceMemFxVisOneNode(c_nodeid_t, chpl_bool, chpl_bool,
struct perTxCtxInfo_t*);
static void forceMemFxVisAllNodes(chpl_bool, chpl_bool, c_nodeid_t,
@@ -1109,6 +1110,10 @@ void chpl_comm_init(int *argc_p, char ***argv_p) {
// TODO: default to false to workaround non-blocking ofi issue
// these should be changed back to true when that is fixed
envInjectRMA = chpl_env_rt_get_bool("COMM_OFI_INJECT_RMA", false);
if ((envInjectRMA == true) && (chpl_nodeID == 0)) {
chpl_warning("CHPL_RT_COMM_OFI_INJECT_RMA is obsolete, ignoring.", 0,
0);
}
envInjectAMO = chpl_env_rt_get_bool("COMM_OFI_INJECT_AMO", false);
envInjectAM = chpl_env_rt_get_bool("COMM_OFI_INJECT_AM", false);

@@ -5022,6 +5027,14 @@ void amHandler(void* argNil) {
}
}

//
// If there are outstanding transmits, we must ensure they complete before we
// exit. They would be completed in tciFree, but that is quite late in the
// shutdown process and might cause a hang; it's better to complete them now.
//
waitForAllTxnsComplete(tcip);

//
// Un-count this AM handler thread. Whoever told us to exit wants to
// be released once all the AM handler threads are done, so if we're
@@ -5383,6 +5396,14 @@ void amHandleAMO(struct amRequest_AMO_t* amo) {
DBG_PRINTF(DBG_AM | DBG_AM_RECV, "%s", am_reqDoneStr((amRequest_t*) amo));
}

//
// amPutDone
//
// Sets the "done" flag via a non-blocking PUT. We are never going to wait for
// this PUT to complete because we don't care when it completes; as a result
// the ofi_put_nb code path isn't used because we don't need the overhead of a
// non-blocking handle, etc. The PUT will be forced to complete either when a
// non-bound tci is freed, or during shutdown, whichever comes first.

static inline
void amPutDone(c_nodeid_t node, amDone_t* pAmDone) {
@@ -5418,14 +5439,12 @@ void amPutDone(c_nodeid_t node, amDone_t* pAmDone) {
uint64_t mrRaddr = 0;
uint64_t flags = 0;
chpl_atomic_bool txnDone;
void *ctx = TX_CTX_INIT(tcip, true /*blocking*/, &txnDone);
void *ctx = TX_CTX_INIT(tcip, false /*blocking*/, &txnDone);


CHK_TRUE(mrGetKey(&mrKey, &mrRaddr, node, pAmDone, sizeof(*pAmDone)));
ofi_put_lowLevel(amDone, mrDesc, node, mrRaddr, mrKey, sizeof(*pAmDone),
ctx, flags, tcip);
waitForTxnComplete(tcip, ctx);
txCtxCleanup(ctx);
if (amTcip == NULL) {
tciFree(tcip);
}
@@ -6069,6 +6088,7 @@ void tciFree(struct perTxCtxInfo_t* tcip) {
//
if (!tcip->bound) {
DBG_PRINTF(DBG_TCIPS, "free tciTab[%td] %p", tcip - tciTab, tcip);
waitForAllTxnsComplete(tcip);
forceMemFxVisAllNodes(true, true, -1, tcip);
atomic_store_bool(&tcip->allocated, false);
}
@@ -6265,17 +6285,6 @@ void rmaPutFn_msgOrdFence(nb_handle_t handle, void* myAddr, void* mrDesc,
struct perTxCtxInfo_t* tcip) {
uint64_t flags = 0;

if (tcip->bound
&& size <= ofi_info->tx_attr->inject_size
&& envInjectRMA) {
//
// Special case: write injection has the least latency. We can use it if
// this PUT doesn't exceed the injection size
// limit, and we have a bound tx context so we can delay forcing the
// memory visibility until later.
//
flags = FI_INJECT;
}
if (bitmapTest(tcip->amoVisBitmap, node)) {
//
// Special case: If our last operation was an AMO then we need to do a
@@ -6289,8 +6298,8 @@ void rmaPutFn_msgOrdFence(nb_handle_t handle, void* myAddr, void* mrDesc,
flags |= FI_FENCE;
}
void *ctx = txCtxInit(tcip, __LINE__, &handle->complete);
(void) wrap_fi_writemsg(myAddr, mrDesc, node, mrRaddr, mrKey, size,
ctx, flags, tcip);
ofi_put_lowLevel(myAddr, mrDesc, node, mrRaddr, mrKey, size, ctx, flags,
tcip);
//
// When using message ordering we have to do something after the PUT
// to force it into visibility, and on the same tx context as the PUT
@@ -6311,21 +6320,8 @@ void rmaPutFn_msgOrd(nb_handle_t handle, void* myAddr, void* mrDesc,
size_t size,
struct perTxCtxInfo_t* tcip) {

uint64_t flags = 0;

if (tcip->bound
&& size <= ofi_info->tx_attr->inject_size
&& envInjectRMA) {
//
// Special case: write injection has the least latency. We can use
// that if this PUT's size doesn't exceed the injection size limit
// and we have a bound tx context so we can delay forcing the
// memory visibility until later.
flags = FI_INJECT;
}
void *ctx = txCtxInit(tcip, __LINE__, &handle->complete);
(void) wrap_fi_writemsg(myAddr, mrDesc, node, mrRaddr, mrKey, size,
ctx, flags, tcip);
ofi_put_lowLevel(myAddr, mrDesc, node, mrRaddr, mrKey, size, ctx, 0, tcip);
bitmapSet(tcip->putVisBitmap, node);
}

@@ -6340,8 +6336,7 @@ void rmaPutFn_dlvrCmplt(nb_handle_t handle, void* myAddr, void* mrDesc,
size_t size,
struct perTxCtxInfo_t* tcip) {
void *ctx = txCtxInit(tcip, __LINE__, &handle->complete);
(void) wrap_fi_write(myAddr, mrDesc, node, mrRaddr, mrKey,
size, ctx, tcip);
ofi_put_lowLevel(myAddr, mrDesc, node, mrRaddr, mrKey, size, ctx, 0, tcip);
}


@@ -6354,15 +6349,15 @@ ssize_t wrap_fi_write(const void* addr, void* mrDesc,
DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE,
"tx write: %d:%#" PRIx64 " <= %p, size %zd, ctx %p",
(int) node, mrRaddr, addr, size, ctx);
OFI_RIDE_OUT_EAGAIN(tcip,
fi_write(tcip->txCtx, addr, size,
mrDesc, rxAddr(tcip, node),
mrRaddr, mrKey, ctx));
OFI_RIDE_OUT_EAGAIN(tcip, fi_write(tcip->txCtx, addr, size, mrDesc,
rxAddr(tcip, node), mrRaddr, mrKey,
ctx));
tcip->numTxnsOut++;
tcip->numTxnsSent++;
return FI_SUCCESS;
}


static inline
ssize_t wrap_fi_writemsg(const void* addr, void* mrDesc,
c_nodeid_t node,
@@ -6385,9 +6380,9 @@ ssize_t wrap_fi_writemsg(const void* addr, void* mrDesc,
.rma_iov_count = 1,
.context = ctx };

if ((flags & FI_INJECT) && (size > ofi_info->tx_attr->inject_size)) {
flags &= ~FI_INJECT;
}
// injection has a size limit
assert(((flags & FI_INJECT) == 0) ||
(size <= ofi_info->tx_attr->inject_size));
DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE,
"tx write msg: %d:%#" PRIx64 " <= %p, size %zd, ctx %p, "
"flags %#" PRIx64 " tcip %p",
@@ -6404,11 +6399,6 @@ void ofi_put_lowLevel(const void* addr, void* mrDesc, c_nodeid_t node,
uint64_t mrRaddr, uint64_t mrKey, size_t size,
void* ctx, uint64_t flags,
struct perTxCtxInfo_t* tcip) {

// Can't inject a buffer that is too large
if ((flags & FI_INJECT) && (size > ofi_info->tx_attr->inject_size)) {
flags &= ~FI_INJECT;
}
if (flags == 0) {
(void) wrap_fi_write(addr, mrDesc, node, mrRaddr, mrKey, size, ctx, tcip);
} else {
@@ -7177,8 +7167,6 @@ chpl_comm_nb_handle_t amoFn_dlvrCmplt(struct amoBundle_t *ab,
}




static inline
ssize_t wrap_fi_atomicmsg(struct amoBundle_t* ab, uint64_t flags,
struct perTxCtxInfo_t* tcip) {
@@ -7475,6 +7463,16 @@ void reportCQError(struct fid_cq* cq) {
}


// wait for all outstanding transmissions to complete
static inline
void waitForAllTxnsComplete(struct perTxCtxInfo_t* tcip) {
while (tcip->numTxnsOut > 0) {
sched_yield();
(*tcip->ensureProgressFn)(tcip);
}
}


static inline
void waitForTxnComplete(struct perTxCtxInfo_t* tcip, void* ctx) {
(*tcip->ensureProgressFn)(tcip);
@@ -7487,11 +7485,9 @@ void waitForTxnComplete(struct perTxCtxInfo_t* tcip, void* ctx) {
(*tcip->ensureProgressFn)(tcip);
}
} else {
// wait for all outstanding transmissions to complete
while (tcip->numTxnsOut > 0) {
sched_yield();
(*tcip->ensureProgressFn)(tcip);
}
// we don't have a way to wait for the individual transmission, so wait
// for all outstanding transmissions to complete
waitForAllTxnsComplete(tcip);
}
}

