Skip to content

Commit

Permalink
Merge pull request #19242 from jakesmith/HPCC-32890-cleanjobqueues
Browse files Browse the repository at this point in the history
HPCC-32890 Add cleanjobqueues to daliadmin

Reviewed-By: Shamser Ahmed <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Oct 29, 2024
2 parents ed05bcf + c44c522 commit 0c76000
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 0 deletions.
39 changes: 39 additions & 0 deletions dali/daliadmin/daadmin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3385,4 +3385,43 @@ void removeOrphanedGlobalVariables(bool dryrun, bool reconstruct)
}
}

void cleanJobQueues(bool dryRun)
{
Owned<IRemoteConnection> conn = querySDS().connect("/JobQueues", myProcessSession(), 0, SDS_LOCK_TIMEOUT);
if (!conn)
{
WARNLOG("Failed to connect to /JobQueues");
return;
}
Owned<IPropertyTreeIterator> queueIter = conn->queryRoot()->getElements("Queue");
ForEach(*queueIter)
{
IPropertyTree &queue = queueIter->query();
const char *name = queue.queryProp("@name");
if (isEmptyString(name)) // should not be blank, but guard
continue;
PROGLOG("Processing queue: %s", name);
VStringBuffer queuePath("/JobQueues/Queue[@name=\"%s\"]", name);
Owned<IRemoteConnection> queueConn = querySDS().connect(queuePath, myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
IPropertyTree *queueRoot = queueConn->queryRoot();

Owned<IPropertyTreeIterator> clientIter = queueRoot->getElements("Client");
std::vector<IPropertyTree *> toRemove;
ForEach (*clientIter)
{
IPropertyTree &client = clientIter->query();
if (client.getPropInt("@connected") == 0)
toRemove.push_back(&client);
}
if (!dryRun)
{
for (auto &client: toRemove)
queue.removeTree(client);
}
PROGLOG("Job queue '%s': %s %u stale client entries", name, dryRun ? "dryrun, there are" : "removed", (unsigned)toRemove.size());
queueConn->commit();
queueConn.clear();
}
}

} // namespace daadmin
1 change: 1 addition & 0 deletions dali/daliadmin/daadmin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,5 +95,6 @@ extern DALIADMIN_API void daliping(const char *dalis, unsigned connecttime, unsi

extern DALIADMIN_API void validateStore(bool fix, bool deleteFiles, bool verbose);
extern DALIADMIN_API void removeOrphanedGlobalVariables(bool dryrun, bool reconstruct);
extern DALIADMIN_API void cleanJobQueues(bool dryRun);

} // namespace daadmin
6 changes: 6 additions & 0 deletions dali/daliadmin/daliadmin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ void usage(const char *exe)
printf("Other dali server and misc commands:\n");
printf(" auditlog <fromdate> <todate> <match>\n");
printf(" cleanglobalwuid [dryrun] [noreconstruct]\n");
printf(" cleanjobqueues [dryrun]\n");
printf(" clusterlist <mask> -- list clusters (mask optional)\n");
printf(" coalesce -- force transaction coalesce\n");
printf(" dalilocks [ <ip-pattern> ] [ files ] -- get all locked files/xpaths\n");
Expand Down Expand Up @@ -576,6 +577,11 @@ int main(int argc, const char* argv[])
}
removeOrphanedGlobalVariables(dryrun, reconstruct);
}
else if (strieq(cmd, "cleanjobqueues"))
{
bool dryRun = np>0 && strieq("dryrun", params.item(1));
cleanJobQueues(dryRun);
}
else if (strieq(cmd, "remotetest"))
remoteTest(params.item(1), true);
else
Expand Down

0 comments on commit 0c76000

Please sign in to comment.