Commit 9580786

Auto merge of #11764 - weihanglo:jobserver-cleanup, r=epage
Jobserver cleanup
2 parents e472ccc + d8fb0db commit 9580786

6 files changed, +33 -245 lines changed

src/cargo/core/compiler/context/mod.rs

Lines changed: 0 additions & 24 deletions
@@ -71,11 +71,6 @@ pub struct Context<'a, 'cfg> {
     /// metadata files in addition to the rlib itself.
     rmeta_required: HashSet<Unit>,

-    /// When we're in jobserver-per-rustc process mode, this keeps those
-    /// jobserver clients for each Unit (which eventually becomes a rustc
-    /// process).
-    pub rustc_clients: HashMap<Unit, Client>,
-
     /// Map of the LTO-status of each unit. This indicates what sort of
     /// compilation is happening (only object, only bitcode, both, etc), and is
     /// precalculated early on.
@@ -124,7 +119,6 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
             primary_packages: HashSet::new(),
             files: None,
             rmeta_required: HashSet::new(),
-            rustc_clients: HashMap::new(),
             lto: HashMap::new(),
             metadata_for_doc_units: HashMap::new(),
             failed_scrape_units: Arc::new(Mutex::new(HashSet::new())),
@@ -614,24 +608,6 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
         self.rmeta_required.contains(unit)
     }

-    /// Used by `-Zjobserver-per-rustc`.
-    pub fn new_jobserver(&mut self) -> CargoResult<Client> {
-        let tokens = self.bcx.jobs() as usize;
-        let client = Client::new(tokens).with_context(|| "failed to create jobserver")?;
-
-        // Drain the client fully
-        for i in 0..tokens {
-            client.acquire_raw().with_context(|| {
-                format!(
-                    "failed to fully drain {}/{} token from jobserver at startup",
-                    i, tokens,
-                )
-            })?;
-        }
-
-        Ok(client)
-    }
-
     /// Finds metadata for Doc/Docscrape units.
     ///
     /// rustdoc needs a -Cmetadata flag in order to recognize StableCrateIds that refer to
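
Reviewer note: the removed `new_jobserver` created a per-rustc jobserver sized to `-j` and immediately drained it, so a rustc attached to it could only proceed once Cargo handed a token back. The sketch below models that idea with a plain in-process counter. `TokenPool` and `new_drained_pool` are hypothetical names for illustration only; the real `jobserver::Client` is shared between processes and its acquire call blocks rather than returning `false`.

```rust
/// Hypothetical stand-in for a jobserver client: a plain counter of
/// available tokens. This is an in-process model, not the real crate API.
#[derive(Debug)]
pub struct TokenPool {
    available: usize,
}

impl TokenPool {
    /// Creates a pool that starts with `tokens` available tokens.
    pub fn new(tokens: usize) -> Self {
        TokenPool { available: tokens }
    }

    /// Non-blocking acquire. Returns `false` when the pool is empty
    /// (assumption: the real client would block here instead).
    pub fn try_acquire(&mut self) -> bool {
        if self.available == 0 {
            return false;
        }
        self.available -= 1;
        true
    }

    /// Puts one token back into the pool.
    pub fn release(&mut self) {
        self.available += 1;
    }

    /// Number of tokens currently available.
    pub fn available(&self) -> usize {
        self.available
    }
}

/// Models the removed function: create a pool sized to the job count and
/// drain it completely, reporting which token failed if draining stops early.
pub fn new_drained_pool(jobs: usize) -> Result<TokenPool, String> {
    let mut pool = TokenPool::new(jobs);
    for i in 0..jobs {
        if !pool.try_acquire() {
            return Err(format!(
                "failed to fully drain {}/{} token from jobserver at startup",
                i, jobs
            ));
        }
    }
    Ok(pool)
}
```

With the pool drained, every later acquire depends on an explicit `release`, which is the role `release_raw` played in the code removed by this commit.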

src/cargo/core/compiler/job_queue/job_state.rs

Lines changed: 0 additions & 16 deletions
@@ -194,20 +194,4 @@ impl<'a, 'cfg> JobState<'a, 'cfg> {
         self.messages
             .push(Message::FutureIncompatReport(self.id, report));
     }
-
-    /// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
-    /// on the passed client.
-    ///
-    /// This should arrange for the associated client to eventually get a token via
-    /// `client.release_raw()`.
-    pub fn will_acquire(&self) {
-        self.messages.push(Message::NeedsToken(self.id));
-    }
-
-    /// The rustc underlying this Job is informing us that it is done with a jobserver token.
-    ///
-    /// Note that it does *not* write that token back anywhere.
-    pub fn release_token(&self) {
-        self.messages.push(Message::ReleaseToken(self.id));
-    }
 }

src/cargo/core/compiler/job_queue/mod.rs

Lines changed: 30 additions & 147 deletions
@@ -30,46 +30,30 @@
 //!
 //! ## Jobserver
 //!
-//! Cargo and rustc have a somewhat non-trivial jobserver relationship with each
-//! other, which is due to scaling issues with sharing a single jobserver
-//! amongst what is potentially hundreds of threads of work on many-cored
-//! systems on (at least) Linux, and likely other platforms as well.
+//! As of Feb. 2023, Cargo and rustc have a relatively simple jobserver
+//! relationship with each other. They share a single jobserver amongst what
+//! is potentially hundreds of threads of work on many-cored systems.
+//! The jobserver could come from either the environment (e.g., from a `make`
+//! invocation), or from Cargo creating its own jobserver server if there is no
+//! jobserver to inherit from.
 //!
 //! Cargo wants to complete the build as quickly as possible, fully saturating
-//! all cores (as constrained by the -j=N) parameter. Cargo also must not spawn
+//! all cores (as constrained by the `-j=N`) parameter. Cargo also must not spawn
 //! more than N threads of work: the total amount of tokens we have floating
 //! around must always be limited to N.
 //!
-//! It is not really possible to optimally choose which crate should build first
-//! or last; nor is it possible to decide whether to give an additional token to
-//! rustc first or rather spawn a new crate of work. For now, the algorithm we
-//! implement prioritizes spawning as many crates (i.e., rustc processes) as
-//! possible, and then filling each rustc with tokens on demand.
+//! It is not really possible to optimally choose which crate should build
+//! first or last; nor is it possible to decide whether to give an additional
+//! token to rustc first or rather spawn a new crate of work. The algorithm in
+//! Cargo prioritizes spawning as many crates (i.e., rustc processes) as
+//! possible. In short, the jobserver relationship among Cargo and rustc
+//! processes is **1 `cargo` to N `rustc`**. Cargo knows nothing beyond rustc
+//! processes in terms of parallelism[^parallel-rustc].
 //!
-//! We integrate with the [jobserver], originating from GNU make, to make sure
-//! that build scripts which use make to build C code can cooperate with us on
-//! the number of used tokens and avoid overfilling the system we're on.
-//!
-//! The jobserver is unfortunately a very simple protocol, so we enhance it a
-//! little when we know that there is a rustc on the other end. Via the stderr
-//! pipe we have to rustc, we get messages such as `NeedsToken` and
-//! `ReleaseToken` from rustc.
-//!
-//! [`NeedsToken`] indicates that a rustc is interested in acquiring a token,
-//! but never that it would be impossible to make progress without one (i.e.,
-//! it would be incorrect for rustc to not terminate due to an unfulfilled
-//! `NeedsToken` request); we do not usually fulfill all `NeedsToken` requests for a
-//! given rustc.
-//!
-//! [`ReleaseToken`] indicates that a rustc is done with one of its tokens and
-//! is ready for us to re-acquire ownership — we will either release that token
-//! back into the general pool or reuse it ourselves. Note that rustc will
-//! inform us that it is releasing a token even if it itself is also requesting
-//! tokens; is up to us whether to return the token to that same rustc.
-//!
-//! `jobserver` also manages the allocation of tokens to rustc beyond
-//! the implicit token each rustc owns (i.e., the ones used for parallel LLVM
-//! work and parallel rustc threads).
+//! We integrate with the [jobserver] crate, originating from GNU make
+//! [POSIX jobserver], to make sure that build scripts which use make to
+//! build C code can cooperate with us on the number of used tokens and
+//! avoid overfilling the system we're on.
 //!
 //! ## Scheduling
 //!
@@ -113,17 +97,24 @@
 //!
 //! See [`Message`] for all available message kinds.
 //!
+//! [^parallel-rustc]: In fact, `jobserver` that Cargo uses also manages the
+//! allocation of tokens to rustc beyond the implicit token each rustc owns
+//! (i.e., the ones used for parallel LLVM work and parallel rustc threads).
+//! See also ["Rust Compiler Development Guide: Parallel Compilation"]
+//! and [this comment][rustc-codegen] in rust-lang/rust.
+//!
+//! ["Rust Compiler Development Guide: Parallel Compilation"]: https://rustc-dev-guide.rust-lang.org/parallel-rustc.html
+//! [rustc-codegen]: https://github.com/rust-lang/rust/blob/5423745db8b434fcde54888b35f518f00cce00e4/compiler/rustc_codegen_ssa/src/back/write.rs#L1204-L1217
 //! [jobserver]: https://docs.rs/jobserver
-//! [`NeedsToken`]: Message::NeedsToken
-//! [`ReleaseToken`]: Message::ReleaseToken
+//! [POSIX jobserver]: https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
 //! [`push`]: Queue::push
 //! [`push_bounded`]: Queue::push_bounded

 mod job;
 mod job_state;

 use std::cell::RefCell;
-use std::collections::{BTreeMap, HashMap, HashSet};
+use std::collections::{HashMap, HashSet};
 use std::fmt::Write as _;
 use std::io;
 use std::path::{Path, PathBuf};
@@ -133,7 +124,7 @@ use std::time::Duration;

 use anyhow::{format_err, Context as _};
 use cargo_util::ProcessBuilder;
-use jobserver::{Acquired, Client, HelperThread};
+use jobserver::{Acquired, HelperThread};
 use log::{debug, trace};
 use semver::Version;

@@ -199,13 +190,6 @@ struct DrainState<'cfg> {
     /// single rustc process.
     tokens: Vec<Acquired>,

-    /// rustc per-thread tokens, when in jobserver-per-rustc mode.
-    rustc_tokens: HashMap<JobId, Vec<Acquired>>,
-
-    /// This represents the list of rustc jobs (processes) and associated
-    /// clients that are interested in receiving a token.
-    to_send_clients: BTreeMap<JobId, Vec<Client>>,
-
     /// The list of jobs that we have not yet started executing, but have
     /// retrieved from the `queue`. We eagerly pull jobs off the main queue to
     /// allow us to request jobserver tokens pretty early.
@@ -387,12 +371,6 @@ enum Message {
     Token(io::Result<Acquired>),
     Finish(JobId, Artifact, CargoResult<()>),
     FutureIncompatReport(JobId, Vec<FutureBreakageItem>),
-
-    // This client should get release_raw called on it with one of our tokens
-    NeedsToken(JobId),
-
-    // A token previously passed to a NeedsToken client is being released.
-    ReleaseToken(JobId),
 }

 impl<'cfg> JobQueue<'cfg> {
@@ -507,8 +485,6 @@ impl<'cfg> JobQueue<'cfg> {
             next_id: 0,
             timings: self.timings,
             tokens: Vec::new(),
-            rustc_tokens: HashMap::new(),
-            to_send_clients: BTreeMap::new(),
             pending_queue: Vec::new(),
             print: DiagnosticPrinter::new(cx.bcx.config),
             finished: 0,
@@ -600,46 +576,9 @@ impl<'cfg> DrainState<'cfg> {
         self.active.len() < self.tokens.len() + 1
     }

-    // The oldest job (i.e., least job ID) is the one we grant tokens to first.
-    fn pop_waiting_client(&mut self) -> (JobId, Client) {
-        // FIXME: replace this with BTreeMap::first_entry when that stabilizes.
-        let key = *self
-            .to_send_clients
-            .keys()
-            .next()
-            .expect("at least one waiter");
-        let clients = self.to_send_clients.get_mut(&key).unwrap();
-        let client = clients.pop().unwrap();
-        if clients.is_empty() {
-            self.to_send_clients.remove(&key);
-        }
-        (key, client)
-    }
-
-    // If we managed to acquire some extra tokens, send them off to a waiting rustc.
-    fn grant_rustc_token_requests(&mut self) -> CargoResult<()> {
-        while !self.to_send_clients.is_empty() && self.has_extra_tokens() {
-            let (id, client) = self.pop_waiting_client();
-            // This unwrap is guaranteed to succeed. `active` must be at least
-            // length 1, as otherwise there can't be a client waiting to be sent
-            // on, so tokens.len() must also be at least one.
-            let token = self.tokens.pop().unwrap();
-            self.rustc_tokens
-                .entry(id)
-                .or_insert_with(Vec::new)
-                .push(token);
-            client
-                .release_raw()
-                .with_context(|| "failed to release jobserver token")?;
-        }
-
-        Ok(())
-    }
-
     fn handle_event(
         &mut self,
         cx: &mut Context<'_, '_>,
-        jobserver_helper: &HelperThread,
         plan: &mut BuildPlan,
         event: Message,
     ) -> Result<(), ErrorToHandle> {
@@ -699,19 +638,6 @@ impl<'cfg> DrainState<'cfg> {
                     Artifact::All => {
                         trace!("end: {:?}", id);
                         self.finished += 1;
-                        if let Some(rustc_tokens) = self.rustc_tokens.remove(&id) {
-                            // This puts back the tokens that this rustc
-                            // acquired into our primary token list.
-                            //
-                            // This represents a rustc bug: it did not
-                            // release all of its thread tokens but finished
-                            // completely. But we want to make Cargo resilient
-                            // to such rustc bugs, as they're generally not
-                            // fatal in nature (i.e., Cargo can make progress
-                            // still, and the build might not even fail).
-                            self.tokens.extend(rustc_tokens);
-                        }
-                        self.to_send_clients.remove(&id);
                         self.report_warning_count(
                             cx.bcx.config,
                             id,
@@ -756,31 +682,6 @@ impl<'cfg> DrainState<'cfg> {
                 let token = acquired_token.with_context(|| "failed to acquire jobserver token")?;
                 self.tokens.push(token);
             }
-            Message::NeedsToken(id) => {
-                trace!("queue token request");
-                jobserver_helper.request_token();
-                let client = cx.rustc_clients[&self.active[&id]].clone();
-                self.to_send_clients
-                    .entry(id)
-                    .or_insert_with(Vec::new)
-                    .push(client);
-            }
-            Message::ReleaseToken(id) => {
-                // Note that this pops off potentially a completely
-                // different token, but all tokens of the same job are
-                // conceptually the same so that's fine.
-                //
-                // self.tokens is a "pool" -- the order doesn't matter -- and
-                // this transfers ownership of the token into that pool. If we
-                // end up using it on the next go around, then this token will
-                // be truncated, same as tokens obtained through Message::Token.
-                let rustc_tokens = self
-                    .rustc_tokens
-                    .get_mut(&id)
-                    .expect("no tokens associated");
-                self.tokens
-                    .push(rustc_tokens.pop().expect("rustc releases token it has"));
-            }
         }

         Ok(())
@@ -795,19 +696,6 @@ impl<'cfg> DrainState<'cfg> {
         // listen for a message with a timeout, and on timeout we run the
         // previous parts of the loop again.
         let mut events = self.messages.try_pop_all();
-        trace!(
-            "tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
-            self.tokens.len(),
-            self.rustc_tokens
-                .iter()
-                .map(|(k, j)| (k, j.len()))
-                .collect::<Vec<_>>(),
-            self.to_send_clients
-                .iter()
-                .map(|(k, j)| (k, j.len()))
-                .collect::<Vec<_>>(),
-            events.len(),
-        );
         if events.is_empty() {
             loop {
                 self.tick_progress();
@@ -866,17 +754,13 @@ impl<'cfg> DrainState<'cfg> {
                 break;
             }

-            if let Err(e) = self.grant_rustc_token_requests() {
-                self.handle_error(&mut cx.bcx.config.shell(), &mut errors, e);
-            }
-
             // And finally, before we block waiting for the next event, drop any
             // excess tokens we may have accidentally acquired. Due to how our
             // jobserver interface is architected we may acquire a token that we
             // don't actually use, and if this happens just relinquish it back
             // to the jobserver itself.
             for event in self.wait_for_events() {
-                if let Err(event_err) = self.handle_event(cx, jobserver_helper, plan, event) {
+                if let Err(event_err) = self.handle_event(cx, plan, event) {
                     self.handle_error(&mut cx.bcx.config.shell(), &mut errors, event_err);
                 }
             }
@@ -970,7 +854,6 @@ impl<'cfg> DrainState<'cfg> {
             self.active.len(),
             self.pending_queue.len(),
             self.queue.len(),
-            self.rustc_tokens.len(),
         );
         self.timings.record_cpu();
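
Reviewer note: after this cleanup the only token bookkeeping left in `DrainState` is the shared pool plus the implicit token, checked by `self.active.len() < self.tokens.len() + 1`. The sketch below models that rule with plain counters. `TokenLedger`, `spawn_pending`, and `drop_excess_tokens` are hypothetical names, and the excess-token rule is an assumption drawn from the "drop any excess tokens" comment in the hunk above, not a copy of Cargo's code.

```rust
/// Hypothetical, simplified model of the token bookkeeping in `DrainState`.
#[derive(Debug, Default)]
pub struct TokenLedger {
    /// Number of running rustc processes (`active.len()` in the diff).
    pub active: usize,
    /// Tokens acquired from the jobserver (`tokens.len()` in the diff).
    pub tokens: usize,
}

impl TokenLedger {
    /// Same comparison as the diff: the `+ 1` accounts for the implicit
    /// token that lets one job run without acquiring anything.
    pub fn has_extra_tokens(&self) -> bool {
        self.active < self.tokens + 1
    }

    /// Starts pending jobs while a token is free; returns how many started.
    pub fn spawn_pending(&mut self, pending: &mut usize) -> usize {
        let mut spawned = 0;
        while *pending > 0 && self.has_extra_tokens() {
            *pending -= 1;
            self.active += 1;
            spawned += 1;
        }
        spawned
    }

    /// Gives back tokens that no running job needs (assumed rule: keep
    /// `active - 1` tokens, since one job is covered by the implicit token).
    /// Returns the number of tokens relinquished.
    pub fn drop_excess_tokens(&mut self) -> usize {
        let needed = self.active.saturating_sub(1);
        let excess = self.tokens.saturating_sub(needed);
        self.tokens -= excess;
        excess
    }
}
```

With two acquired tokens the ledger admits three concurrent jobs, which matches the "1 `cargo` to N `rustc`" relationship described in the updated module docs.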

src/cargo/core/compiler/mod.rs

Lines changed: 1 addition & 33 deletions
@@ -715,14 +715,7 @@ fn prepare_rustc(
         base.env("CARGO_TARGET_TMPDIR", tmp.display().to_string());
     }

-    if cx.bcx.config.cli_unstable().jobserver_per_rustc {
-        let client = cx.new_jobserver()?;
-        base.inherit_jobserver(&client);
-        base.arg("-Z").arg("jobserver-token-requests");
-        assert!(cx.rustc_clients.insert(unit.clone(), client).is_none());
-    } else {
-        base.inherit_jobserver(&cx.jobserver);
-    }
+    base.inherit_jobserver(&cx.jobserver);
     build_base_args(cx, &mut base, unit, crate_types)?;
     build_deps_args(&mut base, cx, unit)?;
     Ok(base)
@@ -1701,31 +1694,6 @@ fn on_stderr_line_inner(
         return Ok(false);
     }

-    #[derive(serde::Deserialize)]
-    struct JobserverNotification {
-        jobserver_event: Event,
-    }
-
-    #[derive(Debug, serde::Deserialize)]
-    enum Event {
-        WillAcquire,
-        Release,
-    }
-
-    if let Ok(JobserverNotification { jobserver_event }) =
-        serde_json::from_str::<JobserverNotification>(compiler_message.get())
-    {
-        trace!(
-            "found jobserver directive from rustc: `{:?}`",
-            jobserver_event
-        );
-        match jobserver_event {
-            Event::WillAcquire => state.will_acquire(),
-            Event::Release => state.release_token(),
-        }
-        return Ok(false);
-    }
-
     // And failing all that above we should have a legitimate JSON diagnostic
     // from the compiler, so wrap it in an external Cargo JSON message
     // indicating which package it came from and then emit it.
