Skip to content

Commit b781ec4

Browse files
committed
Auto merge of #8247 - ehuss:output-hang, r=alexcrichton
Gracefully handle errors during a build. If there are certain errors like EPIPE during a build, Cargo runs the risk of hanging if the compiler emits too many messages. This happens because Cargo now uses a bounded queue for compiler messages. However, if the main loop exits while the threads are running, there is nothing to drain the queue, and thus the threads will block indefinitely. The solution here is to be extra careful of how errors are handled in the main loop. All errors are now treated roughly the same (report the error, allow the queue to continue to drain). I've also tweaked things so the *first* error is reported, not the last. Closes #8245
2 parents fcf2236 + 5e561ae commit b781ec4

File tree

3 files changed

+185
-37
lines changed

3 files changed

+185
-37
lines changed

src/cargo/core/compiler/job_queue.rs

Lines changed: 69 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ use super::job::{
6969
};
7070
use super::timings::Timings;
7171
use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
72-
use crate::core::{PackageId, TargetKind};
72+
use crate::core::{PackageId, Shell, TargetKind};
7373
use crate::util::diagnostic_server::{self, DiagnosticPrinter};
7474
use crate::util::machine_message::{self, Message as _};
7575
use crate::util::{self, internal, profile};
@@ -401,8 +401,13 @@ impl<'cfg> JobQueue<'cfg> {
401401
.take()
402402
.map(move |srv| srv.start(move |msg| messages.push(Message::FixDiagnostic(msg))));
403403

404-
crossbeam_utils::thread::scope(move |scope| state.drain_the_queue(cx, plan, scope, &helper))
405-
.expect("child threads shouldn't panic")
404+
crossbeam_utils::thread::scope(move |scope| {
405+
match state.drain_the_queue(cx, plan, scope, &helper) {
406+
Some(err) => Err(err),
407+
None => Ok(()),
408+
}
409+
})
410+
.expect("child threads shouldn't panic")
406411
}
407412
}
408413

@@ -412,7 +417,6 @@ impl<'cfg> DrainState<'cfg> {
412417
cx: &mut Context<'_, '_>,
413418
jobserver_helper: &HelperThread,
414419
scope: &Scope<'_>,
415-
has_errored: bool,
416420
) -> CargoResult<()> {
417421
// Dequeue as much work as we can, learning about everything
418422
// possible that can run. Note that this is also the point where we
@@ -425,11 +429,6 @@ impl<'cfg> DrainState<'cfg> {
425429
}
426430
}
427431

428-
// Do not actually spawn the new work if we've errored out
429-
if has_errored {
430-
return Ok(());
431-
}
432-
433432
// Now that we've learned of all possible work that we can execute
434433
// try to spawn it so long as we've got a jobserver token which says
435434
// we're able to perform some parallel work.
@@ -487,7 +486,7 @@ impl<'cfg> DrainState<'cfg> {
487486
jobserver_helper: &HelperThread,
488487
plan: &mut BuildPlan,
489488
event: Message,
490-
) -> CargoResult<Option<anyhow::Error>> {
489+
) -> CargoResult<()> {
491490
match event {
492491
Message::Run(id, cmd) => {
493492
cx.bcx
@@ -545,17 +544,7 @@ impl<'cfg> DrainState<'cfg> {
545544
Err(e) => {
546545
let msg = "The following warnings were emitted during compilation:";
547546
self.emit_warnings(Some(msg), &unit, cx)?;
548-
549-
if !self.active.is_empty() {
550-
crate::display_error(&e, &mut *cx.bcx.config.shell());
551-
cx.bcx.config.shell().warn(
552-
"build failed, waiting for other \
553-
jobs to finish...",
554-
)?;
555-
return Ok(Some(anyhow::format_err!("build failed")));
556-
} else {
557-
return Ok(Some(e));
558-
}
547+
return Err(e);
559548
}
560549
}
561550
}
@@ -590,7 +579,7 @@ impl<'cfg> DrainState<'cfg> {
590579
}
591580
}
592581

593-
Ok(None)
582+
Ok(())
594583
}
595584

596585
// This will also tick the progress bar as appropriate
@@ -631,13 +620,18 @@ impl<'cfg> DrainState<'cfg> {
631620
events
632621
}
633622

623+
/// This is the "main" loop, where Cargo does all work to run the
624+
/// compiler.
625+
///
626+
/// This returns an Option to prevent the use of `?` on `Result` types
627+
/// because it is important for the loop to carefully handle errors.
634628
fn drain_the_queue(
635629
mut self,
636630
cx: &mut Context<'_, '_>,
637631
plan: &mut BuildPlan,
638632
scope: &Scope<'_>,
639633
jobserver_helper: &HelperThread,
640-
) -> CargoResult<()> {
634+
) -> Option<anyhow::Error> {
641635
trace!("queue: {:#?}", self.queue);
642636

643637
// Iteratively execute the entire dependency graph. Each turn of the
@@ -651,25 +645,34 @@ impl<'cfg> DrainState<'cfg> {
651645
// successful and otherwise wait for pending work to finish if it failed
652646
// and then immediately return.
653647
let mut error = None;
648+
// CAUTION! Do not use `?` or break out of the loop early. Every error
649+
// must be handled in such a way that the loop is still allowed to
650+
// drain event messages.
654651
loop {
655-
self.spawn_work_if_possible(cx, jobserver_helper, scope, error.is_some())?;
652+
if error.is_none() {
653+
if let Err(e) = self.spawn_work_if_possible(cx, jobserver_helper, scope) {
654+
self.handle_error(&mut cx.bcx.config.shell(), &mut error, e);
655+
}
656+
}
656657

657658
// If after all that we're not actually running anything then we're
658659
// done!
659660
if self.active.is_empty() {
660661
break;
661662
}
662663

663-
self.grant_rustc_token_requests()?;
664+
if let Err(e) = self.grant_rustc_token_requests() {
665+
self.handle_error(&mut cx.bcx.config.shell(), &mut error, e);
666+
}
664667

665668
// And finally, before we block waiting for the next event, drop any
666669
// excess tokens we may have accidentally acquired. Due to how our
667670
// jobserver interface is architected we may acquire a token that we
668671
// don't actually use, and if this happens just relinquish it back
669672
// to the jobserver itself.
670673
for event in self.wait_for_events() {
671-
if let Some(err) = self.handle_event(cx, jobserver_helper, plan, event)? {
672-
error = Some(err);
674+
if let Err(event_err) = self.handle_event(cx, jobserver_helper, plan, event) {
675+
self.handle_error(&mut cx.bcx.config.shell(), &mut error, event_err);
673676
}
674677
}
675678
}
@@ -694,29 +697,62 @@ impl<'cfg> DrainState<'cfg> {
694697
}
695698

696699
let time_elapsed = util::elapsed(cx.bcx.config.creation_time().elapsed());
697-
self.timings.finished(cx.bcx, &error)?;
700+
if let Err(e) = self.timings.finished(cx.bcx, &error) {
701+
if error.is_some() {
702+
crate::display_error(&e, &mut cx.bcx.config.shell());
703+
} else {
704+
return Some(e);
705+
}
706+
}
698707
if cx.bcx.build_config.emit_json() {
699708
let msg = machine_message::BuildFinished {
700709
success: error.is_none(),
701710
}
702711
.to_json_string();
703-
writeln!(cx.bcx.config.shell().out(), "{}", msg)?;
712+
if let Err(e) = writeln!(cx.bcx.config.shell().out(), "{}", msg) {
713+
if error.is_some() {
714+
crate::display_error(&e.into(), &mut cx.bcx.config.shell());
715+
} else {
716+
return Some(e.into());
717+
}
718+
}
704719
}
705720

706721
if let Some(e) = error {
707-
Err(e)
722+
Some(e)
708723
} else if self.queue.is_empty() && self.pending_queue.is_empty() {
709724
let message = format!(
710725
"{} [{}] target(s) in {}",
711726
profile_name, opt_type, time_elapsed
712727
);
713728
if !cx.bcx.build_config.build_plan {
714-
cx.bcx.config.shell().status("Finished", message)?;
729+
// It doesn't really matter if this fails.
730+
drop(cx.bcx.config.shell().status("Finished", message));
715731
}
716-
Ok(())
732+
None
717733
} else {
718734
debug!("queue: {:#?}", self.queue);
719-
Err(internal("finished with jobs still left in the queue"))
735+
Some(internal("finished with jobs still left in the queue"))
736+
}
737+
}
738+
739+
fn handle_error(
740+
&self,
741+
shell: &mut Shell,
742+
err_state: &mut Option<anyhow::Error>,
743+
new_err: anyhow::Error,
744+
) {
745+
if err_state.is_some() {
746+
// Already encountered one error.
747+
log::warn!("{:?}", new_err);
748+
} else {
749+
if !self.active.is_empty() {
750+
crate::display_error(&new_err, shell);
751+
drop(shell.warn("build failed, waiting for other jobs to finish..."));
752+
*err_state = Some(anyhow::format_err!("build failed"));
753+
} else {
754+
*err_state = Some(new_err);
755+
}
720756
}
721757
}
722758

src/cargo/core/compiler/timings.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::core::compiler::BuildContext;
88
use crate::core::PackageId;
99
use crate::util::cpu::State;
1010
use crate::util::machine_message::{self, Message};
11-
use crate::util::{paths, CargoResult, Config};
11+
use crate::util::{paths, CargoResult, CargoResultExt, Config};
1212
use std::collections::HashMap;
1313
use std::io::{BufWriter, Write};
1414
use std::time::{Duration, Instant, SystemTime};
@@ -322,7 +322,8 @@ impl<'cfg> Timings<'cfg> {
322322
self.unit_times
323323
.sort_unstable_by(|a, b| a.start.partial_cmp(&b.start).unwrap());
324324
if self.report_html {
325-
self.report_html(bcx, error)?;
325+
self.report_html(bcx, error)
326+
.chain_err(|| "failed to save timing report")?;
326327
}
327328
Ok(())
328329
}

tests/testsuite/build.rs

Lines changed: 113 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ use cargo::util::paths::dylib_path_envvar;
44
use cargo_test_support::paths::{root, CargoPathExt};
55
use cargo_test_support::registry::Package;
66
use cargo_test_support::{
7-
basic_bin_manifest, basic_lib_manifest, basic_manifest, main_file, project, rustc_host,
8-
sleep_ms, symlink_supported, t, Execs, ProjectBuilder,
7+
basic_bin_manifest, basic_lib_manifest, basic_manifest, lines_match, main_file, project,
8+
rustc_host, sleep_ms, symlink_supported, t, Execs, ProjectBuilder,
99
};
1010
use std::env;
1111
use std::fs;
12+
use std::io::Read;
13+
use std::process::Stdio;
1214

1315
#[cargo_test]
1416
fn cargo_compile_simple() {
@@ -4818,3 +4820,112 @@ fn user_specific_cfgs_are_filtered_out() {
48184820
.run();
48194821
p.process(&p.bin("foo")).run();
48204822
}
4823+
4824+
#[cargo_test]
4825+
fn close_output() {
4826+
// What happens when stdout or stderr is closed during a build.
4827+
4828+
// Server to know when rustc has spawned.
4829+
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
4830+
let addr = listener.local_addr().unwrap();
4831+
4832+
let p = project()
4833+
.file(
4834+
"Cargo.toml",
4835+
r#"
4836+
[package]
4837+
name = "foo"
4838+
version = "0.1.0"
4839+
edition = "2018"
4840+
4841+
[lib]
4842+
proc-macro = true
4843+
"#,
4844+
)
4845+
.file(
4846+
"src/lib.rs",
4847+
&r#"
4848+
use proc_macro::TokenStream;
4849+
use std::io::Read;
4850+
4851+
#[proc_macro]
4852+
pub fn repro(_input: TokenStream) -> TokenStream {
4853+
println!("hello stdout!");
4854+
eprintln!("hello stderr!");
4855+
// Tell the test we have started.
4856+
let mut socket = std::net::TcpStream::connect("__ADDR__").unwrap();
4857+
// Wait for the test to tell us to start printing.
4858+
let mut buf = [0];
4859+
drop(socket.read_exact(&mut buf));
4860+
let use_stderr = std::env::var("__CARGO_REPRO_STDERR").is_ok();
4861+
for i in 0..10000 {
4862+
if use_stderr {
4863+
eprintln!("{}", i);
4864+
} else {
4865+
println!("{}", i);
4866+
}
4867+
std::thread::sleep(std::time::Duration::new(0, 1));
4868+
}
4869+
TokenStream::new()
4870+
}
4871+
"#
4872+
.replace("__ADDR__", &addr.to_string()),
4873+
)
4874+
.file(
4875+
"src/main.rs",
4876+
r#"
4877+
foo::repro!();
4878+
4879+
fn main() {}
4880+
"#,
4881+
)
4882+
.build();
4883+
4884+
// The `stderr` flag here indicates if this should forcefully close stderr or stdout.
4885+
let spawn = |stderr: bool| {
4886+
let mut cmd = p.cargo("build").build_command();
4887+
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
4888+
if stderr {
4889+
cmd.env("__CARGO_REPRO_STDERR", "1");
4890+
}
4891+
let mut child = cmd.spawn().unwrap();
4892+
// Wait for proc macro to start.
4893+
let pm_conn = listener.accept().unwrap().0;
4894+
// Close stderr or stdout.
4895+
if stderr {
4896+
drop(child.stderr.take());
4897+
} else {
4898+
drop(child.stdout.take());
4899+
}
4900+
// Tell the proc-macro to continue;
4901+
drop(pm_conn);
4902+
// Read the output from the other channel.
4903+
let out: &mut dyn Read = if stderr {
4904+
child.stdout.as_mut().unwrap()
4905+
} else {
4906+
child.stderr.as_mut().unwrap()
4907+
};
4908+
let mut result = String::new();
4909+
out.read_to_string(&mut result).unwrap();
4910+
let status = child.wait().unwrap();
4911+
assert!(!status.success());
4912+
result
4913+
};
4914+
4915+
let stderr = spawn(false);
4916+
lines_match(
4917+
"\
4918+
[COMPILING] foo [..]
4919+
hello stderr!
4920+
[ERROR] [..]
4921+
[WARNING] build failed, waiting for other jobs to finish...
4922+
[ERROR] build failed
4923+
",
4924+
&stderr,
4925+
);
4926+
4927+
// Try again with stderr.
4928+
p.build_dir().rm_rf();
4929+
let stdout = spawn(true);
4930+
lines_match("hello_stdout!", &stdout);
4931+
}

0 commit comments

Comments
 (0)