Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-33166 Roxie BG queue allow for changing thread priority #19379

Merged
merged 1 commit into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ extern bool blockedLocalAgent;
extern bool acknowledgeAllRequests;
extern unsigned packetAcknowledgeTimeout;
extern cycle_t dynPriorityAdjustCycles;
extern bool traceThreadStartDelay;
extern int adjustBGThreadNiceValue;
extern bool alwaysTrustFormatCrcs;
extern bool allFilesDynamic;
extern bool lockSuperFiles;
Expand Down
8 changes: 8 additions & 0 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ bool blockedLocalAgent = true;
bool acknowledgeAllRequests = true;
unsigned packetAcknowledgeTimeout = 100;
cycle_t dynPriorityAdjustCycles = 0; // default off (0)
bool traceThreadStartDelay = true;
int adjustBGThreadNiceValue = 5;
unsigned ccdMulticastPort;
bool enableHeartBeat = true;
unsigned parallelLoopFlowLimit = 100;
Expand Down Expand Up @@ -1010,6 +1012,12 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
unsigned dynAdjustMsec = topology->getPropInt("@dynPriorityAdjustTime", 0);
if (dynAdjustMsec)
dynPriorityAdjustCycles = dynAdjustMsec * (queryOneSecCycles() / 1000ULL);
traceThreadStartDelay = topology->getPropBool("@traceThreadStartDelay", traceThreadStartDelay);
adjustBGThreadNiceValue = topology->getPropInt("@adjustBGThreadNiceValue", adjustBGThreadNiceValue);
if (adjustBGThreadNiceValue < 0)
adjustBGThreadNiceValue = 0;
if (adjustBGThreadNiceValue > 19)
adjustBGThreadNiceValue = 19;
ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT);
statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);
roxiemem::setMemTraceSizeLimit((memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0));
Expand Down
11 changes: 9 additions & 2 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,13 @@ class RoxieQueue : public CInterface, implements IThreadFactory
if (qname && *qname)
tname.appendf(" (%s)", qname);
workers.setown(createThreadPool(tname.str(), this, false, nullptr, numWorkers));
if (traceThreadStartDelay)
workers->setStartDelayTracing(60);
if (qname && *qname)
{
if (streq(qname, "BG"))
workers->setNiceValue(adjustBGThreadNiceValue);
}
started = 0;
idle = 0;
if (IBYTIbufferSize)
Expand Down Expand Up @@ -1893,7 +1900,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
public:
IMPLEMENT_IINTERFACE;

RoxieReceiverBase(unsigned _numWorkers) : slaQueue(_numWorkers, "SLA"), hiQueue(_numWorkers, "HIGH"), loQueue(_numWorkers, "LOW"), bgQueue(_numWorkers/2 + 1, "BG"), numWorkers(_numWorkers)
RoxieReceiverBase(unsigned _numWorkers) : slaQueue(_numWorkers, "SLA"), hiQueue(_numWorkers, "HIGH"), loQueue(_numWorkers, "LOW"), bgQueue(_numWorkers, "BG"), numWorkers(_numWorkers)
{
}

Expand All @@ -1902,7 +1909,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
loQueue.start();
hiQueue.start();
slaQueue.start();
bgQueue.start(); // consider nice(+3) BG threads
bgQueue.start(); // NB BG thread priority can be adjusted
}

virtual void stop()
Expand Down
1 change: 1 addition & 0 deletions roxie/ccd/ccdserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4611,6 +4611,7 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
ctx->queryOptions().dynPriority = QUERY_BG_PRIORITY_VALUE;
unsigned dynAdjustMsec = (dynPriorityAdjustCycles * 1000ULL) / queryOneSecCycles();
UWARNLOG("WARNING: %d msec dynamic adjustment threshold reached, shifting query to BG queue", dynAdjustMsec);
p->queryHeader().activityId &= ~ROXIE_PRIORITY_MASK;
p->queryHeader().activityId |= ROXIE_BG_PRIORITY;
// TODO: what to do about still running activities' continuation/ack priorities ?
}
Expand Down
7 changes: 7 additions & 0 deletions system/jlib/jthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,7 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter
unsigned stacksize;
unsigned timeoutOnRelease;
unsigned traceStartDelayPeriod = 0;
int niceValue = 0;
unsigned startsInPeriod = 0;
cycle_t startDelayInPeriod = 0;
CCycleTimer overAllTimer;
Expand Down Expand Up @@ -1114,6 +1115,8 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter
CPooledThreadWrapper &ret = *new CPooledThreadWrapper(*this,newid,factory->createNew());
if (stacksize)
ret.setStackSize(stacksize);
if (niceValue)
ret.setNice(niceValue);
ret.start(false);
threadwrappers.append(ret);
return ret;
Expand Down Expand Up @@ -1281,6 +1284,10 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter
{
traceStartDelayPeriod = secs;
}
void setNiceValue(int value)
{
niceValue = value;
}
bool waitAvailable(unsigned timeout)
{
if (!defaultmax)
Expand Down
1 change: 1 addition & 0 deletions system/jlib/jthread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ interface IThreadPool : extends IInterface
virtual unsigned runningCount()=0; // number of currently running threads
virtual PooledThreadHandle startNoBlock(void *param)=0; // starts a new thread if it can do so without blocking, else throws exception
virtual void setStartDelayTracing(unsigned secs) = 0; // set start delay tracing period
virtual void setNiceValue(int value) = 0; // set priority for thread
virtual bool waitAvailable(unsigned timeout) = 0; // wait until a pool member is available
};

Expand Down
Loading