Skip to content

Commit

Permalink
Merge pull request #340 from Ericson2314/aggregate-jobs
Browse files Browse the repository at this point in the history
Bring back constituents to Hydra
  • Loading branch information
Mic92 authored Nov 25, 2024
2 parents d926bcd + c375e6d commit 2c36b37
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 8 deletions.
2 changes: 1 addition & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"nscloud-cache-size-20gb"
"nscloud-cache-tag-nix-eval-jobs"
];
"x86_64-darwin" = "macos-12";
"x86_64-darwin" = "macos-13";
"aarch64-darwin" = "macos-14";
"aarch64-linux" = [
"nscloud-ubuntu-22.04-arm64-4x16-with-cache"
Expand Down
9 changes: 8 additions & 1 deletion src/drv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ auto queryCacheStatus(nix::Store &store,

/* The fields of a derivation that are printed in json form */
Drv::Drv(std::string &attrPath, nix::EvalState &state,
nix::PackageInfo &packageInfo, MyArgs &args) {
nix::PackageInfo &packageInfo, MyArgs &args,
std::optional<Constituents> constituents)
: constituents(constituents) {

auto localStore = state.store.dynamic_pointer_cast<nix::LocalFSStore>();

Expand Down Expand Up @@ -177,6 +179,11 @@ void to_json(nlohmann::json &json, const Drv &drv) {
json["meta"] = drv.meta.value();
}

if (auto constituents = drv.constituents) {
json["constituents"] = constituents->constituents;
json["namedConstituents"] = constituents->namedConstituents;
}

if (drv.cacheStatus != Drv::CacheStatus::Unknown) {
// Deprecated field
json["isCached"] = drv.cacheStatus == Drv::CacheStatus::Cached ||
Expand Down
12 changes: 11 additions & 1 deletion src/drv.hh
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,19 @@ class EvalState;
struct PackageInfo;
} // namespace nix

struct Constituents {
std::vector<std::string> constituents;
std::vector<std::string> namedConstituents;
Constituents(std::vector<std::string> constituents,
std::vector<std::string> namedConstituents)
: constituents(constituents), namedConstituents(namedConstituents) {};
};

/* The fields of a derivation that are printed in json form */
struct Drv {
Drv(std::string &attrPath, nix::EvalState &state,
nix::PackageInfo &packageInfo, MyArgs &args);
nix::PackageInfo &packageInfo, MyArgs &args,
std::optional<Constituents> constituents);
std::string name;
std::string system;
std::string drvPath;
Expand All @@ -31,5 +40,6 @@ struct Drv {
std::map<std::string, std::optional<std::string>> outputs;
std::map<std::string, std::set<std::string>> inputDrvs;
std::optional<nlohmann::json> meta;
std::optional<Constituents> constituents;
};
void to_json(nlohmann::json &json, const Drv &drv);
6 changes: 6 additions & 0 deletions src/eval-args.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ MyArgs::MyArgs() : MixCommonArgs("nix-eval-jobs") {
.description = "include derivation meta field in output",
.handler = {&meta, true}});

addFlag(
{.longName = "constituents",
.description =
"whether to evaluate constituents for Hydra's aggregate feature",
.handler = {&constituents, true}});

addFlag(
{.longName = "check-cache-status",
.description =
Expand Down
1 change: 1 addition & 0 deletions src/eval-args.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class MyArgs : virtual public nix::MixEvalArgs,
bool impure = false;
bool forceRecurse = false;
bool checkCacheStatus = false;
bool constituents = false;
size_t nrWorkers = 1;
size_t maxMemorySize = 4096;

Expand Down
106 changes: 105 additions & 1 deletion src/nix-eval-jobs.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include <nix/config.h> // IWYU pragma: keep
#include <curl/curl.h>
#include <nix/derivations.hh>
#include <nix/local-fs-store.hh>
#include <nix/eval-settings.hh>
#include <nix/shared.hh>
#include <nix/sync.hh>
Expand Down Expand Up @@ -174,6 +176,7 @@ struct State {
std::set<nlohmann::json> todo =
nlohmann::json::array({nlohmann::json::array()});
std::set<nlohmann::json> active;
std::map<std::string, nlohmann::json> jobs;
std::exception_ptr exc;
};

Expand Down Expand Up @@ -344,7 +347,11 @@ void collector(nix::Sync<State> &state_, std::condition_variable &wakeup) {
}
} else {
auto state(state_.lock());
std::cout << respString << "\n" << std::flush;
state->jobs.insert_or_assign(response["attr"], response);
auto named = response.find("namedConstituents");
if (named == response.end() || named->empty()) {
std::cout << respString << "\n" << std::flush;
}
}

proc_ = std::move(proc);
Expand Down Expand Up @@ -448,5 +455,102 @@ auto main(int argc, char **argv) -> int {
if (state->exc) {
std::rethrow_exception(state->exc);
}

if (myArgs.constituents) {
auto store = myArgs.evalStoreUrl
? nix::openStore(*myArgs.evalStoreUrl)
: nix::openStore();
for (auto &[attr, job_json] : state->jobs) {
auto namedConstituents = job_json.find("namedConstituents");
if (namedConstituents != job_json.end() &&
!namedConstituents->empty()) {
bool broken = false;
auto drvPathAggregate =
store->parseStorePath((std::string)job_json["drvPath"]);
auto drvAggregate = store->readDerivation(drvPathAggregate);
if (!job_json.contains("constituents")) {
job_json["constituents"] = nlohmann::json::array();
}
std::vector<std::string> errors;
for (auto child : *namedConstituents) {
auto childJob = state->jobs.find(child);
if (childJob == state->jobs.end()) {
broken = true;
errors.push_back(
nix::fmt("%s: does not exist", child));
} else if (childJob->second.find("error") !=
childJob->second.end()) {
broken = true;
errors.push_back(nix::fmt(
"%s: %s", child, childJob->second["error"]));
} else {
auto drvPathChild = store->parseStorePath(
(std::string)childJob->second["drvPath"]);
auto drvChild = store->readDerivation(drvPathChild);
job_json["constituents"].push_back(
store->printStorePath(drvPathChild));
drvAggregate.inputDrvs.map[drvPathChild].value = {
drvChild.outputs.begin()->first};
}
}

if (broken) {
nlohmann::json out;
out["attr"] = job_json["attr"];
out["error"] = nix::concatStringsSep("\n", errors);
out["constituents"] = nlohmann::json::array();
std::cout << out.dump() << "\n" << std::flush;
} else {
std::string drvName(drvPathAggregate.name());
assert(drvName.ends_with(nix::drvExtension));
drvName.resize(drvName.size() -
nix::drvExtension.size());

auto hashModulo = nix::hashDerivationModulo(
*store, drvAggregate, true);
if (hashModulo.kind != nix::DrvHash::Kind::Regular)
continue;

auto h = hashModulo.hashes.find("out");
if (h == hashModulo.hashes.end())
continue;
auto outPath =
store->makeOutputPath("out", h->second, drvName);
drvAggregate.env["out"] =
store->printStorePath(outPath);
drvAggregate.outputs.insert_or_assign(
"out", nix::DerivationOutput::InputAddressed{
.path = outPath});
auto newDrvPath = store->printStorePath(
nix::writeDerivation(*store, drvAggregate));

if (myArgs.gcRootsDir != "") {
nix::Path root =
myArgs.gcRootsDir + "/" +
std::string(nix::baseNameOf(newDrvPath));
if (!nix::pathExists(root)) {
auto localStore = store.dynamic_pointer_cast<
nix::LocalFSStore>();
auto storePath =
localStore->parseStorePath(newDrvPath);
localStore->addPermRoot(storePath, root);
}
}

nix::logger->log(
nix::lvlDebug,
nix::fmt("rewrote aggregate derivation %s -> %s",
store->printStorePath(drvPathAggregate),
newDrvPath));

job_json["drvPath"] = newDrvPath;
job_json["outputs"]["out"] =
store->printStorePath(outPath);
job_json.erase("namedConstituents");
std::cout << job_json.dump() << "\n" << std::flush;
}
}
}
}
});
}
61 changes: 58 additions & 3 deletions src/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,64 @@ void worker(
state->autoCallFunction(autoArgs, *vTmp, *v);

if (v->type() == nix::nAttrs) {
auto packageInfo = nix::getDerivation(*state, *v, false);
if (packageInfo) {
auto drv = Drv(attrPathS, *state, *packageInfo, args);
if (auto packageInfo = nix::getDerivation(*state, *v, false)) {
std::optional<Constituents> maybeConstituents;
if (args.constituents) {
std::vector<std::string> constituents;
std::vector<std::string> namedConstituents;
auto a = v->attrs()->get(
state->symbols.create("_hydraAggregate"));
if (a &&
state->forceBool(*a->value, a->pos,
"while evaluating the "
"`_hydraAggregate` attribute")) {
auto a = v->attrs()->get(
state->symbols.create("constituents"));
if (!a)
state
->error<nix::EvalError>(
"derivation must have a ‘constituents’ "
"attribute")
.debugThrow();

nix::NixStringContext context;
state->coerceToString(
a->pos, *a->value, context,
"while evaluating the `constituents` attribute",
true, false);
for (auto &c : context)
std::visit(
nix::overloaded{
[&](const nix::NixStringContextElem::
Built &b) {
constituents.push_back(
b.drvPath->to_string(
*state->store));
},
[&](const nix::NixStringContextElem::
Opaque &o) {},
[&](const nix::NixStringContextElem::
DrvDeep &d) {},
},
c.raw);

state->forceList(*a->value, a->pos,
"while evaluating the "
"`constituents` attribute");
for (unsigned int n = 0; n < a->value->listSize();
++n) {
auto v = a->value->listElems()[n];
state->forceValue(*v, nix::noPos);
if (v->type() == nix::nString)
namedConstituents.push_back(
std::string(v->c_str()));
}
}
maybeConstituents =
Constituents(constituents, namedConstituents);
}
auto drv = Drv(attrPathS, *state, *packageInfo, args,
maybeConstituents);
reply.update(drv);

/* Register the derivation as a GC root. !!! This
Expand Down
32 changes: 31 additions & 1 deletion tests/assets/flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";

outputs =
{ nixpkgs, ... }:
{ self, nixpkgs, ... }:
let
pkgs = nixpkgs.legacyPackages.x86_64-linux;
in
Expand All @@ -25,6 +25,36 @@
builder = ":";
};
};
success = {
aggregate =
pkgs.runCommand "aggregate"
{
_hydraAggregate = true;
constituents = [
self.hydraJobs.builtJob
"anotherone"
];
}
''
touch $out
'';
anotherone = pkgs.writeText "constituent" "text";
};
failures = {
aggregate =
pkgs.runCommand "aggregate"
{
_hydraAggregate = true;
constituents = [
"doesntexist"
"doesnteval"
];
}
''
touch $out
'';
doesnteval = pkgs.writeText "constituent" (toString { });
};
};
};
}
66 changes: 66 additions & 0 deletions tests/test_eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,72 @@ def test_eval_error() -> None:
assert "this is an evaluation error" in attrs["error"]


def test_constituents() -> None:
with TemporaryDirectory() as tempdir:
cmd = [
str(BIN),
"--gc-roots-dir",
tempdir,
"--meta",
"--workers",
"1",
"--flake",
".#legacyPackages.x86_64-linux.success",
"--constituents",
]
res = subprocess.run(
cmd,
cwd=TEST_ROOT.joinpath("assets"),
text=True,
stdout=subprocess.PIPE,
)
print(res.stdout)
results = [json.loads(r) for r in res.stdout.split("\n") if r]
assert len(results) == 2
child = results[0]
assert child["attr"] == "anotherone"
aggregate = results[1]
assert aggregate["attr"] == "aggregate"
assert "namedConstituents" not in aggregate
assert aggregate["constituents"][0].endswith("-job1.drv")
assert aggregate["constituents"][1] == child["drvPath"]
assert "error" not in aggregate


def test_constituents_error() -> None:
with TemporaryDirectory() as tempdir:
cmd = [
str(BIN),
"--gc-roots-dir",
tempdir,
"--meta",
"--workers",
"1",
"--flake",
".#legacyPackages.x86_64-linux.failures",
"--constituents",
]
res = subprocess.run(
cmd,
cwd=TEST_ROOT.joinpath("assets"),
text=True,
stdout=subprocess.PIPE,
)
print(res.stdout)
results = [json.loads(r) for r in res.stdout.split("\n") if r]
assert len(results) == 2
child = results[0]
assert child["attr"] == "doesnteval"
assert "error" in child
aggregate = results[1]
assert aggregate["attr"] == "aggregate"
assert "namedConstituents" not in aggregate
assert aggregate["error"].startswith(
'"doesntexist": does not exist\n"doesnteval": "error: derivation '
)
assert aggregate["constituents"] == []


@pytest.mark.infiniterecursion
def test_recursion_error() -> None:
with TemporaryDirectory() as tempdir:
Expand Down

0 comments on commit 2c36b37

Please sign in to comment.