Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.8.x' into candidate-…
Browse files Browse the repository at this point in the history
…9.10.x

Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Jan 15, 2025
2 parents 421e8fd + e83ab05 commit 3612b8a
Show file tree
Hide file tree
Showing 19 changed files with 364 additions and 42 deletions.
28 changes: 26 additions & 2 deletions ecl/hqlcpp/hqlckey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,38 @@ IHqlExpression * KeyedJoinInfo::querySimplifiedKey(IHqlExpression * expr)

IHqlExpression * queryBaseIndexForKeyedJoin(IHqlExpression * expr)
{
if (expr->getOperator() == no_if)
node_operator op = expr->getOperator();
if (op == no_if)
{
IHqlExpression * left = queryBaseIndexForKeyedJoin(expr->queryChild(1));
IHqlExpression * right = queryBaseIndexForKeyedJoin(expr->queryChild(2));
if (left && right)
return left;
{
//IF (cond, index) and IF(cond, null, index) should be allowed, and will return the index
if (left->getOperator() != no_null)
return left;
return right;
}
return nullptr;
}
else if (op == no_chooseds)
{
IHqlExpression * result = nullptr;
ForEachChildFrom(i, expr, 1)
{
IHqlExpression * match = queryBaseIndexForKeyedJoin(expr->queryChild(i));
if (!match)
return nullptr;
if (!result || result->getOperator() == no_null)
result = match;
}
return result;
}
else if (op == no_null)
return expr;
else if (op == no_split)
return queryBaseIndexForKeyedJoin(expr->queryChild(0));

return queryPhysicalRootTable(expr);
}

Expand Down
2 changes: 0 additions & 2 deletions esp/services/ws_workunits/ws_workunitsHelpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,6 @@ struct WUComponentLogOptions
{
if (relativeTimeBufferSecs > 0 )
wuLogSearchTimeBuffSecs = relativeTimeBufferSecs;
else
throw makeStringException(ECLWATCH_INVALID_INPUT, "ZapLogFilter: Invalid 'TimeRange' detected!");
}
}

Expand Down
8 changes: 8 additions & 0 deletions helm/examples/tracing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ All configuration options detailed here are part of the HPCC Systems Helm chart,
- alwaysCreateGlobalIds - If true, assign newly created global ID to any requests that do not supply one.
- optAlwaysCreateTraceIds - If true components generate trace/span ids if none are provided by the remote caller.
- enableDefaultLogExporter - If true, creates a trace exporter outputting to the log using the default options
- resourceAttributes: - Defines OTel specific resource attribute configuration values
which are appended to the runtime OTEL_RESOURCE_ATTRIBUTES. See OTel doc: https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration
- deploymentEnvironment - Defines deployment.environment, which is used to specify
the spans' deployment environment (aka deployment tier).
See OTel doc: https://opentelemetry.io/docs/specs/semconv/resource/deployment-environment/
- serviceNamespace - Defines service.namespace which helps to distinguish a group
of services.
See OTel doc: https://opentelemetry.io/docs/specs/semconv/resource/#semantic-attributes-with-dedicated-environment-variable
- exporters: - Defines a list of exporters in charge of forwarding span data to target back-end
- type - "OTLP-HTTP" | "OTLP-GRPC" | "OS" | "JLOG"
- "JLOG"
Expand Down
10 changes: 10 additions & 0 deletions helm/examples/tracing/baremetal-otlp-http-localhost-sample.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<Environment>
<Software>
<tracing disabled="false">
<resourceAttributes deploymentEnvironment="development" serviceNamespace="cmakeinstall"/>
<exporters consoleDebug="true" endpoint="http://localhost:4318/v1/traces" type="OTLP-HTTP">
<batch enabled="false" maxQueueSize="4095" scheduledDelayMillis="6001" maxExportBatchSize="511"/>
</exporters>
</tracing>
</Software>
</Environment>
13 changes: 13 additions & 0 deletions helm/examples/tracing/otlp-grpc-collector-desktop.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
global:
tracing:
resourceAttributes: # used to declare OTEL Resource Attribute config values
deploymentEnvironment: development # used to anotate tracing spans' environment identifier (development/production/statiging/etc)
exporters:
- type: OTLP-GRPC
endpoint: "192.168.68.111:4317/"
useSslCredentials: false
batch:
enabled: true
maxQueueSize: 4096
scheduledDelayMillis: 6000
maxExportBatchSize: 512
14 changes: 14 additions & 0 deletions helm/hpcc/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,20 @@
"items": {
"$ref": "#/definitions/traceExporter"
}
},
"resourceAttributes": {
"type": "object",
"properties": {
"deploymentEnvironment": {
"type": "string",
"description": "Name of the deployment environment (aka deployment tier) such as staging/development/production. "
},
"serviceNamespace": {
"type": "string",
"description": "Identifier used to help distinguish instances of same service"
}
},
"additionalProperties": { "type": ["integer", "string", "boolean"] }
}
},
"additionalProperties": { "type": ["integer", "string", "boolean"] }
Expand Down
2 changes: 2 additions & 0 deletions helm/hpcc/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ global:
tracing:
disabled: false
alwaysCreateTraceIds: true
resourceAttributes: # used to declare OTEL Resource Attribute config values
deploymentEnvironment: development # used to anotate tracing spans' environment identifier (development/production/statiging/etc)

## resource settings for stub components
#stubInstanceResources:
Expand Down
10 changes: 10 additions & 0 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ void setMulticastEndpoints(unsigned numChannels);
#define ROXIE_BG_PRIORITY 0xc0000000 // mask in activityId indicating it goes on the bg queue
#define ROXIE_PRIORITY_MASK (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY | ROXIE_LOW_PRIORITY)

#define QUERY_BG_PRIORITY_VALUE -1
#define QUERY_LOW_PRIORITY_VALUE 0
#define QUERY_HIGH_PRIORITY_VALUE 1
#define QUERY_SLA_PRIORITY_VALUE 2
static constexpr int queryMinPriorityValue = QUERY_BG_PRIORITY_VALUE;
static constexpr int queryMaxPriorityValue = QUERY_SLA_PRIORITY_VALUE;

#define ROXIE_ACTIVITY_FETCH 0x20000000 // or'ed into activityId for fetch part of full keyed join activities

// Status information returned in the activityId field of the header:
Expand Down Expand Up @@ -306,6 +313,7 @@ extern StringArray allQuerySetNames;
extern bool blockedLocalAgent;
extern bool acknowledgeAllRequests;
extern unsigned packetAcknowledgeTimeout;
extern cycle_t dynPriorityAdjustCycles;
extern bool alwaysTrustFormatCrcs;
extern bool allFilesDynamic;
extern bool lockSuperFiles;
Expand Down Expand Up @@ -486,6 +494,8 @@ inline unsigned getBondedChannel(unsigned partNo)
return ((partNo - 1) % numChannels) + 1;
}

extern unsigned getPriorityMask(int priority);

extern void FatalError(const char *format, ...) __attribute__((format(printf, 1, 2)));
extern unsigned getNextInstanceId();
extern void closedown();
Expand Down
5 changes: 5 additions & 0 deletions roxie/ccd/ccdcontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,11 @@ class CRoxieContextBase : implements IRoxieAgentContext, implements ICodeContext
return options;
}

virtual cycle_t queryElapsedCycles() const
{
return elapsedTimer.elapsedCycles();
}

const char *queryAuthToken()
{
return authToken.str();
Expand Down
1 change: 1 addition & 0 deletions roxie/ccd/ccdcontext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ interface IRoxieAgentContext : extends IRoxieContextLogger
virtual void noteChildGraph(unsigned id, IActivityGraph *childGraph) = 0;
virtual roxiemem::IRowManager &queryRowManager() = 0;
virtual const QueryOptions &queryOptions() const = 0;
virtual cycle_t queryElapsedCycles() const = 0;
virtual void addAgentsReplyLen(unsigned len, unsigned duplicates, unsigned resends) = 0;
virtual const char *queryAuthToken() = 0;
virtual const IResolvedFile *resolveLFN(const char *filename, bool isOpt, bool isPrivilegedUser) = 0;
Expand Down
26 changes: 13 additions & 13 deletions roxie/ccd/ccdlistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1241,10 +1241,10 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker
{
switch((int)priority)
{
case 0: loQueryStats.noteQuery(failed, elapsedTime); break;
case 1: hiQueryStats.noteQuery(failed, elapsedTime); break;
case 2: slaQueryStats.noteQuery(failed, elapsedTime); break;
case -1: bgQueryStats.noteQuery(failed, elapsedTime); break;
case QUERY_LOW_PRIORITY_VALUE: loQueryStats.noteQuery(failed, elapsedTime); break;
case QUERY_HIGH_PRIORITY_VALUE: hiQueryStats.noteQuery(failed, elapsedTime); break;
case QUERY_SLA_PRIORITY_VALUE: slaQueryStats.noteQuery(failed, elapsedTime); break;
case QUERY_BG_PRIORITY_VALUE: bgQueryStats.noteQuery(failed, elapsedTime); break;
}
combinedQueryStats.noteQuery(failed, elapsedTime);
}
Expand Down Expand Up @@ -1334,7 +1334,7 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker
unsigned agentsReplyLen = 0;
unsigned agentsDuplicates = 0;
unsigned agentsResends = 0;
unsigned priority = (unsigned) -2;
unsigned priority = (unsigned) -2; // NB -2 is outside of priority range
try
{
bool isBlind = wu->getDebugValueBool("blindLogging", false);
Expand All @@ -1358,10 +1358,10 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker
priority = queryFactory->queryOptions().priority;
switch ((int)priority)
{
case 0: loQueryStats.noteActive(); break;
case 1: hiQueryStats.noteActive(); break;
case 2: slaQueryStats.noteActive(); break;
case -1: bgQueryStats.noteActive(); break;
case QUERY_LOW_PRIORITY_VALUE: loQueryStats.noteActive(); break;
case QUERY_HIGH_PRIORITY_VALUE: hiQueryStats.noteActive(); break;
case QUERY_SLA_PRIORITY_VALUE: slaQueryStats.noteActive(); break;
case QUERY_BG_PRIORITY_VALUE: bgQueryStats.noteActive(); break;
}
combinedQueryStats.noteActive();
Owned<IRoxieServerContext> ctx = queryFactory->createContext(wu, logctx);
Expand Down Expand Up @@ -1528,10 +1528,10 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte
unsigned priority = getQueryPriority();
switch ((int)priority)
{
case 0: loQueryStats.noteActive(); break;
case 1: hiQueryStats.noteActive(); break;
case 2: slaQueryStats.noteActive(); break;
case -1: bgQueryStats.noteActive(); break;
case QUERY_LOW_PRIORITY_VALUE: loQueryStats.noteActive(); break;
case QUERY_HIGH_PRIORITY_VALUE: hiQueryStats.noteActive(); break;
case QUERY_SLA_PRIORITY_VALUE: slaQueryStats.noteActive(); break;
case QUERY_BG_PRIORITY_VALUE: bgQueryStats.noteActive(); break;
}
unknownQueryStats.noteComplete();
combinedQueryStats.noteActive();
Expand Down
4 changes: 4 additions & 0 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ unsigned numRequestArrayThreads = 5;
bool blockedLocalAgent = true;
bool acknowledgeAllRequests = true;
unsigned packetAcknowledgeTimeout = 100;
cycle_t dynPriorityAdjustCycles = 0; // default off (0)
unsigned ccdMulticastPort;
bool enableHeartBeat = true;
unsigned parallelLoopFlowLimit = 100;
Expand Down Expand Up @@ -1006,6 +1007,9 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
blockedLocalAgent = topology->getPropBool("@blockedLocalAgent", blockedLocalAgent);
acknowledgeAllRequests = topology->getPropBool("@acknowledgeAllRequests", acknowledgeAllRequests);
packetAcknowledgeTimeout = topology->getPropInt("@packetAcknowledgeTimeout", packetAcknowledgeTimeout);
unsigned dynAdjustMsec = topology->getPropInt("@dynPriorityAdjustTime", 0);
if (dynAdjustMsec)
dynPriorityAdjustCycles = dynAdjustMsec * (queryOneSecCycles() / 1000ULL);
ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT);
statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);
roxiemem::setMemTraceSizeLimit((memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0));
Expand Down
54 changes: 36 additions & 18 deletions roxie/ccd/ccdquery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ class CSharedOnceContext : public CInterfaceOf<ISharedOnceContext>

QueryOptions::QueryOptions()
{
priority = 0;
priority = QUERY_LOW_PRIORITY_VALUE;
dynPriority = QUERY_LOW_PRIORITY_VALUE;
timeLimit = defaultTimeLimit[0];
warnTimeLimit = defaultWarnTimeLimit[0];

Expand Down Expand Up @@ -358,6 +359,7 @@ QueryOptions::QueryOptions()
QueryOptions::QueryOptions(const QueryOptions &other)
{
priority = other.priority;
dynPriority = other.dynPriority.load();
timeLimit = other.timeLimit;
warnTimeLimit = other.warnTimeLimit;

Expand Down Expand Up @@ -394,23 +396,31 @@ QueryOptions::QueryOptions(const QueryOptions &other)
numWorkflowThreads = other.numWorkflowThreads;
}

void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo)
void QueryOptions::updateDynPriority(int _priority)
{
// calculate priority before others since it affects the defaults of others
updateFromWorkUnit(priority, wu, "priority");
if (stateInfo)
updateFromContext(priority, stateInfo, "@priority");
if ((int)priority < 0)
dynPriority = _priority;
if (dynPriority < QUERY_LOW_PRIORITY_VALUE)
{
// use LOW queue time limits ...
timeLimit = defaultTimeLimit[0];
warnTimeLimit = defaultWarnTimeLimit[0];
}
else
{
timeLimit = defaultTimeLimit[priority];
warnTimeLimit = defaultWarnTimeLimit[priority];
timeLimit = defaultTimeLimit[_priority];
warnTimeLimit = defaultWarnTimeLimit[_priority];
}
}

void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo)
{
// calculate priority before others since it affects the defaults of others
updateFromWorkUnit(priority, wu, "priority");
if (stateInfo)
updateFromContext(priority, stateInfo, "@priority");

updateDynPriority((int)priority);

updateFromWorkUnit(timeLimit, wu, "timeLimit");
updateFromWorkUnit(warnTimeLimit, wu, "warnTimeLimit");
updateFromWorkUnitM(memoryLimit, wu, "memoryLimit");
Expand Down Expand Up @@ -495,6 +505,20 @@ void QueryOptions::setFromContext(const IPropertyTree *ctx)
if (ctx)
{
// Note: priority cannot be set at context level
// b/c this is after activities have been created, but we could
// dynamically adj priority in the header activityId before sending
int tmpPriority = (int)priority;
updateFromContext(tmpPriority, ctx, "@priority", "_Priority");

if (tmpPriority > queryMaxPriorityValue)
tmpPriority = queryMaxPriorityValue;
if (tmpPriority < queryMinPriorityValue)
tmpPriority = queryMinPriorityValue;

// only adjust lower ...
if (tmpPriority < (int)priority)
updateDynPriority(tmpPriority);

updateFromContext(timeLimit, ctx, "@timeLimit", "_TimeLimit");
updateFromContext(warnTimeLimit, ctx, "@warnTimeLimit", "_WarnTimeLimit");
updateFromContextM(memoryLimit, ctx, "@memoryLimit", "_MemoryLimit");
Expand Down Expand Up @@ -624,15 +648,9 @@ class CQueryFactory : implements IQueryFactory, implements IResourceContext, pub

if (isSuspended)
return createRoxieServerDummyActivityFactory(id, subgraphId, *this, NULL, TAKnone, node, false); // Is there actually any point?
switch (options.priority)
{
case 1:
rid |= ROXIE_HIGH_PRIORITY;
break;
case 2:
rid |= ROXIE_SLA_PRIORITY;
break;
}

rid |= getPriorityMask(options.priority);

StringBuffer helperName;
helperName.append("fAc").append(id);
HelperFactory *helperFactory = dll->getFactory(helperName);
Expand Down
3 changes: 2 additions & 1 deletion roxie/ccd/ccdquery.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ class QueryOptions
void setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo);
void setFromContext(const IPropertyTree *ctx);
void setFromAgentLoggingFlags(unsigned loggingFlags);

void updateDynPriority(int _priority);

unsigned priority;
mutable std::atomic<int> dynPriority;
unsigned timeLimit;
unsigned warnTimeLimit;
unsigned traceLimit;
Expand Down
Loading

0 comments on commit 3612b8a

Please sign in to comment.