Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32890 Add cleanjobqueues to daliadmin #19242

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions dali/daliadmin/daadmin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3385,4 +3385,43 @@ void removeOrphanedGlobalVariables(bool dryrun, bool reconstruct)
}
}

void cleanJobQueues(bool dryRun)
{
Owned<IRemoteConnection> conn = querySDS().connect("/JobQueues", myProcessSession(), 0, SDS_LOCK_TIMEOUT);
if (!conn)
{
WARNLOG("Failed to connect to /JobQueues");
return;
}
Owned<IPropertyTreeIterator> queueIter = conn->queryRoot()->getElements("Queue");
ForEach(*queueIter)
{
IPropertyTree &queue = queueIter->query();
const char *name = queue.queryProp("@name");
if (isEmptyString(name)) // should not be blank, but guard
continue;
PROGLOG("Processing queue: %s", name);
VStringBuffer queuePath("/JobQueues/Queue[@name=\"%s\"]", name);
Owned<IRemoteConnection> queueConn = querySDS().connect(queuePath, myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT);
IPropertyTree *queueRoot = queueConn->queryRoot();

Owned<IPropertyTreeIterator> clientIter = queueRoot->getElements("Client");
std::vector<IPropertyTree *> toRemove;
ForEach (*clientIter)
{
IPropertyTree &client = clientIter->query();
if (client.getPropInt("@connected") == 0)
toRemove.push_back(&client);
}
if (!dryRun)
{
for (auto &client: toRemove)
queue.removeTree(client);
}
PROGLOG("Job queue '%s': %s %u stale client entries", name, dryRun ? "dryrun, there are" : "removed", (unsigned)toRemove.size());
queueConn->commit();
queueConn.clear();
}
}

} // namespace daadmin
1 change: 1 addition & 0 deletions dali/daliadmin/daadmin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,5 +95,6 @@ extern DALIADMIN_API void daliping(const char *dalis, unsigned connecttime, unsi

extern DALIADMIN_API void validateStore(bool fix, bool deleteFiles, bool verbose);
extern DALIADMIN_API void removeOrphanedGlobalVariables(bool dryrun, bool reconstruct);
extern DALIADMIN_API void cleanJobQueues(bool dryRun);

} // namespace daadmin
6 changes: 6 additions & 0 deletions dali/daliadmin/daliadmin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ void usage(const char *exe)
printf("Other dali server and misc commands:\n");
printf(" auditlog <fromdate> <todate> <match>\n");
printf(" cleanglobalwuid [dryrun] [noreconstruct]\n");
printf(" cleanjobqueues [dryrun]\n");
printf(" clusterlist <mask> -- list clusters (mask optional)\n");
printf(" coalesce -- force transaction coalesce\n");
printf(" dalilocks [ <ip-pattern> ] [ files ] -- get all locked files/xpaths\n");
Expand Down Expand Up @@ -576,6 +577,11 @@ int main(int argc, const char* argv[])
}
removeOrphanedGlobalVariables(dryrun, reconstruct);
}
else if (strieq(cmd, "cleanjobqueues"))
{
bool dryRun = np>0 && strieq("dryrun", params.item(1));
cleanJobQueues(dryRun);
}
else if (strieq(cmd, "remotetest"))
remoteTest(params.item(1), true);
else
Expand Down
Loading