Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 182 additions & 148 deletions src/libstore/build/derivation-goal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,6 @@ std::string DerivationGoal::key()
}


void DerivationGoal::killChild()
{
#ifndef _WIN32 // TODO enable build hook on Windows
hook.reset();
#endif
}


void DerivationGoal::timedOut(Error && ex)
{
killChild();
// We're not inside a coroutine, hence we can't use co_return here.
// Thus we ignore the return value.
[[maybe_unused]] Done _ = done(BuildResult::TimedOut, {}, std::move(ex));
}

void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs)
{
auto newWanted = wantedOutputs.union_(outputs);
Expand Down Expand Up @@ -577,6 +561,105 @@ void DerivationGoal::started()
worker.updateProgress();
}

static std::function<bool(Descriptor, std::string_view)> makeHookLogHandler(
std::shared_ptr<BufferedSink> & logSink,
Activity & act,
const Activity & workerAct,
std::map<ActivityId, Activity> & builderActivities,
bool & failed,
#ifndef _WIN32 // TODO enable build hook on Windows
HookInstance & hook,
#endif
unsigned long & logSize,
std::string & currentLogLine,
size_t & currentLogLinePos,
std::string & currentHookLine,
std::list<std::string> & logTail
)
{
return [&](Descriptor fd, std::string_view data) {
// local & `ssh://`-builds are dealt with here.
bool isWrittenToLog =
#ifdef _WIN32 // TODO enable build hook on Windows
false;
#else
fd == hook.builderOut.readSide.get();
#endif

if (isWrittenToLog) {
logSize += data.size();
if (settings.maxLogSize && logSize > settings.maxLogSize) {
failed = true;
return true; // break out and stop listening to children
}

for (auto c : data)
if (c == '\r')
currentLogLinePos = 0;
else if (c == '\n') {
if (!handleJSONLogMessage(
currentLogLine, act, builderActivities, "the derivation builder", false)) {
act.result(resBuildLogLine, currentLogLine);
logTail.push_back(std::move(currentLogLine));
if (logTail.size() > settings.logLines)
logTail.pop_front();
}

currentLogLine = "";
currentLogLinePos = 0;
} else {
if (currentLogLinePos >= currentLogLine.size())
currentLogLine.resize(currentLogLinePos + 1);
currentLogLine[currentLogLinePos++] = c;
}

if (logSink)
(*logSink)(data);
}

#ifndef _WIN32 // TODO enable build hook on Windows
if (fd == hook.fromHook.readSide.get()) {
for (auto c : data)
if (c == '\n') {
auto json = parseJSONMessage(currentHookLine, "the derivation builder");
if (json) {
auto s =
handleJSONLogMessage(*json, workerAct, hook.activities, "the derivation builder", true);
// ensure that logs from a builder using `ssh-ng://` as protocol
// are also available to `nix log`.
if (s && !isWrittenToLog && logSink) {
const auto type = (*json)["type"];
const auto fields = (*json)["fields"];
if (type == resBuildLogLine) {
(*logSink)((fields.size() > 0 ? fields[0].get<std::string>() : "") + "\n");
} else if (type == resSetPhase && !fields.is_null()) {
const auto phase = fields[0];
if (!phase.is_null()) {
// nixpkgs' stdenv produces lines in the log to signal
// phase changes.
// We want to get the same lines in case of remote builds.
// The format is:
// @nix { "action": "setPhase", "phase": "$curPhase" }
const auto logLine =
nlohmann::json::object({{"action", "setPhase"}, {"phase", phase}});
(*logSink)(
"@nix "
+ logLine.dump(-1, ' ', false, nlohmann::json::error_handler_t::replace)
+ "\n");
}
}
}
}
currentHookLine.clear();
} else
currentHookLine += c;
}
#endif
failed = false;
return false;
};
}

Goal::Co DerivationGoal::tryToBuild()
{
trace("trying to build");
Expand Down Expand Up @@ -654,26 +737,88 @@ Goal::Co DerivationGoal::tryToBuild()

if (!buildLocally) {
switch (tryBuildHook()) {
case rpAccept:
/* Yes, it has started doing so. Wait until we get
EOF from the hook. */
actLock.reset();
buildResult.startTime = time(0); // inexact
started();
co_await Suspend{};
co_return hookDone();
case rpPostpone:
/* Not now; wait until at least one child finishes or
the wake-up timeout expires. */
if (!actLock)
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath))));
outputLocks.unlock();
co_await waitForAWhile();
co_return tryToBuild();
case rpDecline:
/* We should do it ourselves. */
break;
case rpAccept:
/* Yes, it has started doing so. Wait until we get
EOF from the hook. */
actLock.reset();
buildResult.startTime = time(0); // inexact
started();
#ifndef _WIN32 // TODO enable build hook on Windows
assert(hook);
#endif
{
bool failed = false;
assert(act);
trace("calling childStarted hook");
unsigned long logSize = 0;
std::string currentLogLine{};
size_t currentLogLinePos = 0;
std::string currentHookLine{};
std::list<std::string> logTail{};
auto handler = makeHookLogHandler(
logSink,
*act,
worker.act,
builderActivities,
failed,
#ifndef _WIN32 // TODO enable build hook on Windows
*hook,
#endif
logSize,
currentLogLine,
currentLogLinePos,
currentHookLine,
logTail
);
bool timedOut = false;
#ifndef _WIN32 // TODO enable build hook on Windows
co_await childStarted(
{ hook->fromHook.readSide.get()
, hook->builderOut.readSide.get()
}, false, false, std::move(handler), timedOut);
trace("calling childStarted hook end");
#endif

if (
!currentLogLine.empty() &&
!handleJSONLogMessage(currentLogLine, *act, builderActivities, "the derivation builder", false)
) {
act->result(resBuildLogLine, currentLogLine);
}

if (timedOut) {
#ifndef _WIN32 // TODO enable build hook on Windows
hook.reset();
#endif
co_return done(BuildResult::TimedOut, {}, std::move(ex));
} else if (failed) {
co_return done(
BuildResult::LogLimitExceeded,
{},
Error(
"%s killed after writing more than %d bytes of log output",
getName(),
settings.maxLogSize));
} else {
co_return hookDone(std::move(logTail));
}
}
case rpPostpone:
/* Not now; wait until at least one child finishes or
the wake-up timeout expires. */
if (!actLock)
actLock = std::make_unique<Activity>(
*logger,
lvlWarn,
actBuildWaiting,
fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath))));
co_await waitForAWhile();
outputLocks.unlock();
co_await waitForAWhile();
co_return tryToBuild();
case rpDecline:
/* We should do it ourselves. */
break;
}
}

Expand Down Expand Up @@ -905,7 +1050,7 @@ void appendLogTailErrorMsg(
}


Goal::Co DerivationGoal::hookDone()
Goal::Co DerivationGoal::hookDone(std::list<std::string> logTail)
{
#ifndef _WIN32
assert(hook);
Expand All @@ -929,9 +1074,6 @@ Goal::Co DerivationGoal::hookDone()
buildResult.timesBuilt++;
buildResult.stopTime = time(0);

/* So the child is gone now. */
worker.childTerminated(this);

/* Close the read side of the logger pipe. */
#ifndef _WIN32 // TODO enable build hook on Windows
hook->builderOut.readSide.close();
Expand Down Expand Up @@ -1165,7 +1307,6 @@ HookReply DerivationGoal::tryBuildHook()
std::set<MuxablePipePollState::CommChannel> fds;
fds.insert(hook->fromHook.readSide.get());
fds.insert(hook->builderOut.readSide.get());
worker.childStarted(shared_from_this(), fds, false, false);

return rpAccept;
#endif
Expand All @@ -1174,8 +1315,6 @@ HookReply DerivationGoal::tryBuildHook()

Path DerivationGoal::openLogFile()
{
logSize = 0;

if (!settings.keepLog) return "";

auto baseName = std::string(baseNameOf(worker.store.printStorePath(drvPath)));
Expand Down Expand Up @@ -1219,111 +1358,6 @@ void DerivationGoal::closeLogFile()
fdLogFile.close();
}


bool DerivationGoal::isReadDesc(Descriptor fd)
{
#ifdef _WIN32 // TODO enable build hook on Windows
return false;
#else
return fd == hook->builderOut.readSide.get();
#endif
}

void DerivationGoal::handleChildOutput(Descriptor fd, std::string_view data)
{
// local & `ssh://`-builds are dealt with here.
auto isWrittenToLog = isReadDesc(fd);
if (isWrittenToLog)
{
logSize += data.size();
if (settings.maxLogSize && logSize > settings.maxLogSize) {
killChild();
// We're not inside a coroutine, hence we can't use co_return here.
// Thus we ignore the return value.
[[maybe_unused]] Done _ = done(
BuildResult::LogLimitExceeded, {},
Error("%s killed after writing more than %d bytes of log output",
getName(), settings.maxLogSize));
return;
}

for (auto c : data)
if (c == '\r')
currentLogLinePos = 0;
else if (c == '\n')
flushLine();
else {
if (currentLogLinePos >= currentLogLine.size())
currentLogLine.resize(currentLogLinePos + 1);
currentLogLine[currentLogLinePos++] = c;
}

if (logSink) (*logSink)(data);
}

#ifndef _WIN32 // TODO enable build hook on Windows
if (hook && fd == hook->fromHook.readSide.get()) {
for (auto c : data)
if (c == '\n') {
auto json = parseJSONMessage(currentHookLine, "the derivation builder");
if (json) {
auto s = handleJSONLogMessage(*json, worker.act, hook->activities, "the derivation builder", true);
// ensure that logs from a builder using `ssh-ng://` as protocol
// are also available to `nix log`.
if (s && !isWrittenToLog && logSink) {
const auto type = (*json)["type"];
const auto fields = (*json)["fields"];
if (type == resBuildLogLine) {
(*logSink)((fields.size() > 0 ? fields[0].get<std::string>() : "") + "\n");
} else if (type == resSetPhase && ! fields.is_null()) {
const auto phase = fields[0];
if (! phase.is_null()) {
// nixpkgs' stdenv produces lines in the log to signal
// phase changes.
// We want to get the same lines in case of remote builds.
// The format is:
// @nix { "action": "setPhase", "phase": "$curPhase" }
const auto logLine = nlohmann::json::object({
{"action", "setPhase"},
{"phase", phase}
});
(*logSink)("@nix " + logLine.dump(-1, ' ', false, nlohmann::json::error_handler_t::replace) + "\n");
}
}
}
}
currentHookLine.clear();
} else
currentHookLine += c;
}
#endif
}


void DerivationGoal::handleEOF(Descriptor fd)
{
if (!currentLogLine.empty()) flushLine();
worker.wakeUp(shared_from_this());
}


void DerivationGoal::flushLine()
{
if (handleJSONLogMessage(currentLogLine, *act, builderActivities, "the derivation builder", false))
;

else {
logTail.push_back(currentLogLine);
if (logTail.size() > settings.logLines) logTail.pop_front();

act->result(resBuildLogLine, currentLogLine);
}

currentLogLine = "";
currentLogLinePos = 0;
}


std::map<std::string, std::optional<StorePath>> DerivationGoal::queryPartialDerivationOutputMap()
{
assert(!drv->type().isImpure());
Expand Down
Loading