Skip to content

Commit

Permalink
HPCC-32483 Capture global sort/join blocked time
Browse files Browse the repository at this point in the history
Capture blocked time for global sort and joins.

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Oct 16, 2024
1 parent 9d3734f commit 77028e9
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 25 deletions.
24 changes: 18 additions & 6 deletions thorlcr/activities/join/thjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,10 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify
stopOtherInput();
throw;
}
asyncSecondaryStart.wait();
{
BlockedActivityTimer t(slaveTimerStats, timeActivities);
asyncSecondaryStart.wait();
}
if (secondaryStartException)
{
IException *e=secondaryStartException.getClear();
Expand Down Expand Up @@ -387,7 +390,10 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify
{
unsigned bn=noSortPartitionSide()?2:4;
ActPrintLog("JOIN waiting barrier.%d",bn);
barrier->wait(false);
{
BlockedActivityTimer t(slaveTimerStats, timeActivities);
barrier->wait(false);
}
ActPrintLog("JOIN barrier.%d raised",bn);
sorter->stopMerge();
}
Expand Down Expand Up @@ -564,8 +570,11 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify
return false;
}
ActPrintLog("JOIN waiting barrier.1");
if (!barrier->wait(false))
return false;
{
BlockedActivityTimer t(slaveTimerStats, timeActivities);
if (!barrier->wait(false))
return false;
}
ActPrintLog("JOIN barrier.1 raised");

// primaryWriter will keep as much in memory as possible.
Expand All @@ -575,8 +584,11 @@ class JoinSlaveActivity : public CSlaveActivity, implements ILookAheadStopNotify
primaryStream.setown(primaryWriter->getReader()); // NB: rhsWriter no longer needed after this point

ActPrintLog("JOIN waiting barrier.2");
if (!barrier->wait(false))
return false;
{
BlockedActivityTimer t(slaveTimerStats, timeActivities);
if (!barrier->wait(false))
return false;
}
ActPrintLog("JOIN barrier.2 raised");
sorter->stopMerge();
if (0 == sorter->getGlobalCount())
Expand Down
11 changes: 8 additions & 3 deletions thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
Semaphore sem;
CriticalSection crit;
bool enabled = true;
CSlaveActivity &activity;

void unblock()
{
Expand All @@ -848,10 +849,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
}
public:
CLimiter()
CLimiter(CSlaveActivity &_activity) : activity(_activity)
{
}
CLimiter(unsigned _max, unsigned _leewayPercent=0)
CLimiter(CSlaveActivity &_activity, unsigned _max, unsigned _leewayPercent=0): activity(_activity)
{
_set(_max, _leewayPercent);
}
Expand Down Expand Up @@ -894,7 +895,10 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
// Repeatedly calls incNonBlocking(); each time it returns true, waits on
// the semaphore and then tries again. Returns once incNonBlocking() returns
// false.
void inc()
{
    for (;;)
    {
        if (!incNonBlocking())
            break;
        // The timer lives only for this single wait, so each pass through
        // the loop gets its own BlockedActivityTimer around sem.wait().
        // NOTE(review): presumably this records the wait as blocked time
        // against the owning activity - confirm against BlockedActivityTimer.
        BlockedActivityTimer blockedTimer(activity.getTotalCyclesRef(), activity.queryTimeActivities());
        sem.wait();
    }
}
void dec()
{
Expand All @@ -907,6 +911,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
// Waits unconditionally on the limiter's semaphore. The wait is wrapped in
// a BlockedActivityTimer constructed from the owning activity's cycle
// counter and time-activities setting; the timer is released when the
// function returns.
void block()
{
    BlockedActivityTimer blockedTimer(activity.getTotalCyclesRef(), activity.queryTimeActivities());
    sem.wait();
}
void disable()
Expand Down Expand Up @@ -2965,7 +2970,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
public:
IMPLEMENT_IINTERFACE_USING(PARENT);

CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this)
CKeyedJoinSlave(CGraphElementBase *_container) : PARENT(_container, keyedJoinActivityStatistics), readAheadThread(*this), lookupThreadLimiter(*this), fetchThreadLimiter(*this), pendingKeyLookupLimiter(*this), doneListLimiter(*this)
{
helper = static_cast <IHThorKeyedJoinArg *> (queryHelper());
reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename);
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
}
inline void interChannelBarrier()
{
BlockedActivityTimer t(slaveTimerStats, timeActivities);
if (queryJob().queryJobChannels()>1)
{
if (channels[0]->incNotifyCountAndCheck())
Expand Down
9 changes: 6 additions & 3 deletions thorlcr/activities/msort/thmsortslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,12 @@ class MSortSlaveActivity : public CSlaveActivity
throw;
}
ActPrintLog("SORT waiting barrier.1");
if (!barrier->wait(false)) {
Sleep(1000); // let original error through
throw MakeThorException(TE_BarrierAborted,"SORT: Barrier Aborted");
{
BlockedActivityTimer t(slaveTimerStats, timeActivities);
if (!barrier->wait(false)) {
Sleep(1000); // let original error through
throw MakeThorException(TE_BarrierAborted,"SORT: Barrier Aborted");
}
}
ActPrintLog("SORT barrier.1 raised");
output.setown(sorter->startMerge(totalrows));
Expand Down
27 changes: 16 additions & 11 deletions thorlcr/msort/tsorts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "thbuf.hpp"
#include "thbufdef.hpp"
#include "thgraph.hpp"
#include "thgraphslave.hpp"

#ifdef _DEBUG
//#define TRACE_UNIQUE
Expand All @@ -59,8 +60,6 @@ inline void traceWait(const char *name, T &sem,unsigned interval=60*1000)
#define MINCOMPRESSEDROWSIZE 16
#define MAXCOMPRESSEDROWSIZE 0x2000

#define MPBLOCKTIMEOUT (1000*60*15)


class CWriteIntercept : public CSimpleInterface
{
Expand Down Expand Up @@ -607,7 +606,7 @@ class CMiniSort
class CThorSorter : public CSimpleInterface, implements IThorSorter, implements ISortSlaveBase, implements ISortSlaveMP,
private IThreaded
{
CActivityBase *activity;
CSlaveActivity *activity;
SocketEndpoint myendpoint;
Linked<ICommunicator> clusterComm;
mptag_t mpTagRPC;
Expand Down Expand Up @@ -816,7 +815,7 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

CThorSorter(CActivityBase *_activity, SocketEndpoint &ep, ICommunicator *_clusterComm, mptag_t _mpTagRPC)
CThorSorter(CSlaveActivity *_activity, SocketEndpoint &ep, ICommunicator *_clusterComm, mptag_t _mpTagRPC)
: activity(_activity), myendpoint(ep), clusterComm(_clusterComm), mpTagRPC(_mpTagRPC),
rowArray(*_activity, _activity), threaded("CThorSorter", this), spillStats(spillStatistics)
{
Expand Down Expand Up @@ -1269,11 +1268,14 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
{
ActPrintLog(activity, "Gather in");
globalCount = 0;
for (;;) {
if (abort)
return;
if (startgathersem.wait(10000))
break;
{
BlockedActivityTimer t(activity->getTotalCyclesRef(), activity->queryTimeActivities());
for (;;) {
if (abort)
return;
if (startgathersem.wait(10000))
break;
}
}
ActPrintLog(activity, "SORT: Gather");
assertex(!rowif);
Expand Down Expand Up @@ -1375,7 +1377,10 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements
virtual IRowStream * startMerge(rowcount_t &_totalrows)
{
ActPrintLog(activity, "SORT Merge Waiting");
traceWait("startmergesem",startmergesem);
{
BlockedActivityTimer t(activity->getTotalCyclesRef(), activity->queryTimeActivities());
traceWait("startmergesem",startmergesem);
}
ActPrintLog(activity, "SORT Merge Start");
_totalrows = totalrows;
return merger.getLink();
Expand Down Expand Up @@ -1407,7 +1412,7 @@ class CThorSorter : public CSimpleInterface, implements IThorSorter, implements

//==============================================================================

THORSORT_API IThorSorter *CreateThorSorter(CActivityBase *activity, SocketEndpoint &ep, ICommunicator *clusterComm, mptag_t _mpTagRPC)
// Factory for the sorter implementation: constructs a CThorSorter from the
// given slave activity, endpoint, communicator and RPC tag, and hands it
// back through the IThorSorter interface. The object is allocated with
// `new`; the caller receives the raw pointer.
THORSORT_API IThorSorter *CreateThorSorter(CSlaveActivity *activity, SocketEndpoint &ep, ICommunicator *clusterComm, mptag_t _mpTagRPC)
{
    IThorSorter *sorter = new CThorSorter(activity, ep, clusterComm, _mpTagRPC);
    return sorter;
}
Expand Down
4 changes: 2 additions & 2 deletions thorlcr/msort/tsorts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ interface ISocketRowWriter: extends IRowWriter
virtual void stop()=0;
};

class CActivityBase;
THORSORT_API IThorSorter *CreateThorSorter(CActivityBase *activity, SocketEndpoint &ep, ICommunicator *clusterComm, mptag_t _mpTagRPC);
class CSlaveActivity;
THORSORT_API IThorSorter *CreateThorSorter(CSlaveActivity *activity, SocketEndpoint &ep, ICommunicator *clusterComm, mptag_t _mpTagRPC);
IRowStream *ConnectMergeRead(unsigned id,IThorRowInterfaces *rowif,SocketEndpoint &nodeaddr,rowcount_t startrec,rowcount_t numrecs, ISocket *socket);
ISocketRowWriter *ConnectMergeWrite(IThorRowInterfaces *rowif,ISocket *socket,size32_t bufsize,rowcount_t &startrec,rowcount_t &numrecs);
#define SOCKETSERVERINC 1
Expand Down

0 comments on commit 77028e9

Please sign in to comment.