From f1f3d31c15c6e27609f9822d536fe33cf68331b8 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Tue, 22 Oct 2024 07:16:11 -0700 Subject: [PATCH] Make amPutDone non-blocking Modify amPutDone so that the PUT of the "done" indicator is non-blocking. This is invoked by an active message handler and notifies the sender that the active message has completed. There is no need for the active message handler to wait for the PUT to complete because, by definition, the active message has completed so there is nothing to be performed after the PUT. One complication is that the transmit context must be progressed; normally it would be progressed while waiting for the PUT to complete. Instead, the transmit context is progressed either when it is freed, or during shutdown, whichever comes first. Signed-off-by: John H. Hartman --- runtime/src/comm/ofi/comm-ofi.c | 98 ++++++++++++++++----------------- 1 file changed, 47 insertions(+), 51 deletions(-) diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index 8f7d79a1a88b..4390c462b7cb 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -419,6 +419,7 @@ static void checkTxCmplsCntr(struct perTxCtxInfo_t*); static size_t readCQ(struct fid_cq*, void*, size_t); static void reportCQError(struct fid_cq*); static void waitForTxnComplete(struct perTxCtxInfo_t*, void* ctx); +static void waitForAllTxnsComplete(struct perTxCtxInfo_t*); static void forceMemFxVisOneNode(c_nodeid_t, chpl_bool, chpl_bool, struct perTxCtxInfo_t*); static void forceMemFxVisAllNodes(chpl_bool, chpl_bool, c_nodeid_t, @@ -1109,6 +1110,10 @@ void chpl_comm_init(int *argc_p, char ***argv_p) { // TODO: default to false to workaround non-blocking ofi issue // these should be changed back to true when that is fixed envInjectRMA = chpl_env_rt_get_bool("COMM_OFI_INJECT_RMA", false); + if ((envInjectRMA == true) && (chpl_nodeID == 0)) { + chpl_warning("CHPL_RT_COMM_OFI_INJECT_RMA is obsolete, ignoring.", 0, + 0); + } envInjectAMO = chpl_env_rt_get_bool("COMM_OFI_INJECT_AMO", false); envInjectAM = chpl_env_rt_get_bool("COMM_OFI_INJECT_AM", false); @@ -5022,6 +5027,14 @@ void amHandler(void* argNil) { } } + // + // If there are outstanding transmits we must ensure they complete before we + // exit. They would be transmitted in tciFree, but that is pretty late in + // the shutdown process and might cause a hang, it's better to transmit + // them now. + // + waitForAllTxnsComplete(tcip); + // // Un-count this AM handler thread. Whoever told us to exit wants to // be released once all the AM handler threads are done, so if we're @@ -5383,6 +5396,14 @@ void amHandleAMO(struct amRequest_AMO_t* amo) { DBG_PRINTF(DBG_AM | DBG_AM_RECV, "%s", am_reqDoneStr((amRequest_t*) amo)); } +// +// amPutDone +// +// Sets the "done" flag via a non-blocking PUT. We are never going to wait for +// this PUT to complete because we don't care when it completes; as a result +// the ofi_put_nb code path isn't used because we don't need the overhad of a +// non-blocking handle, etc. The PUT will be forced to complete either when a +// non-bound tci is freed, or during shutdown, whichever comes first. static inline void amPutDone(c_nodeid_t node, amDone_t* pAmDone) { @@ -5418,14 +5439,12 @@ void amPutDone(c_nodeid_t node, amDone_t* pAmDone) { uint64_t mrRaddr = 0; uint64_t flags = 0; chpl_atomic_bool txnDone; - void *ctx = TX_CTX_INIT(tcip, true /*blocking*/, &txnDone); + void *ctx = TX_CTX_INIT(tcip, false /*blocking*/, &txnDone); CHK_TRUE(mrGetKey(&mrKey, &mrRaddr, node, pAmDone, sizeof(*pAmDone))); ofi_put_lowLevel(amDone, mrDesc, node, mrRaddr, mrKey, sizeof(*pAmDone), ctx, flags, tcip); - waitForTxnComplete(tcip, ctx); - txCtxCleanup(ctx); if (amTcip == NULL) { tciFree(tcip); } @@ -6069,6 +6088,7 @@ void tciFree(struct perTxCtxInfo_t* tcip) { // if (!tcip->bound) { DBG_PRINTF(DBG_TCIPS, "free tciTab[%td] %p", tcip - tciTab, tcip); + waitForAllTxnsComplete(tcip); forceMemFxVisAllNodes(true, true, -1, tcip); atomic_store_bool(&tcip->allocated, false); } @@ -6265,17 +6285,6 @@ void rmaPutFn_msgOrdFence(nb_handle_t handle, void* myAddr, void* mrDesc, struct perTxCtxInfo_t* tcip) { uint64_t flags = 0; - if (tcip->bound - && size <= ofi_info->tx_attr->inject_size - && envInjectRMA) { - // - // Special case: write injection has the least latency. We can use it if - // this PUT doesn't exceed the injection size - // limit, and we have a bound tx context so we can delay forcing the - // memory visibility until later. - // - flags = FI_INJECT; - } if (bitmapTest(tcip->amoVisBitmap, node)) { // // Special case: If our last operation was an AMO then we need to do a @@ -6289,8 +6298,8 @@ void rmaPutFn_msgOrdFence(nb_handle_t handle, void* myAddr, void* mrDesc, flags |= FI_FENCE; } void *ctx = txCtxInit(tcip, __LINE__, &handle->complete); - (void) wrap_fi_writemsg(myAddr, mrDesc, node, mrRaddr, mrKey, size, - ctx, flags, tcip); + ofi_put_lowLevel(myAddr, mrDesc, node, mrRaddr, mrKey, size, ctx, flags, + tcip); // // When using message ordering we have to do something after the PUT // to force it into visibility, and on the same tx context as the PUT @@ -6311,21 +6320,8 @@ void rmaPutFn_msgOrd(nb_handle_t handle, void* myAddr, void* mrDesc, size_t size, struct perTxCtxInfo_t* tcip) { - uint64_t flags = 0; - - if (tcip->bound - && size <= ofi_info->tx_attr->inject_size - && envInjectRMA) { - // - // Special case: write injection has the least latency. We can use - // that if this PUT's size doesn't exceed the injection size limit - // and we have a bound tx context so we can delay forcing the - // memory visibility until later. - flags = FI_INJECT; - } void *ctx = txCtxInit(tcip, __LINE__, &handle->complete); - (void) wrap_fi_writemsg(myAddr, mrDesc, node, mrRaddr, mrKey, size, - ctx, flags, tcip); + ofi_put_lowLevel(myAddr, mrDesc, node, mrRaddr, mrKey, size, ctx, 0, tcip); bitmapSet(tcip->putVisBitmap, node); } @@ -6340,8 +6336,7 @@ void rmaPutFn_dlvrCmplt(nb_handle_t handle, void* myAddr, void* mrDesc, size_t size, struct perTxCtxInfo_t* tcip) { void *ctx = txCtxInit(tcip, __LINE__, &handle->complete); - (void) wrap_fi_write(myAddr, mrDesc, node, mrRaddr, mrKey, - size, ctx, tcip); + ofi_put_lowLevel(myAddr, mrDesc, node, mrRaddr, mrKey, size, ctx, 0, tcip); } @@ -6354,15 +6349,15 @@ ssize_t wrap_fi_write(const void* addr, void* mrDesc, DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE, "tx write: %d:%#" PRIx64 " <= %p, size %zd, ctx %p", (int) node, mrRaddr, addr, size, ctx); - OFI_RIDE_OUT_EAGAIN(tcip, - fi_write(tcip->txCtx, addr, size, - mrDesc, rxAddr(tcip, node), - mrRaddr, mrKey, ctx)); + OFI_RIDE_OUT_EAGAIN(tcip, fi_write(tcip->txCtx, addr, size, mrDesc, + rxAddr(tcip, node), mrRaddr, mrKey, + ctx)); tcip->numTxnsOut++; tcip->numTxnsSent++; return FI_SUCCESS; } + static inline ssize_t wrap_fi_writemsg(const void* addr, void* mrDesc, c_nodeid_t node, @@ -6385,9 +6380,9 @@ ssize_t wrap_fi_writemsg(const void* addr, void* mrDesc, .rma_iov_count = 1, .context = ctx }; - if ((flags & FI_INJECT) && (size > ofi_info->tx_attr->inject_size)) { - flags &= ~FI_INJECT; - } + // injection has a size limit + assert(((flags & FI_INJECT) == 0) || + (size <= ofi_info->tx_attr->inject_size)); DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE, "tx write msg: %d:%#" PRIx64 " <= %p, size %zd, ctx %p, " "flags %#" PRIx64 " tcip %p", @@ -6404,11 +6399,6 @@ void ofi_put_lowLevel(const void* addr, void* mrDesc, c_nodeid_t node, uint64_t mrRaddr, uint64_t mrKey, size_t size, void* ctx, uint64_t flags, struct perTxCtxInfo_t* tcip) { - - // Can't inject a buffer that is too large - if ((flags & FI_INJECT) && (size > ofi_info->tx_attr->inject_size)) { - flags &= ~FI_INJECT; - } if (flags == 0) { (void) wrap_fi_write(addr, mrDesc, node, mrRaddr, mrKey, size, ctx, tcip); } else { @@ -7177,8 +7167,6 @@ chpl_comm_nb_handle_t amoFn_dlvrCmplt(struct amoBundle_t *ab, } - - static inline ssize_t wrap_fi_atomicmsg(struct amoBundle_t* ab, uint64_t flags, struct perTxCtxInfo_t* tcip) { @@ -7475,6 +7463,16 @@ void reportCQError(struct fid_cq* cq) { } +// wait for all outstanding transmissions to complete +static inline +void waitForAllTxnsComplete(struct perTxCtxInfo_t* tcip) { + while (tcip->numTxnsOut > 0) { + sched_yield(); + (*tcip->ensureProgressFn)(tcip); + } +} + + static inline void waitForTxnComplete(struct perTxCtxInfo_t* tcip, void* ctx) { (*tcip->ensureProgressFn)(tcip); @@ -7487,11 +7485,9 @@ void waitForTxnComplete(struct perTxCtxInfo_t* tcip, void* ctx) { (*tcip->ensureProgressFn)(tcip); } } else { - // wait for all outstanding transmissions to complete - while (tcip->numTxnsOut > 0) { - sched_yield(); - (*tcip->ensureProgressFn)(tcip); - } + // we don't a way to wait for the individual transmission, so wait + // for all outstanding transmissions to complete + waitForAllTxnsComplete(tcip); } }