Skip to content

Commit

Permalink
handle broken evaluation worker pipes on write
Browse files Browse the repository at this point in the history
  • Loading branch information
Mic92 committed Dec 10, 2023
1 parent 4f78b16 commit a0b1d84
Showing 1 changed file with 94 additions and 14 deletions.
108 changes: 94 additions & 14 deletions src/nix-eval-jobs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,46 @@ std::string attrPathJoin(json input) {
return 0;
}

class LineReader {
public:
LineReader(int fd) {
stream = fdopen(fd, "r");
if (!stream) {
throw Error("fdopen failed: %s", strerror(errno));
}
}

~LineReader() {
free(stream); // we don't own the fd, so don't close it
free(buffer);
}

LineReader(LineReader &&other) {
stream = other.stream;
other.stream = nullptr;
buffer = other.buffer;
other.buffer = nullptr;
len = other.len;
other.len = 0;
}

[[nodiscard]] std::string_view readLine() {
ssize_t read = getline(&buffer, &len, stream);

if (read == -1) {
return {}; // Return an empty string_view in case of error
}

// Remove trailing newline
return std::string_view(buffer, read - 1);
}

private:
FILE *stream = nullptr;
char *buffer = nullptr;
size_t len = 0;
};

static void worker(ref<EvalState> state, Bindings &autoArgs, AutoCloseFD &to,
AutoCloseFD &from) {

Expand All @@ -316,13 +356,15 @@ static void worker(ref<EvalState> state, Bindings &autoArgs, AutoCloseFD &to,
}
}();

LineReader fromReader(from.get());

while (true) {
/* Wait for the collector to send us a job name. */
if (tryWriteLine(to.get(), "next") < 0) {
return; // main process died
}

auto s = readLine(from.get());
auto s = fromReader.readLine();
if (s == "exit")
break;
if (!hasPrefix(s, "do "))
Expand Down Expand Up @@ -477,19 +519,28 @@ struct State {
std::exception_ptr exc;
};

void handleBrokenWorkerPipe(pid_t child) {
void handleBrokenWorkerPipe(Proc &proc) {
while (1) {
int rc = waitpid(child, nullptr, WNOHANG);
int rc = waitpid(proc.pid, nullptr, WNOHANG);
if (rc == 0) {
proc.pid = -1; // we already took the process status from Proc, no
// need to wait for it again to avoid error messages
throw Error("BUG: worker pipe closed but worker still running?");
} else if (rc == -1) {
proc.pid = -1;
throw Error("BUG: waitpid waiting for worker failed: %s",
strerror(errno));
} else {
if (WIFEXITED(rc)) {
proc.pid = -1;
throw Error("evaluation worker exited with %d",
WEXITSTATUS(rc));
} else if (WIFSIGNALED(rc)) {
proc.pid = -1;
if (WTERMSIG(rc) == SIGKILL) {
throw Error("evaluation worker killed by SIGKILL, maybe "
"memory limit reached?");
}
throw Error("evaluation worker killed by signal %d",
WTERMSIG(rc));
} // else ignore WIFSTOPPED and WIFCONTINUED
Expand All @@ -502,20 +553,35 @@ std::function<void()> collector(Sync<State> &state_,
return [&]() {
try {
std::optional<std::unique_ptr<Proc>> proc_;
std::optional<std::unique_ptr<LineReader>> fromReader_;

while (true) {

auto proc = proc_.has_value() ? std::move(proc_.value())
: std::make_unique<Proc>(worker);
if (!proc_.has_value()) {
proc_ = std::make_unique<Proc>(worker);
fromReader_ =
std::make_unique<LineReader>(proc_.value()->from.get());
}
auto proc = std::move(proc_.value());
auto fromReader = std::move(fromReader_.value());

/* Check whether the existing worker process is still there. */
auto s = readLine(proc->from.get());
if (s == "restart") {
auto s = fromReader->readLine();
if (s == "") {
handleBrokenWorkerPipe(*proc.get());
} else if (s == "restart") {
proc_ = std::nullopt;
fromReader_ = std::nullopt;
continue;
} else if (s != "next") {
auto json = json::parse(s);
throw Error("worker error: %s", (std::string)json["error"]);
try {
auto json = json::parse(s);
throw Error("worker error: %s",
(std::string)json["error"]);
} catch (const json::exception &e) {
throw Error(
"Received invalid JSON from worker: %s '%s'",
e.what(), s);
}
}

/* Wait for a job name to become available. */
Expand All @@ -527,7 +593,7 @@ std::function<void()> collector(Sync<State> &state_,
if ((state->todo.empty() && state->active.empty()) ||
state->exc) {
if (tryWriteLine(proc->to.get(), "exit") < 0) {
handleBrokenWorkerPipe(proc->pid);
handleBrokenWorkerPipe(*proc.get());
}
return;
}
Expand All @@ -542,12 +608,25 @@ std::function<void()> collector(Sync<State> &state_,

/* Tell the worker to evaluate it. */
if (tryWriteLine(proc->to.get(), "do " + attrPath.dump()) < 0) {
handleBrokenWorkerPipe(proc->pid);
handleBrokenWorkerPipe(*proc.get());
}

/* Wait for the response. */
auto respString = readLine(proc->from.get());
auto response = json::parse(respString);
auto respString = fromReader->readLine();
if (respString == "") {
handleBrokenWorkerPipe(*proc.get());
}
json response;
try {
response = json::parse(respString);
if (response.find("error") != response.end()) {
throw Error("worker error: %s",
(std::string)response["error"]);
}
} catch (const json::exception &e) {
throw Error("Received invalid JSON from worker: %s '%s'",
e.what(), respString);
}

/* Handle the response. */
std::vector<json> newAttrs;
Expand All @@ -563,6 +642,7 @@ std::function<void()> collector(Sync<State> &state_,
}

proc_ = std::move(proc);
fromReader_ = std::move(fromReader);

/* Add newly discovered job names to the queue. */
{
Expand Down

0 comments on commit a0b1d84

Please sign in to comment.