blob: 26fcd482692955437ef23cc0b6d0eaed5156369c [file] [log] [blame]
//! Management of the interaction between the main `cargo` and all spawned jobs.
//!
//! ## Overview
//!
//! This module implements a job queue. A job here represents a unit of work,
//! which is roughly a rusc invocation, a build script run, or just a no-op.
//! The job queue primarily handles the following things:
//!
//! * Spawns concurrent jobs. Depending on its [`Freshness`], a job could be
//! either executed on a spawned thread or ran on the same thread to avoid
//! the threading overhead.
//! * Controls the number of concurrency. It allocates and manages [`jobserver`]
//! tokens to each spawned off rustc and build scripts.
//! * Manages the communication between the main `cargo` process and its
//! spawned jobs. Those [`Message`]s are sent over a [`Queue`] shared
//! across threads.
//! * Schedules the execution order of each [`Job`]. Priorities are determined
//! when calling [`JobQueue::enqueue`] to enqueue a job. The scheduling is
//! relatively rudimentary and could likely be improved.
//!
//! A rough outline of building a queue and executing jobs is:
//!
//! 1. [`JobQueue::new`] to simply create one queue.
//! 2. [`JobQueue::enqueue`] to add new jobs onto the queue.
//! 3. Consumes the queue and executes all jobs via [`JobQueue::execute`].
//!
//! The primary loop happens insides [`JobQueue::execute`], which is effectively
//! [`DrainState::drain_the_queue`]. [`DrainState`] is, as its name tells,
//! the running state of the job queue getting drained.
//!
//! ## Jobserver
//!
//! As of Feb. 2023, Cargo and rustc have a relatively simple jobserver
//! relationship with each other. They share a single jobserver amongst what
//! is potentially hundreds of threads of work on many-cored systems.
//! The jobserver could come from either the environment (e.g., from a `make`
//! invocation), or from Cargo creating its own jobserver server if there is no
//! jobserver to inherit from.
//!
//! Cargo wants to complete the build as quickly as possible, fully saturating
//! all cores (as constrained by the `-j=N`) parameter. Cargo also must not spawn
//! more than N threads of work: the total amount of tokens we have floating
//! around must always be limited to N.
//!
//! It is not really possible to optimally choose which crate should build
//! first or last; nor is it possible to decide whether to give an additional
//! token to rustc first or rather spawn a new crate of work. The algorithm in
//! Cargo prioritizes spawning as many crates (i.e., rustc processes) as
//! possible. In short, the jobserver relationship among Cargo and rustc
//! processes is **1 `cargo` to N `rustc`**. Cargo knows nothing beyond rustc
//! processes in terms of parallelism[^parallel-rustc].
//!
//! We integrate with the [jobserver] crate, originating from GNU make
//! [POSIX jobserver], to make sure that build scripts which use make to
//! build C code can cooperate with us on the number of used tokens and
//! avoid overfilling the system we're on.
//!
//! ## Scheduling
//!
//! The current scheduling algorithm is not really polished. It is simply based
//! on a dependency graph [`DependencyQueue`]. We continue adding nodes onto
//! the graph until we finalize it. When the graph gets finalized, it finds the
//! sum of the cost of each dependencies of each node, including transitively.
//! The sum of dependency cost turns out to be the cost of each given node.
//!
//! At the time being, the cost is just passed as a fixed placeholder in
//! [`JobQueue::enqueue`]. In the future, we could explore more possibilities
//! around it. For instance, we start persisting timing information for each
//! build somewhere. For a subsequent build, we can look into the historical
//! data and perform a PGO-like optimization to prioritize jobs, making a build
//! fully pipelined.
//!
//! ## Message queue
//!
//! Each spawned thread running a process uses the message queue [`Queue`] to
//! send messages back to the main thread (the one running `cargo`).
//! The main thread coordinates everything, and handles printing output.
//!
//! It is important to be careful which messages use [`push`] vs [`push_bounded`].
//! `push` is for priority messages (like tokens, or "finished") where the
//! sender shouldn't block. We want to handle those so real work can proceed
//! ASAP.
//!
//! `push_bounded` is only for messages being printed to stdout/stderr. Being
//! bounded prevents a flood of messages causing a large amount of memory
//! being used.
//!
//! `push` also avoids blocking which helps avoid deadlocks. For example, when
//! the diagnostic server thread is dropped, it waits for the thread to exit.
//! But if the thread is blocked on a full queue, and there is a critical
//! error, the drop will deadlock. This should be fixed at some point in the
//! future. The jobserver thread has a similar problem, though it will time
//! out after 1 second.
//!
//! To access the message queue, each running `Job` is given its own [`JobState`],
//! containing everything it needs to communicate with the main thread.
//!
//! See [`Message`] for all available message kinds.
//!
//! [^parallel-rustc]: In fact, `jobserver` that Cargo uses also manages the
//! allocation of tokens to rustc beyond the implicit token each rustc owns
//! (i.e., the ones used for parallel LLVM work and parallel rustc threads).
//! See also ["Rust Compiler Development Guide: Parallel Compilation"]
//! and [this comment][rustc-codegen] in rust-lang/rust.
//!
//! ["Rust Compiler Development Guide: Parallel Compilation"]: https://rustc-dev-guide.rust-lang.org/parallel-rustc.html
//! [rustc-codegen]: https://github.com/rust-lang/rust/blob/5423745db8b434fcde54888b35f518f00cce00e4/compiler/rustc_codegen_ssa/src/back/write.rs#L1204-L1217
//! [jobserver]: https://docs.rs/jobserver
//! [POSIX jobserver]: https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
//! [`push`]: Queue::push
//! [`push_bounded`]: Queue::push_bounded
mod job;
mod job_state;
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::fmt::Write as _;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread::{self, Scope};
use std::time::Duration;
use anyhow::{format_err, Context as _};
use cargo_util::ProcessBuilder;
use jobserver::{Acquired, HelperThread};
use semver::Version;
use tracing::{debug, trace};
pub use self::job::Freshness::{self, Dirty, Fresh};
pub use self::job::{Job, Work};
pub use self::job_state::JobState;
use super::context::OutputFile;
use super::timings::Timings;
use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
use crate::core::compiler::descriptive_pkg_name;
use crate::core::compiler::future_incompat::{
self, FutureBreakageItem, FutureIncompatReportPackage,
};
use crate::core::resolver::ResolveBehavior;
use crate::core::{PackageId, Shell, TargetKind};
use crate::util::diagnostic_server::{self, DiagnosticPrinter};
use crate::util::errors::AlreadyPrintedError;
use crate::util::machine_message::{self, Message as _};
use crate::util::CargoResult;
use crate::util::{self, internal, profile};
use crate::util::{Config, DependencyQueue, Progress, ProgressStyle, Queue};
/// This structure is backed by the `DependencyQueue` type and manages the
/// queueing of compilation steps for each package. Packages enqueue units of
/// work and then later on the entire graph is converted to DrainState and
/// executed.
pub struct JobQueue<'cfg> {
queue: DependencyQueue<Unit, Artifact, Job>,
counts: HashMap<PackageId, usize>,
timings: Timings<'cfg>,
}
/// This structure is backed by the `DependencyQueue` type and manages the
/// actual compilation step of each package. Packages enqueue units of work and
/// then later on the entire graph is processed and compiled.
///
/// It is created from JobQueue when we have fully assembled the crate graph
/// (i.e., all package dependencies are known).
struct DrainState<'cfg> {
// This is the length of the DependencyQueue when starting out
total_units: usize,
queue: DependencyQueue<Unit, Artifact, Job>,
messages: Arc<Queue<Message>>,
/// Diagnostic deduplication support.
diag_dedupe: DiagDedupe<'cfg>,
/// Count of warnings, used to print a summary after the job succeeds
warning_count: HashMap<JobId, WarningCount>,
active: HashMap<JobId, Unit>,
compiled: HashSet<PackageId>,
documented: HashSet<PackageId>,
scraped: HashSet<PackageId>,
counts: HashMap<PackageId, usize>,
progress: Progress<'cfg>,
next_id: u32,
timings: Timings<'cfg>,
/// Tokens that are currently owned by this Cargo, and may be "associated"
/// with a rustc process. They may also be unused, though if so will be
/// dropped on the next loop iteration.
///
/// Note that the length of this may be zero, but we will still spawn work,
/// as we share the implicit token given to this Cargo process with a
/// single rustc process.
tokens: Vec<Acquired>,
/// The list of jobs that we have not yet started executing, but have
/// retrieved from the `queue`. We eagerly pull jobs off the main queue to
/// allow us to request jobserver tokens pretty early.
pending_queue: Vec<(Unit, Job, usize)>,
print: DiagnosticPrinter<'cfg>,
/// How many jobs we've finished
finished: usize,
per_package_future_incompat_reports: Vec<FutureIncompatReportPackage>,
}
/// Count of warnings, used to print a summary after the job succeeds
#[derive(Default)]
pub struct WarningCount {
/// total number of warnings
pub total: usize,
/// number of warnings that were suppressed because they
/// were duplicates of a previous warning
pub duplicates: usize,
/// number of fixable warnings set to `NotAllowed`
/// if any errors have been seen ofr the current
/// target
pub fixable: FixableWarnings,
}
impl WarningCount {
/// If an error is seen this should be called
/// to set `fixable` to `NotAllowed`
fn disallow_fixable(&mut self) {
self.fixable = FixableWarnings::NotAllowed;
}
/// Checks fixable if warnings are allowed
/// fixable warnings are allowed if no
/// errors have been seen for the current
/// target. If an error was seen `fixable`
/// will be `NotAllowed`.
fn fixable_allowed(&self) -> bool {
match &self.fixable {
FixableWarnings::NotAllowed => false,
_ => true,
}
}
}
/// Used to keep track of how many fixable warnings there are
/// and if fixable warnings are allowed
#[derive(Default)]
pub enum FixableWarnings {
NotAllowed,
#[default]
Zero,
Positive(usize),
}
pub struct ErrorsDuringDrain {
pub count: usize,
}
struct ErrorToHandle {
error: anyhow::Error,
/// This field is true for "interesting" errors and false for "mundane"
/// errors. If false, we print the above error only if it's the first one
/// encountered so far while draining the job queue.
///
/// At most places that an error is propagated, we set this to false to
/// avoid scenarios where Cargo might end up spewing tons of redundant error
/// messages. For example if an i/o stream got closed somewhere, we don't
/// care about individually reporting every thread that it broke; just the
/// first is enough.
///
/// The exception where print_always is true is that we do report every
/// instance of a rustc invocation that failed with diagnostics. This
/// corresponds to errors from Message::Finish.
print_always: bool,
}
impl<E> From<E> for ErrorToHandle
where
anyhow::Error: From<E>,
{
fn from(error: E) -> Self {
ErrorToHandle {
error: anyhow::Error::from(error),
print_always: false,
}
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct JobId(pub u32);
impl std::fmt::Display for JobId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
/// Handler for deduplicating diagnostics.
struct DiagDedupe<'cfg> {
seen: RefCell<HashSet<u64>>,
config: &'cfg Config,
}
impl<'cfg> DiagDedupe<'cfg> {
fn new(config: &'cfg Config) -> Self {
DiagDedupe {
seen: RefCell::new(HashSet::new()),
config,
}
}
/// Emits a diagnostic message.
///
/// Returns `true` if the message was emitted, or `false` if it was
/// suppressed for being a duplicate.
fn emit_diag(&self, diag: &str) -> CargoResult<bool> {
let h = util::hash_u64(diag);
if !self.seen.borrow_mut().insert(h) {
return Ok(false);
}
let mut shell = self.config.shell();
shell.print_ansi_stderr(diag.as_bytes())?;
shell.err().write_all(b"\n")?;
Ok(true)
}
}
/// Possible artifacts that can be produced by compilations, used as edge values
/// in the dependency graph.
///
/// As edge values we can have multiple kinds of edges depending on one node,
/// for example some units may only depend on the metadata for an rlib while
/// others depend on the full rlib. This `Artifact` enum is used to distinguish
/// this case and track the progress of compilations as they proceed.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
enum Artifact {
/// A generic placeholder for "depends on everything run by a step" and
/// means that we can't start the next compilation until the previous has
/// finished entirely.
All,
/// A node indicating that we only depend on the metadata of a compilation,
/// but the compilation is typically also producing an rlib. We can start
/// our step, however, before the full rlib is available.
Metadata,
}
enum Message {
Run(JobId, String),
BuildPlanMsg(String, ProcessBuilder, Arc<Vec<OutputFile>>),
Stdout(String),
Stderr(String),
// This is for general stderr output from subprocesses
Diagnostic {
id: JobId,
level: String,
diag: String,
fixable: bool,
},
// This handles duplicate output that is suppressed, for showing
// only a count of duplicate messages instead
WarningCount {
id: JobId,
emitted: bool,
fixable: bool,
},
// This is for warnings generated by Cargo's interpretation of the
// subprocess output, e.g. scrape-examples prints a warning if a
// unit fails to be scraped
Warning {
id: JobId,
warning: String,
},
FixDiagnostic(diagnostic_server::Message),
Token(io::Result<Acquired>),
Finish(JobId, Artifact, CargoResult<()>),
FutureIncompatReport(JobId, Vec<FutureBreakageItem>),
}
impl<'cfg> JobQueue<'cfg> {
pub fn new(bcx: &BuildContext<'_, 'cfg>) -> JobQueue<'cfg> {
JobQueue {
queue: DependencyQueue::new(),
counts: HashMap::new(),
timings: Timings::new(bcx, &bcx.roots),
}
}
pub fn enqueue(&mut self, cx: &Context<'_, 'cfg>, unit: &Unit, job: Job) -> CargoResult<()> {
let dependencies = cx.unit_deps(unit);
let mut queue_deps = dependencies
.iter()
.filter(|dep| {
// Binaries aren't actually needed to *compile* tests, just to run
// them, so we don't include this dependency edge in the job graph.
// But we shouldn't filter out dependencies being scraped for Rustdoc.
(!dep.unit.target.is_test() && !dep.unit.target.is_bin())
|| dep.unit.artifact.is_true()
|| dep.unit.mode.is_doc_scrape()
})
.map(|dep| {
// Handle the case here where our `unit -> dep` dependency may
// only require the metadata, not the full compilation to
// finish. Use the tables in `cx` to figure out what kind
// of artifact is associated with this dependency.
let artifact = if cx.only_requires_rmeta(unit, &dep.unit) {
Artifact::Metadata
} else {
Artifact::All
};
(dep.unit.clone(), artifact)
})
.collect::<HashMap<_, _>>();
// This is somewhat tricky, but we may need to synthesize some
// dependencies for this target if it requires full upstream
// compilations to have completed. Because of pipelining, some
// dependency edges may be `Metadata` due to the above clause (as
// opposed to everything being `All`). For example consider:
//
// a (binary)
// â”” b (lib)
// â”” c (lib)
//
// Here the dependency edge from B to C will be `Metadata`, and the
// dependency edge from A to B will be `All`. For A to be compiled,
// however, it currently actually needs the full rlib of C. This means
// that we need to synthesize a dependency edge for the dependency graph
// from A to C. That's done here.
//
// This will walk all dependencies of the current target, and if any of
// *their* dependencies are `Metadata` then we depend on the `All` of
// the target as well. This should ensure that edges changed to
// `Metadata` propagate upwards `All` dependencies to anything that
// transitively contains the `Metadata` edge.
if unit.requires_upstream_objects() {
for dep in dependencies {
depend_on_deps_of_deps(cx, &mut queue_deps, dep.unit.clone());
}
fn depend_on_deps_of_deps(
cx: &Context<'_, '_>,
deps: &mut HashMap<Unit, Artifact>,
unit: Unit,
) {
for dep in cx.unit_deps(&unit) {
if deps.insert(dep.unit.clone(), Artifact::All).is_none() {
depend_on_deps_of_deps(cx, deps, dep.unit.clone());
}
}
}
}
// For now we use a fixed placeholder value for the cost of each unit, but
// in the future this could be used to allow users to provide hints about
// relative expected costs of units, or this could be automatically set in
// a smarter way using timing data from a previous compilation.
self.queue.queue(unit.clone(), job, queue_deps, 100);
*self.counts.entry(unit.pkg.package_id()).or_insert(0) += 1;
Ok(())
}
/// Executes all jobs necessary to build the dependency graph.
///
/// This function will spawn off `config.jobs()` workers to build all of the
/// necessary dependencies, in order. Freshness is propagated as far as
/// possible along each dependency chain.
pub fn execute(mut self, cx: &mut Context<'_, '_>, plan: &mut BuildPlan) -> CargoResult<()> {
let _p = profile::start("executing the job graph");
self.queue.queue_finished();
let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config);
let state = DrainState {
total_units: self.queue.len(),
queue: self.queue,
// 100 here is somewhat arbitrary. It is a few screenfulls of
// output, and hopefully at most a few megabytes of memory for
// typical messages. If you change this, please update the test
// caching_large_output, too.
messages: Arc::new(Queue::new(100)),
diag_dedupe: DiagDedupe::new(cx.bcx.config),
warning_count: HashMap::new(),
active: HashMap::new(),
compiled: HashSet::new(),
documented: HashSet::new(),
scraped: HashSet::new(),
counts: self.counts,
progress,
next_id: 0,
timings: self.timings,
tokens: Vec::new(),
pending_queue: Vec::new(),
print: DiagnosticPrinter::new(cx.bcx.config, &cx.bcx.rustc().workspace_wrapper),
finished: 0,
per_package_future_incompat_reports: Vec::new(),
};
// Create a helper thread for acquiring jobserver tokens
let messages = state.messages.clone();
let helper = cx
.jobserver
.clone()
.into_helper_thread(move |token| {
messages.push(Message::Token(token));
})
.with_context(|| "failed to create helper thread for jobserver management")?;
// Create a helper thread to manage the diagnostics for rustfix if
// necessary.
let messages = state.messages.clone();
// It is important that this uses `push` instead of `push_bounded` for
// now. If someone wants to fix this to be bounded, the `drop`
// implementation needs to be changed to avoid possible deadlocks.
let _diagnostic_server = cx
.bcx
.build_config
.rustfix_diagnostic_server
.borrow_mut()
.take()
.map(move |srv| srv.start(move |msg| messages.push(Message::FixDiagnostic(msg))));
thread::scope(
move |scope| match state.drain_the_queue(cx, plan, scope, &helper) {
Some(err) => Err(err),
None => Ok(()),
},
)
}
}
impl<'cfg> DrainState<'cfg> {
fn spawn_work_if_possible<'s>(
&mut self,
cx: &mut Context<'_, '_>,
jobserver_helper: &HelperThread,
scope: &'s Scope<'s, '_>,
) -> CargoResult<()> {
// Dequeue as much work as we can, learning about everything
// possible that can run. Note that this is also the point where we
// start requesting job tokens. Each job after the first needs to
// request a token.
while let Some((unit, job, priority)) = self.queue.dequeue() {
// We want to keep the pieces of work in the `pending_queue` sorted
// by their priorities, and insert the current job at its correctly
// sorted position: following the lower priority jobs, and the ones
// with the same priority (since they were dequeued before the
// current one, we also keep that relation).
let idx = self
.pending_queue
.partition_point(|&(_, _, p)| p <= priority);
self.pending_queue.insert(idx, (unit, job, priority));
if self.active.len() + self.pending_queue.len() > 1 {
jobserver_helper.request_token();
}
}
// Now that we've learned of all possible work that we can execute
// try to spawn it so long as we've got a jobserver token which says
// we're able to perform some parallel work.
// The `pending_queue` is sorted in ascending priority order, and we
// remove items from its end to schedule the highest priority items
// sooner.
while self.has_extra_tokens() && !self.pending_queue.is_empty() {
let (unit, job, _) = self.pending_queue.pop().unwrap();
*self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
if !cx.bcx.build_config.build_plan {
// Print out some nice progress information.
// NOTE: An error here will drop the job without starting it.
// That should be OK, since we want to exit as soon as
// possible during an error.
self.note_working_on(cx.bcx.config, cx.bcx.ws.root(), &unit, job.freshness())?;
}
self.run(&unit, job, cx, scope);
}
Ok(())
}
fn has_extra_tokens(&self) -> bool {
self.active.len() < self.tokens.len() + 1
}
fn handle_event(
&mut self,
cx: &mut Context<'_, '_>,
plan: &mut BuildPlan,
event: Message,
) -> Result<(), ErrorToHandle> {
match event {
Message::Run(id, cmd) => {
cx.bcx
.config
.shell()
.verbose(|c| c.status("Running", &cmd))?;
self.timings.unit_start(id, self.active[&id].clone());
}
Message::BuildPlanMsg(module_name, cmd, filenames) => {
plan.update(&module_name, &cmd, &filenames)?;
}
Message::Stdout(out) => {
writeln!(cx.bcx.config.shell().out(), "{}", out)?;
}
Message::Stderr(err) => {
let mut shell = cx.bcx.config.shell();
shell.print_ansi_stderr(err.as_bytes())?;
shell.err().write_all(b"\n")?;
}
Message::Diagnostic {
id,
level,
diag,
fixable,
} => {
let emitted = self.diag_dedupe.emit_diag(&diag)?;
if level == "warning" {
self.bump_warning_count(id, emitted, fixable);
}
if level == "error" {
let cnts = self.warning_count.entry(id).or_default();
// If there is an error, the `cargo fix` message should not show
cnts.disallow_fixable();
}
}
Message::Warning { id, warning } => {
cx.bcx.config.shell().warn(warning)?;
self.bump_warning_count(id, true, false);
}
Message::WarningCount {
id,
emitted,
fixable,
} => {
self.bump_warning_count(id, emitted, fixable);
}
Message::FixDiagnostic(msg) => {
self.print.print(&msg)?;
}
Message::Finish(id, artifact, result) => {
let unit = match artifact {
// If `id` has completely finished we remove it
// from the `active` map ...
Artifact::All => {
trace!("end: {:?}", id);
self.finished += 1;
self.report_warning_count(
cx.bcx.config,
id,
&cx.bcx.rustc().workspace_wrapper,
);
self.active.remove(&id).unwrap()
}
// ... otherwise if it hasn't finished we leave it
// in there as we'll get another `Finish` later on.
Artifact::Metadata => {
trace!("end (meta): {:?}", id);
self.active[&id].clone()
}
};
debug!("end ({:?}): {:?}", unit, result);
match result {
Ok(()) => self.finish(id, &unit, artifact, cx)?,
Err(_) if cx.bcx.unit_can_fail_for_docscraping(&unit) => {
cx.failed_scrape_units
.lock()
.unwrap()
.insert(cx.files().metadata(&unit));
self.queue.finish(&unit, &artifact);
}
Err(error) => {
let msg = "The following warnings were emitted during compilation:";
self.emit_warnings(Some(msg), &unit, cx)?;
self.back_compat_notice(cx, &unit)?;
return Err(ErrorToHandle {
error,
print_always: true,
});
}
}
}
Message::FutureIncompatReport(id, items) => {
let package_id = self.active[&id].pkg.package_id();
self.per_package_future_incompat_reports
.push(FutureIncompatReportPackage { package_id, items });
}
Message::Token(acquired_token) => {
let token = acquired_token.with_context(|| "failed to acquire jobserver token")?;
self.tokens.push(token);
}
}
Ok(())
}
// This will also tick the progress bar as appropriate
fn wait_for_events(&mut self) -> Vec<Message> {
// Drain all events at once to avoid displaying the progress bar
// unnecessarily. If there's no events we actually block waiting for
// an event, but we keep a "heartbeat" going to allow `record_cpu`
// to run above to calculate CPU usage over time. To do this we
// listen for a message with a timeout, and on timeout we run the
// previous parts of the loop again.
let mut events = self.messages.try_pop_all();
if events.is_empty() {
loop {
self.tick_progress();
self.tokens.truncate(self.active.len() - 1);
match self.messages.pop(Duration::from_millis(500)) {
Some(message) => {
events.push(message);
break;
}
None => continue,
}
}
}
events
}
/// This is the "main" loop, where Cargo does all work to run the
/// compiler.
///
/// This returns an Option to prevent the use of `?` on `Result` types
/// because it is important for the loop to carefully handle errors.
fn drain_the_queue<'s>(
mut self,
cx: &mut Context<'_, '_>,
plan: &mut BuildPlan,
scope: &'s Scope<'s, '_>,
jobserver_helper: &HelperThread,
) -> Option<anyhow::Error> {
trace!("queue: {:#?}", self.queue);
// Iteratively execute the entire dependency graph. Each turn of the
// loop starts out by scheduling as much work as possible (up to the
// maximum number of parallel jobs we have tokens for). A local queue
// is maintained separately from the main dependency queue as one
// dequeue may actually dequeue quite a bit of work (e.g., 10 binaries
// in one package).
//
// After a job has finished we update our internal state if it was
// successful and otherwise wait for pending work to finish if it failed
// and then immediately return (or keep going, if requested by the build
// config).
let mut errors = ErrorsDuringDrain { count: 0 };
// CAUTION! Do not use `?` or break out of the loop early. Every error
// must be handled in such a way that the loop is still allowed to
// drain event messages.
loop {
if errors.count == 0 || cx.bcx.build_config.keep_going {
if let Err(e) = self.spawn_work_if_possible(cx, jobserver_helper, scope) {
self.handle_error(&mut cx.bcx.config.shell(), &mut errors, e);
}
}
// If after all that we're not actually running anything then we're
// done!
if self.active.is_empty() {
break;
}
// And finally, before we block waiting for the next event, drop any
// excess tokens we may have accidentally acquired. Due to how our
// jobserver interface is architected we may acquire a token that we
// don't actually use, and if this happens just relinquish it back
// to the jobserver itself.
for event in self.wait_for_events() {
if let Err(event_err) = self.handle_event(cx, plan, event) {
self.handle_error(&mut cx.bcx.config.shell(), &mut errors, event_err);
}
}
}
self.progress.clear();
let profile_name = cx.bcx.build_config.requested_profile;
// NOTE: this may be a bit inaccurate, since this may not display the
// profile for what was actually built. Profile overrides can change
// these settings, and in some cases different targets are built with
// different profiles. To be accurate, it would need to collect a
// list of Units built, and maybe display a list of the different
// profiles used. However, to keep it simple and compatible with old
// behavior, we just display what the base profile is.
let profile = cx.bcx.profiles.base_profile();
let mut opt_type = String::from(if profile.opt_level.as_str() == "0" {
"unoptimized"
} else {
"optimized"
});
if profile.debuginfo.is_turned_on() {
opt_type += " + debuginfo";
}
let time_elapsed = util::elapsed(cx.bcx.config.creation_time().elapsed());
if let Err(e) = self.timings.finished(cx, &errors.to_error()) {
self.handle_error(&mut cx.bcx.config.shell(), &mut errors, e);
}
if cx.bcx.build_config.emit_json() {
let mut shell = cx.bcx.config.shell();
let msg = machine_message::BuildFinished {
success: errors.count == 0,
}
.to_json_string();
if let Err(e) = writeln!(shell.out(), "{}", msg) {
self.handle_error(&mut shell, &mut errors, e);
}
}
if let Some(error) = errors.to_error() {
// Any errors up to this point have already been printed via the
// `display_error` inside `handle_error`.
Some(anyhow::Error::new(AlreadyPrintedError::new(error)))
} else if self.queue.is_empty() && self.pending_queue.is_empty() {
let message = format!(
"{} [{}] target(s) in {}",
profile_name, opt_type, time_elapsed
);
if !cx.bcx.build_config.build_plan {
// It doesn't really matter if this fails.
let _ = cx.bcx.config.shell().status("Finished", message);
future_incompat::save_and_display_report(
cx.bcx,
&self.per_package_future_incompat_reports,
);
}
None
} else {
debug!("queue: {:#?}", self.queue);
Some(internal("finished with jobs still left in the queue"))
}
}
fn handle_error(
&self,
shell: &mut Shell,
err_state: &mut ErrorsDuringDrain,
new_err: impl Into<ErrorToHandle>,
) {
let new_err = new_err.into();
if new_err.print_always || err_state.count == 0 {
crate::display_error(&new_err.error, shell);
if err_state.count == 0 && !self.active.is_empty() {
let _ = shell.warn("build failed, waiting for other jobs to finish...");
}
err_state.count += 1;
} else {
tracing::warn!("{:?}", new_err.error);
}
}
// This also records CPU usage and marks concurrency; we roughly want to do
// this as often as we spin on the events receiver (at least every 500ms or
// so).
fn tick_progress(&mut self) {
// Record some timing information if `--timings` is enabled, and
// this'll end up being a noop if we're not recording this
// information.
self.timings.mark_concurrency(
self.active.len(),
self.pending_queue.len(),
self.queue.len(),
);
self.timings.record_cpu();
let active_names = self
.active
.values()
.map(|u| self.name_for_progress(u))
.collect::<Vec<_>>();
let _ = self.progress.tick_now(
self.finished,
self.total_units,
&format!(": {}", active_names.join(", ")),
);
}
fn name_for_progress(&self, unit: &Unit) -> String {
let pkg_name = unit.pkg.name();
let target_name = unit.target.name();
match unit.mode {
CompileMode::Doc { .. } => format!("{}(doc)", pkg_name),
CompileMode::RunCustomBuild => format!("{}(build)", pkg_name),
CompileMode::Test | CompileMode::Check { test: true } => match unit.target.kind() {
TargetKind::Lib(_) => format!("{}(test)", target_name),
TargetKind::CustomBuild => panic!("cannot test build script"),
TargetKind::Bin => format!("{}(bin test)", target_name),
TargetKind::Test => format!("{}(test)", target_name),
TargetKind::Bench => format!("{}(bench)", target_name),
TargetKind::ExampleBin | TargetKind::ExampleLib(_) => {
format!("{}(example test)", target_name)
}
},
_ => match unit.target.kind() {
TargetKind::Lib(_) => pkg_name.to_string(),
TargetKind::CustomBuild => format!("{}(build.rs)", pkg_name),
TargetKind::Bin => format!("{}(bin)", target_name),
TargetKind::Test => format!("{}(test)", target_name),
TargetKind::Bench => format!("{}(bench)", target_name),
TargetKind::ExampleBin | TargetKind::ExampleLib(_) => {
format!("{}(example)", target_name)
}
},
}
}
/// Executes a job.
///
/// Fresh jobs block until finished (which should be very fast!), Dirty
/// jobs will spawn a thread in the background and return immediately.
fn run<'s>(&mut self, unit: &Unit, job: Job, cx: &Context<'_, '_>, scope: &'s Scope<'s, '_>) {
let id = JobId(self.next_id);
self.next_id = self.next_id.checked_add(1).unwrap();
debug!("start {}: {:?}", id, unit);
assert!(self.active.insert(id, unit.clone()).is_none());
let messages = self.messages.clone();
let is_fresh = job.freshness().is_fresh();
let rmeta_required = cx.rmeta_required(unit);
let doit = move |diag_dedupe| {
let state = JobState::new(id, messages, diag_dedupe, rmeta_required);
state.run_to_finish(job);
};
match is_fresh {
true => {
self.timings.add_fresh();
// Running a fresh job on the same thread is often much faster than spawning a new
// thread to run the job.
doit(Some(&self.diag_dedupe));
}
false => {
self.timings.add_dirty();
scope.spawn(move || doit(None));
}
}
}
fn emit_warnings(
&mut self,
msg: Option<&str>,
unit: &Unit,
cx: &mut Context<'_, '_>,
) -> CargoResult<()> {
let outputs = cx.build_script_outputs.lock().unwrap();
let metadata = match cx.find_build_script_metadata(unit) {
Some(metadata) => metadata,
None => return Ok(()),
};
let bcx = &mut cx.bcx;
if let Some(output) = outputs.get(metadata) {
if !output.warnings.is_empty() {
if let Some(msg) = msg {
writeln!(bcx.config.shell().err(), "{}\n", msg)?;
}
for warning in output.warnings.iter() {
bcx.config.shell().warn(warning)?;
}
if msg.is_some() {
// Output an empty line.
writeln!(bcx.config.shell().err())?;
}
}
}
Ok(())
}
fn bump_warning_count(&mut self, id: JobId, emitted: bool, fixable: bool) {
let cnts = self.warning_count.entry(id).or_default();
cnts.total += 1;
if !emitted {
cnts.duplicates += 1;
// Don't add to fixable if it's already been emitted
} else if fixable {
// Do not add anything to the fixable warning count if
// is `NotAllowed` since that indicates there was an
// error while building this `Unit`
if cnts.fixable_allowed() {
cnts.fixable = match cnts.fixable {
FixableWarnings::NotAllowed => FixableWarnings::NotAllowed,
FixableWarnings::Zero => FixableWarnings::Positive(1),
FixableWarnings::Positive(fixable) => FixableWarnings::Positive(fixable + 1),
};
}
}
}
/// Displays a final report of the warnings emitted by a particular job.
fn report_warning_count(
&mut self,
config: &Config,
id: JobId,
rustc_workspace_wrapper: &Option<PathBuf>,
) {
let count = match self.warning_count.remove(&id) {
// An error could add an entry for a `Unit`
// with 0 warnings but having fixable
// warnings be disallowed
Some(count) if count.total > 0 => count,
None | Some(_) => return,
};
let unit = &self.active[&id];
let mut message = descriptive_pkg_name(&unit.pkg.name(), &unit.target, &unit.mode);
message.push_str(" generated ");
match count.total {
1 => message.push_str("1 warning"),
n => {
let _ = write!(message, "{} warnings", n);
}
};
match count.duplicates {
0 => {}
1 => message.push_str(" (1 duplicate)"),
n => {
let _ = write!(message, " ({} duplicates)", n);
}
}
// Only show the `cargo fix` message if its a local `Unit`
if unit.is_local() {
// Do not show this if there are any errors or no fixable warnings
if let FixableWarnings::Positive(fixable) = count.fixable {
// `cargo fix` doesnt have an option for custom builds
if !unit.target.is_custom_build() {
// To make sure the correct command is shown for `clippy` we
// check if `RUSTC_WORKSPACE_WRAPPER` is set and pointing towards
// `clippy-driver`.
let clippy = std::ffi::OsStr::new("clippy-driver");
let command = match rustc_workspace_wrapper.as_ref().and_then(|x| x.file_stem())
{
Some(wrapper) if wrapper == clippy => "cargo clippy --fix",
_ => "cargo fix",
};
let mut args = {
let named = unit.target.description_named();
// if its a lib we need to add the package to fix
if unit.target.is_lib() {
format!("{} -p {}", named, unit.pkg.name())
} else {
named
}
};
if unit.mode.is_rustc_test()
&& !(unit.target.is_test() || unit.target.is_bench())
{
args.push_str(" --tests");
}
let mut suggestions = format!("{} suggestion", fixable);
if fixable > 1 {
suggestions.push_str("s")
}
let _ = write!(
message,
" (run `{command} --{args}` to apply {suggestions})"
);
}
}
}
// Errors are ignored here because it is tricky to handle them
// correctly, and they aren't important.
let _ = config.shell().warn(message);
}
fn finish(
&mut self,
id: JobId,
unit: &Unit,
artifact: Artifact,
cx: &mut Context<'_, '_>,
) -> CargoResult<()> {
if unit.mode.is_run_custom_build() && unit.show_warnings(cx.bcx.config) {
self.emit_warnings(None, unit, cx)?;
}
let unlocked = self.queue.finish(unit, &artifact);
match artifact {
Artifact::All => self.timings.unit_finished(id, unlocked),
Artifact::Metadata => self.timings.unit_rmeta_finished(id, unlocked),
}
Ok(())
}
// This isn't super trivial because we don't want to print loads and
// loads of information to the console, but we also want to produce a
// faithful representation of what's happening. This is somewhat nuanced
// as a package can start compiling *very* early on because of custom
// build commands and such.
//
// In general, we try to print "Compiling" for the first nontrivial task
// run for a package, regardless of when that is. We then don't print
// out any more information for a package after we've printed it once.
fn note_working_on(
&mut self,
config: &Config,
ws_root: &Path,
unit: &Unit,
fresh: &Freshness,
) -> CargoResult<()> {
if (self.compiled.contains(&unit.pkg.package_id())
&& !unit.mode.is_doc()
&& !unit.mode.is_doc_scrape())
|| (self.documented.contains(&unit.pkg.package_id()) && unit.mode.is_doc())
|| (self.scraped.contains(&unit.pkg.package_id()) && unit.mode.is_doc_scrape())
{
return Ok(());
}
match fresh {
// Any dirty stage which runs at least one command gets printed as
// being a compiled package.
Dirty(dirty_reason) => {
if let Some(reason) = dirty_reason {
config
.shell()
.verbose(|shell| reason.present_to(shell, unit, ws_root))?;
}
if unit.mode.is_doc() {
self.documented.insert(unit.pkg.package_id());
config.shell().status("Documenting", &unit.pkg)?;
} else if unit.mode.is_doc_test() {
// Skip doc test.
} else if unit.mode.is_doc_scrape() {
self.scraped.insert(unit.pkg.package_id());
config.shell().status("Scraping", &unit.pkg)?;
} else {
self.compiled.insert(unit.pkg.package_id());
if unit.mode.is_check() {
config.shell().status("Checking", &unit.pkg)?;
} else {
config.shell().status("Compiling", &unit.pkg)?;
}
}
}
Fresh => {
// If doc test are last, only print "Fresh" if nothing has been printed.
if self.counts[&unit.pkg.package_id()] == 0
&& !(unit.mode.is_doc_test() && self.compiled.contains(&unit.pkg.package_id()))
{
self.compiled.insert(unit.pkg.package_id());
config.shell().verbose(|c| c.status("Fresh", &unit.pkg))?;
}
}
}
Ok(())
}
fn back_compat_notice(&self, cx: &Context<'_, '_>, unit: &Unit) -> CargoResult<()> {
if unit.pkg.name() != "diesel"
|| unit.pkg.version() >= &Version::new(1, 4, 8)
|| cx.bcx.ws.resolve_behavior() == ResolveBehavior::V1
|| !unit.pkg.package_id().source_id().is_registry()
|| !unit.features.is_empty()
{
return Ok(());
}
if !cx
.bcx
.unit_graph
.keys()
.any(|unit| unit.pkg.name() == "diesel" && !unit.features.is_empty())
{
return Ok(());
}
cx.bcx.config.shell().note(
"\
This error may be due to an interaction between diesel and Cargo's new
feature resolver. Try updating to diesel 1.4.8 to fix this error.
",
)?;
Ok(())
}
}
impl ErrorsDuringDrain {
fn to_error(&self) -> Option<anyhow::Error> {
match self.count {
0 => None,
1 => Some(format_err!("1 job failed")),
n => Some(format_err!("{} jobs failed", n)),
}
}
}