Allow globbing for named constituents #1425

Open · wants to merge 3 commits into master
325 changes: 235 additions & 90 deletions src/hydra-eval-jobs/hydra-eval-jobs.cc
@@ -27,6 +27,8 @@
#include <sys/wait.h>
#include <sys/resource.h>

#include <fnmatch.h>

#include <nlohmann/json.hpp>

void check_pid_status_nonblocking(pid_t check_pid)
@@ -160,6 +162,9 @@ static void worker(
auto vRoot = state.allocValue();
state.autoCallFunction(autoArgs, vTop, *vRoot);

size_t prev = 0;
auto start = std::chrono::steady_clock::now();

while (true) {
/* Wait for the master to send us a job name. */
writeLine(to.get(), "next");
@@ -234,6 +239,10 @@ static void worker(
if (v->type() == nString)
job["namedConstituents"].push_back(v->string_view());
}

auto glob = v->attrs()->get(state.symbols.create("_hydraGlobConstituents"));
bool globConstituents = glob && state.forceBool(*glob->value, glob->pos, "while evaluating the `_hydraGlobConstituents` attribute");
job["globConstituents"] = globConstituents;
}

/* Register the derivation as a GC root. !!! This
@@ -294,14 +303,229 @@ static void worker(

/* If our RSS exceeds the maximum, exit. The master will
start a new process. */

auto end = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(end - start).count();
struct rusage r;
getrusage(RUSAGE_SELF, &r);
size_t delta = (size_t)r.ru_maxrss - prev; // KiB
if (delta > maxMemorySize * 1024 * 0.5 || (duration > 1))
printError("evaluating job '%s' increased memory usage by %d MiB", attrPath,
(r.ru_maxrss - prev)/1024);

prev = r.ru_maxrss;
if ((size_t) r.ru_maxrss > maxMemorySize * 1024) break;
}

writeLine(to.get(), "restart");
}

struct DependencyCycle : public std::exception {
std::string a;
std::string b;
std::set<std::string> remainingAggregates;

DependencyCycle(const std::string & a, const std::string & b, const std::set<std::string> & remainingAggregates) : a(a), b(b), remainingAggregates(remainingAggregates) {}

std::string what() {
return fmt("Dependency cycle: %s <-> %s", a, b);
}
};

struct AggregateJob
{
std::string name;
std::set<std::string> dependencies;
std::unordered_map<std::string, std::string> brokenJobs;

bool operator<(const AggregateJob & b) const { return name < b.name; }
};

// This is copied from `libutil/topo-sort.hh` in CppNix and slightly modified:
// it sorts by string identifiers but still takes and returns AggregateJob
// objects, so resolveNamedConstituents does not have to convert back and
// forth between a list of strings and AggregateJobs.
std::vector<AggregateJob> topoSort(std::set<AggregateJob> items)
{
std::vector<AggregateJob> sorted;
std::set<std::string> visited, parents;

std::map<std::string, AggregateJob> dictIdentToObject;
for (auto & it : items) {
dictIdentToObject.insert({it.name, it});
}

std::function<void(const std::string & path, const std::string * parent)> dfsVisit;

dfsVisit = [&](const std::string & path, const std::string * parent) {
if (parents.count(path)) {
dictIdentToObject.erase(path);
dictIdentToObject.erase(*parent);
std::set<std::string> remaining;
for (auto & [k, _] : dictIdentToObject) {
remaining.insert(k);
}
throw DependencyCycle(path, *parent, remaining);
}

if (!visited.insert(path).second) return;
parents.insert(path);

std::set<std::string> references = dictIdentToObject[path].dependencies;

for (auto & i : references)
/* Don't traverse into items that don't exist in our starting set. */
if (i != path && dictIdentToObject.find(i) != dictIdentToObject.end())
dfsVisit(i, &path);

sorted.push_back(dictIdentToObject[path]);
parents.erase(path);
};

for (auto & [i, _] : dictIdentToObject)
dfsVisit(i, nullptr);

return sorted;
}
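
// Illustrative usage sketch, not part of the patch: assuming the AggregateJob
// struct and topoSort() above, an aggregate that references another aggregate
// is sorted after it, so inner aggregates get rewritten first. If placed
// inside a function, this would compile and behave as commented:
//
// std::set<AggregateJob> items = {
//     AggregateJob{"tests", {}, {}},
//     AggregateJob{"release", {"tests"}, {}},
// };
// auto sorted = topoSort(items);
// // sorted[0].name == "tests", sorted[1].name == "release":
// // dependencies come before the aggregates that reference them.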

static bool insertMatchingConstituents(const std::string & childJobName,
const std::string & jobName,
std::function<bool(const std::string &, nlohmann::json &)> isBroken,
nlohmann::json & jobs,
std::set<std::string> & results)
{
bool expansionFound = false;
for (auto job = jobs.begin(); job != jobs.end(); job++) {
// Never select the job itself as a constituent. This is a trivial
// way to avoid obvious cycles.
if (job.key() == jobName) {
continue;
}
auto candidateName = job.key();
if (fnmatch(childJobName.c_str(), candidateName.c_str(), 0) == 0 && !isBroken(candidateName, *job)) {
results.insert(candidateName);
expansionFound = true;
}
}

return expansionFound;
}
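
// Illustrative sketch, not part of the patch: the glob semantics are those of
// POSIX fnmatch(3), which returns 0 on a match. The constituent entry is the
// pattern and each job name in the evaluation is the candidate string. With
// made-up job names, and assuming <cassert> inside a function:
//
// assert(fnmatch("nixpkgs.tests.*", "nixpkgs.tests.foo", 0) == 0); // match
// assert(fnmatch("nixpkgs.tests.*", "nixpkgs.release", 0) != 0);   // no match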

static std::vector<AggregateJob> resolveNamedConstituents(nlohmann::json & jobs)
{
std::set<AggregateJob> aggregateJobs;
for (auto i = jobs.begin(); i != jobs.end(); ++i) {
auto jobName = i.key();
auto & job = i.value();

auto named = job.find("namedConstituents");
if (named != job.end()) {
bool globConstituents = job.value<bool>("globConstituents", false);
std::unordered_map<std::string, std::string> brokenJobs;
std::set<std::string> results;

auto isBroken = [&brokenJobs, &jobName](
const std::string & childJobName, nlohmann::json & job) -> bool {
if (job.find("error") != job.end()) {
std::string error = job["error"];
printError("aggregate job '%s' references broken job '%s': %s", jobName, childJobName, error);
brokenJobs[childJobName] = error;
return true;
} else {
return false;
}
};

for (const std::string & childJobName : *named) {
auto childJob = jobs.find(childJobName);
if (childJob == jobs.end()) {
if (!globConstituents) {
printError("aggregate job '%s' references non-existent job '%s'", jobName, childJobName);
brokenJobs[childJobName] = "does not exist";
} else if (!insertMatchingConstituents(childJobName, jobName, isBroken, jobs, results)) {
warn("aggregate job '%s' references constituent glob pattern '%s' with no matches", jobName, childJobName);
brokenJobs[childJobName] = "constituent glob pattern had no matches";
}
} else if (!isBroken(childJobName, *childJob)) {
results.insert(childJobName);
}
}

aggregateJobs.insert(AggregateJob(jobName, results, brokenJobs));
}
}

return topoSort(aggregateJobs);
}
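
// Illustrative sketch, not part of the patch, of the jobs JSON this function
// consumes (job names and store paths are made up; the attribute names match
// what the worker above emits):
//
// nlohmann::json jobs = {
//     {"tests.foo", {{"drvPath", "/nix/store/<made-up>-tests-foo.drv"}}},
//     {"tests.bar", {{"drvPath", "/nix/store/<made-up>-tests-bar.drv"}}},
//     {"release", {
//         {"drvPath", "/nix/store/<made-up>-release.drv"},
//         {"namedConstituents", {"tests.*"}},
//         {"globConstituents", true},
//     }},
// };
// // resolveNamedConstituents(jobs) expands "tests.*" to tests.foo and
// // tests.bar and returns a single AggregateJob for "release".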

static void rewriteAggregates(nlohmann::json & jobs,
std::vector<AggregateJob> aggregateJobs,
bool dryRun,
ref<Store> store)
{
for (auto & aggregateJob : aggregateJobs) {
auto & job = jobs.find(aggregateJob.name).value();
if (dryRun) {
for (auto & childJobName : aggregateJob.dependencies) {
std::string constituentDrvPath = jobs[childJobName]["drvPath"];
job["constituents"].push_back(constituentDrvPath);
}
} else {
auto drvPath = store->parseStorePath((std::string) job["drvPath"]);
auto drv = store->readDerivation(drvPath);

for (auto & childJobName : aggregateJob.dependencies) {
auto childDrvPath = store->parseStorePath((std::string) jobs[childJobName]["drvPath"]);
auto childDrv = store->readDerivation(childDrvPath);
job["constituents"].push_back(store->printStorePath(childDrvPath));
drv.inputDrvs.map[childDrvPath].value = {childDrv.outputs.begin()->first};
}

if (aggregateJob.brokenJobs.empty()) {
std::string drvName(drvPath.name());
assert(hasSuffix(drvName, drvExtension));
drvName.resize(drvName.size() - drvExtension.size());

auto hashModulo = hashDerivationModulo(*store, drv, true);
if (hashModulo.kind != DrvHash::Kind::Regular) continue;
auto h = hashModulo.hashes.find("out");
if (h == hashModulo.hashes.end()) continue;
auto outPath = store->makeOutputPath("out", h->second, drvName);
drv.env["out"] = store->printStorePath(outPath);
drv.outputs.insert_or_assign("out", DerivationOutput::InputAddressed { .path = outPath });
auto newDrvPath = store->printStorePath(writeDerivation(*store, drv));

debug("rewrote aggregate derivation %s -> %s", store->printStorePath(drvPath), newDrvPath);

job["drvPath"] = newDrvPath;
job["outputs"]["out"] = store->printStorePath(outPath);
}
}

job.erase("namedConstituents");

/* Register the derivation as a GC root. !!! This
registers roots for jobs that we may have already
done. */
auto localStore = store.dynamic_pointer_cast<LocalFSStore>();
if (gcRootsDir != "" && localStore) {
auto drvPath = job["drvPath"].get<std::string>();
Path root = gcRootsDir + "/" + std::string(baseNameOf(drvPath));
if (!pathExists(root))
localStore->addPermRoot(localStore->parseStorePath(drvPath), root);
}

if (!aggregateJob.brokenJobs.empty()) {
std::stringstream ss;
for (const auto& [jobName, error] : aggregateJob.brokenJobs) {
ss << jobName << ": " << error << "\n";
}
job["error"] = ss.str();
}
}
}
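
// Illustrative sketch, not part of the patch: continuing the made-up example
// above, after rewriteAggregates() the "release" entry would roughly become
//
// {
//   "drvPath": "<new .drv path written by writeDerivation>",
//   "outputs": { "out": "<recomputed output path>" },
//   "constituents": [ "<tests.foo .drv path>", "<tests.bar .drv path>" ]
// }
//
// "namedConstituents" is erased; if any constituents were broken, their
// errors are joined into the job's "error" field and the drvPath is left
// unrewritten.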

int main(int argc, char * * argv)
{
/* Prevent undeclared dependencies in the evaluation via
@@ -484,101 +708,22 @@ int main(int argc, char * * argv)
if (state->exc)
std::rethrow_exception(state->exc);

/* For aggregate jobs that have named constituents
(i.e. constituents that are a job name rather than a
derivation), look up the referenced job and add it to the
dependencies of the aggregate derivation. */
auto store = openStore();

for (auto i = state->jobs.begin(); i != state->jobs.end(); ++i) {
auto jobName = i.key();
auto & job = i.value();

auto named = job.find("namedConstituents");
if (named == job.end()) continue;

std::unordered_map<std::string, std::string> brokenJobs;
auto getNonBrokenJobOrRecordError = [&brokenJobs, &jobName, &state](
const std::string & childJobName) -> std::optional<nlohmann::json> {
auto childJob = state->jobs.find(childJobName);
if (childJob == state->jobs.end()) {
printError("aggregate job '%s' references non-existent job '%s'", jobName, childJobName);
brokenJobs[childJobName] = "does not exist";
return std::nullopt;
}
if (childJob->find("error") != childJob->end()) {
std::string error = (*childJob)["error"];
printError("aggregate job '%s' references broken job '%s': %s", jobName, childJobName, error);
brokenJobs[childJobName] = error;
return std::nullopt;
}
return *childJob;
};

if (myArgs.dryRun) {
for (std::string jobName2 : *named) {
auto job2 = getNonBrokenJobOrRecordError(jobName2);
if (!job2) {
continue;
}
std::string drvPath2 = (*job2)["drvPath"];
job["constituents"].push_back(drvPath2);
}
} else {
auto drvPath = store->parseStorePath((std::string) job["drvPath"]);
auto drv = store->readDerivation(drvPath);

for (std::string jobName2 : *named) {
auto job2 = getNonBrokenJobOrRecordError(jobName2);
if (!job2) {
continue;
}
auto drvPath2 = store->parseStorePath((std::string) (*job2)["drvPath"]);
auto drv2 = store->readDerivation(drvPath2);
job["constituents"].push_back(store->printStorePath(drvPath2));
drv.inputDrvs.map[drvPath2].value = {drv2.outputs.begin()->first};
}

if (brokenJobs.empty()) {
std::string drvName(drvPath.name());
assert(hasSuffix(drvName, drvExtension));
drvName.resize(drvName.size() - drvExtension.size());

auto hashModulo = hashDerivationModulo(*store, drv, true);
if (hashModulo.kind != DrvHash::Kind::Regular) continue;
auto h = hashModulo.hashes.find("out");
if (h == hashModulo.hashes.end()) continue;
auto outPath = store->makeOutputPath("out", h->second, drvName);
drv.env["out"] = store->printStorePath(outPath);
drv.outputs.insert_or_assign("out", DerivationOutput::InputAddressed { .path = outPath });
auto newDrvPath = store->printStorePath(writeDerivation(*store, drv));

debug("rewrote aggregate derivation %s -> %s", store->printStorePath(drvPath), newDrvPath);

job["drvPath"] = newDrvPath;
job["outputs"]["out"] = store->printStorePath(outPath);
}
}

job.erase("namedConstituents");

/* Register the derivation as a GC root. !!! This
registers roots for jobs that we may have already
done. */
auto localStore = store.dynamic_pointer_cast<LocalFSStore>();
if (gcRootsDir != "" && localStore) {
auto drvPath = job["drvPath"].get<std::string>();
Path root = gcRootsDir + "/" + std::string(baseNameOf(drvPath));
if (!pathExists(root))
localStore->addPermRoot(localStore->parseStorePath(drvPath), root);
}

if (!brokenJobs.empty()) {
std::stringstream ss;
for (const auto& [jobName, error] : brokenJobs) {
ss << jobName << ": " << error << "\n";
}
job["error"] = ss.str();
try {
auto namedConstituents = resolveNamedConstituents(state->jobs);
rewriteAggregates(state->jobs, namedConstituents, myArgs.dryRun, store);
} catch (DependencyCycle & e) {
printError("Found dependency cycle between jobs '%s' and '%s'", e.a, e.b);
state->jobs[e.a]["error"] = e.what();
state->jobs[e.b]["error"] = e.what();

for (auto & jobName : e.remainingAggregates) {
state->jobs[jobName]["error"] = "Skipping aggregate because of a dependency cycle";
}
}
