Skip to content

Commit 7e5138a

Browse files
L-asEricson2314
authored andcommitted
Don't expose handleChildOutput and handleEOF to Goal subclasses
Instead, you call `childStarted`, instead of `worker.childStarted`, which takes a lambda that handles the child output. The handler can return true to end it early. This commit breaks timeouts, but the next one fixes it.
1 parent 2172c17 commit 7e5138a

File tree

10 files changed

+337
-235
lines changed

10 files changed

+337
-235
lines changed

src/libstore/build/derivation-goal.cc

Lines changed: 176 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,105 @@ void DerivationGoal::started()
569569
worker.updateProgress();
570570
}
571571

572+
static std::function<bool(Descriptor, std::string_view)> hookHandler(
573+
std::shared_ptr<BufferedSink> & logSink,
574+
Activity & act,
575+
const Activity & workerAct,
576+
std::map<ActivityId, Activity> & builderActivities,
577+
bool & failed,
578+
#ifndef _WIN32 // TODO enable build hook on Windows
579+
HookInstance & hook,
580+
#endif
581+
unsigned long & logSize,
582+
std::string & currentLogLine,
583+
size_t & currentLogLinePos,
584+
std::string & currentHookLine,
585+
std::list<std::string> & logTail
586+
)
587+
{
588+
return [&](Descriptor fd, std::string_view data) {
589+
// local & `ssh://`-builds are dealt with here.
590+
bool isWrittenToLog =
591+
#ifdef _WIN32 // TODO enable build hook on Windows
592+
false;
593+
#else
594+
fd == hook.builderOut.readSide.get();
595+
#endif
596+
597+
if (isWrittenToLog) {
598+
logSize += data.size();
599+
if (settings.maxLogSize && logSize > settings.maxLogSize) {
600+
failed = true;
601+
return true; // break out and stop listening to children
602+
}
603+
604+
for (auto c : data)
605+
if (c == '\r')
606+
currentLogLinePos = 0;
607+
else if (c == '\n') {
608+
if (!handleJSONLogMessage(
609+
currentLogLine, act, builderActivities, "the derivation builder", false)) {
610+
act.result(resBuildLogLine, currentLogLine);
611+
logTail.push_back(std::move(currentLogLine));
612+
if (logTail.size() > settings.logLines)
613+
logTail.pop_front();
614+
}
615+
616+
currentLogLine = "";
617+
currentLogLinePos = 0;
618+
} else {
619+
if (currentLogLinePos >= currentLogLine.size())
620+
currentLogLine.resize(currentLogLinePos + 1);
621+
currentLogLine[currentLogLinePos++] = c;
622+
}
623+
624+
if (logSink)
625+
(*logSink)(data);
626+
}
627+
628+
#ifndef _WIN32 // TODO enable build hook on Windows
629+
if (fd == hook.fromHook.readSide.get()) {
630+
for (auto c : data)
631+
if (c == '\n') {
632+
auto json = parseJSONMessage(currentHookLine, "the derivation builder");
633+
if (json) {
634+
auto s =
635+
handleJSONLogMessage(*json, workerAct, hook.activities, "the derivation builder", true);
636+
// ensure that logs from a builder using `ssh-ng://` as protocol
637+
// are also available to `nix log`.
638+
if (s && !isWrittenToLog && logSink) {
639+
const auto type = (*json)["type"];
640+
const auto fields = (*json)["fields"];
641+
if (type == resBuildLogLine) {
642+
(*logSink)((fields.size() > 0 ? fields[0].get<std::string>() : "") + "\n");
643+
} else if (type == resSetPhase && !fields.is_null()) {
644+
const auto phase = fields[0];
645+
if (!phase.is_null()) {
646+
// nixpkgs' stdenv produces lines in the log to signal
647+
// phase changes.
648+
// We want to get the same lines in case of remote builds.
649+
// The format is:
650+
// @nix { "action": "setPhase", "phase": "$curPhase" }
651+
const auto logLine =
652+
nlohmann::json::object({{"action", "setPhase"}, {"phase", phase}});
653+
(*logSink)(
654+
"@nix "
655+
+ logLine.dump(-1, ' ', false, nlohmann::json::error_handler_t::replace)
656+
+ "\n");
657+
}
658+
}
659+
}
660+
}
661+
currentHookLine.clear();
662+
} else
663+
currentHookLine += c;
664+
}
665+
#endif
666+
failed = false;
667+
return false;
668+
};
669+
}
670+
572671
Goal::Co DerivationGoal::tryToBuild()
573672
{
574673
trace("trying to build");
@@ -646,27 +745,82 @@ Goal::Co DerivationGoal::tryToBuild()
646745

647746
if (!buildLocally) {
648747
switch (tryBuildHook()) {
649-
case rpAccept:
650-
/* Yes, it has started doing so. Wait until we get
651-
EOF from the hook. */
652-
actLock.reset();
653-
buildResult.startTime = time(0); // inexact
654-
started();
655-
co_await Suspend{};
656-
co_return hookDone();
657-
case rpPostpone:
658-
/* Not now; wait until at least one child finishes or
659-
the wake-up timeout expires. */
660-
if (!actLock)
661-
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
662-
fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath))));
663-
worker.waitForAWhile(shared_from_this());
664-
outputLocks.unlock();
665-
co_await Suspend{};
666-
co_return tryToBuild();
667-
case rpDecline:
668-
/* We should do it ourselves. */
669-
break;
748+
case rpAccept:
749+
/* Yes, it has started doing so. Wait until we get
750+
EOF from the hook. */
751+
actLock.reset();
752+
buildResult.startTime = time(0); // inexact
753+
started();
754+
#ifndef _WIN32 // TODO enable build hook on Windows
755+
assert(hook);
756+
#endif
757+
{
758+
bool failed = false;
759+
assert(act);
760+
trace("calling childStarted hook");
761+
unsigned long logSize = 0;
762+
std::string currentLogLine{};
763+
size_t currentLogLinePos = 0;
764+
std::string currentHookLine{};
765+
std::list<std::string> logTail{};
766+
auto handler = hookHandler(
767+
logSink,
768+
*act,
769+
worker.act,
770+
builderActivities,
771+
failed,
772+
#ifndef _WIN32 // TODO enable build hook on Windows
773+
*hook,
774+
#endif
775+
logSize,
776+
currentLogLine,
777+
currentLogLinePos,
778+
currentHookLine,
779+
logTail
780+
);
781+
#ifndef _WIN32 // TODO enable build hook on Windows
782+
co_await childStarted(
783+
{ hook->fromHook.readSide.get()
784+
, hook->builderOut.readSide.get()
785+
}, false, false, std::move(handler));
786+
trace("calling childStarted hook end");
787+
#endif
788+
789+
if (
790+
!currentLogLine.empty() &&
791+
!handleJSONLogMessage(currentLogLine, *act, builderActivities, "the derivation builder", false)
792+
) {
793+
act->result(resBuildLogLine, currentLogLine);
794+
}
795+
796+
if (failed) {
797+
co_return done(
798+
BuildResult::LogLimitExceeded,
799+
{},
800+
Error(
801+
"%s killed after writing more than %d bytes of log output",
802+
getName(),
803+
settings.maxLogSize));
804+
} else {
805+
co_return hookDone(std::move(logTail));
806+
}
807+
}
808+
case rpPostpone:
809+
/* Not now; wait until at least one child finishes or
810+
the wake-up timeout expires. */
811+
if (!actLock)
812+
actLock = std::make_unique<Activity>(
813+
*logger,
814+
lvlWarn,
815+
actBuildWaiting,
816+
fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath))));
817+
co_await waitForAWhile();
818+
outputLocks.unlock();
819+
co_await waitForAWhile();
820+
co_return tryToBuild();
821+
case rpDecline:
822+
/* We should do it ourselves. */
823+
break;
670824
}
671825
}
672826

@@ -898,7 +1052,7 @@ void appendLogTailErrorMsg(
8981052
}
8991053

9001054

901-
Goal::Co DerivationGoal::hookDone()
1055+
Goal::Co DerivationGoal::hookDone(std::list<std::string> logTail)
9021056
{
9031057
#ifndef _WIN32
9041058
assert(hook);
@@ -922,9 +1076,6 @@ Goal::Co DerivationGoal::hookDone()
9221076
buildResult.timesBuilt++;
9231077
buildResult.stopTime = time(0);
9241078

925-
/* So the child is gone now. */
926-
worker.childTerminated(this);
927-
9281079
/* Close the read side of the logger pipe. */
9291080
#ifndef _WIN32 // TODO enable build hook on Windows
9301081
hook->builderOut.readSide.close();
@@ -1158,7 +1309,6 @@ HookReply DerivationGoal::tryBuildHook()
11581309
std::set<MuxablePipePollState::CommChannel> fds;
11591310
fds.insert(hook->fromHook.readSide.get());
11601311
fds.insert(hook->builderOut.readSide.get());
1161-
worker.childStarted(shared_from_this(), fds, false, false);
11621312

11631313
return rpAccept;
11641314
#endif
@@ -1167,8 +1317,6 @@ HookReply DerivationGoal::tryBuildHook()
11671317

11681318
Path DerivationGoal::openLogFile()
11691319
{
1170-
logSize = 0;
1171-
11721320
if (!settings.keepLog) return "";
11731321

11741322
auto baseName = std::string(baseNameOf(worker.store.printStorePath(drvPath)));
@@ -1212,111 +1360,6 @@ void DerivationGoal::closeLogFile()
12121360
fdLogFile.close();
12131361
}
12141362

1215-
1216-
bool DerivationGoal::isReadDesc(Descriptor fd)
1217-
{
1218-
#ifdef _WIN32 // TODO enable build hook on Windows
1219-
return false;
1220-
#else
1221-
return fd == hook->builderOut.readSide.get();
1222-
#endif
1223-
}
1224-
1225-
void DerivationGoal::handleChildOutput(Descriptor fd, std::string_view data)
1226-
{
1227-
// local & `ssh://`-builds are dealt with here.
1228-
auto isWrittenToLog = isReadDesc(fd);
1229-
if (isWrittenToLog)
1230-
{
1231-
logSize += data.size();
1232-
if (settings.maxLogSize && logSize > settings.maxLogSize) {
1233-
killChild();
1234-
// We're not inside a coroutine, hence we can't use co_return here.
1235-
// Thus we ignore the return value.
1236-
[[maybe_unused]] Done _ = done(
1237-
BuildResult::LogLimitExceeded, {},
1238-
Error("%s killed after writing more than %d bytes of log output",
1239-
getName(), settings.maxLogSize));
1240-
return;
1241-
}
1242-
1243-
for (auto c : data)
1244-
if (c == '\r')
1245-
currentLogLinePos = 0;
1246-
else if (c == '\n')
1247-
flushLine();
1248-
else {
1249-
if (currentLogLinePos >= currentLogLine.size())
1250-
currentLogLine.resize(currentLogLinePos + 1);
1251-
currentLogLine[currentLogLinePos++] = c;
1252-
}
1253-
1254-
if (logSink) (*logSink)(data);
1255-
}
1256-
1257-
#ifndef _WIN32 // TODO enable build hook on Windows
1258-
if (hook && fd == hook->fromHook.readSide.get()) {
1259-
for (auto c : data)
1260-
if (c == '\n') {
1261-
auto json = parseJSONMessage(currentHookLine, "the derivation builder");
1262-
if (json) {
1263-
auto s = handleJSONLogMessage(*json, worker.act, hook->activities, "the derivation builder", true);
1264-
// ensure that logs from a builder using `ssh-ng://` as protocol
1265-
// are also available to `nix log`.
1266-
if (s && !isWrittenToLog && logSink) {
1267-
const auto type = (*json)["type"];
1268-
const auto fields = (*json)["fields"];
1269-
if (type == resBuildLogLine) {
1270-
(*logSink)((fields.size() > 0 ? fields[0].get<std::string>() : "") + "\n");
1271-
} else if (type == resSetPhase && ! fields.is_null()) {
1272-
const auto phase = fields[0];
1273-
if (! phase.is_null()) {
1274-
// nixpkgs' stdenv produces lines in the log to signal
1275-
// phase changes.
1276-
// We want to get the same lines in case of remote builds.
1277-
// The format is:
1278-
// @nix { "action": "setPhase", "phase": "$curPhase" }
1279-
const auto logLine = nlohmann::json::object({
1280-
{"action", "setPhase"},
1281-
{"phase", phase}
1282-
});
1283-
(*logSink)("@nix " + logLine.dump(-1, ' ', false, nlohmann::json::error_handler_t::replace) + "\n");
1284-
}
1285-
}
1286-
}
1287-
}
1288-
currentHookLine.clear();
1289-
} else
1290-
currentHookLine += c;
1291-
}
1292-
#endif
1293-
}
1294-
1295-
1296-
void DerivationGoal::handleEOF(Descriptor fd)
1297-
{
1298-
if (!currentLogLine.empty()) flushLine();
1299-
worker.wakeUp(shared_from_this());
1300-
}
1301-
1302-
1303-
void DerivationGoal::flushLine()
1304-
{
1305-
if (handleJSONLogMessage(currentLogLine, *act, builderActivities, "the derivation builder", false))
1306-
;
1307-
1308-
else {
1309-
logTail.push_back(currentLogLine);
1310-
if (logTail.size() > settings.logLines) logTail.pop_front();
1311-
1312-
act->result(resBuildLogLine, currentLogLine);
1313-
}
1314-
1315-
currentLogLine = "";
1316-
currentLogLinePos = 0;
1317-
}
1318-
1319-
13201363
std::map<std::string, std::optional<StorePath>> DerivationGoal::queryPartialDerivationOutputMap()
13211364
{
13221365
assert(!drv->type().isImpure());

0 commit comments

Comments
 (0)