| use std::cell::{Cell, Ref, RefCell, RefMut}; |
| use std::cmp::Ordering; |
| use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; |
| use std::fmt; |
| use std::hash; |
| use std::mem; |
| use std::path::{Path, PathBuf}; |
| use std::rc::Rc; |
| use std::time::{Duration, Instant}; |
| |
| use anyhow::Context; |
| use bytesize::ByteSize; |
| use curl::easy::Easy; |
| use curl::multi::{EasyHandle, Multi}; |
| use lazycell::LazyCell; |
| use semver::Version; |
| use serde::Serialize; |
| use tracing::debug; |
| |
| use crate::core::compiler::{CompileKind, RustcTargetData}; |
| use crate::core::dependency::DepKind; |
| use crate::core::resolver::features::ForceAllTargets; |
| use crate::core::resolver::{HasDevUnits, Resolve}; |
| use crate::core::{Dependency, Manifest, PackageId, SourceId, Target}; |
| use crate::core::{Summary, Workspace}; |
| use crate::sources::source::{MaybePackage, SourceMap}; |
| use crate::util::cache_lock::{CacheLock, CacheLockMode}; |
| use crate::util::errors::{CargoResult, HttpNotSuccessful}; |
| use crate::util::interning::InternedString; |
| use crate::util::network::http::http_handle_and_timeout; |
| use crate::util::network::http::HttpTimeout; |
| use crate::util::network::retry::{Retry, RetryResult}; |
| use crate::util::network::sleep::SleepTracker; |
| use crate::util::RustVersion; |
| use crate::util::{self, internal, Config, Progress, ProgressStyle}; |
| |
| pub const MANIFEST_PREAMBLE: &str = "\ |
| # THIS FILE IS AUTOMATICALLY GENERATED BY CARGO |
| # |
| # When uploading crates to the registry Cargo will automatically |
| # \"normalize\" Cargo.toml files for maximal compatibility |
| # with all versions of Cargo and also rewrite `path` dependencies |
| # to registry (e.g., crates.io) dependencies. |
| # |
| # If you are reading this file be aware that the original Cargo.toml |
| # will likely look very different (and much more reasonable). |
| # See Cargo.toml.orig for the original contents. |
| "; |
| |
| /// Information about a package that is available somewhere in the file system. |
| /// |
| /// A package is a `Cargo.toml` file plus all the files that are part of it. |
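| /// |
| /// # Example |
| /// |
| /// An illustrative sketch (not compiled as a doctest); building the |
| /// `Manifest` itself is done by Cargo's manifest parsing elsewhere: |
| /// |
| /// ```ignore |
| /// let pkg = Package::new(manifest, Path::new("/path/to/Cargo.toml")); |
| /// assert_eq!(pkg.root(), Path::new("/path/to")); |
| /// assert_eq!(pkg.name(), pkg.package_id().name()); |
| /// ``` |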
| #[derive(Clone)] |
| pub struct Package { |
| inner: Rc<PackageInner>, |
| } |
| |
| #[derive(Clone)] |
| // TODO: is `manifest_path` a relic? |
| struct PackageInner { |
| /// The package's manifest. |
| manifest: Manifest, |
| /// The path to the package's manifest. |
| manifest_path: PathBuf, |
| } |
| |
| impl Ord for Package { |
| fn cmp(&self, other: &Package) -> Ordering { |
| self.package_id().cmp(&other.package_id()) |
| } |
| } |
| |
| impl PartialOrd for Package { |
| fn partial_cmp(&self, other: &Package) -> Option<Ordering> { |
| Some(self.cmp(other)) |
| } |
| } |
| |
| /// A Package in a form where `Serialize` can be derived. |
| #[derive(Serialize)] |
| pub struct SerializedPackage { |
| name: InternedString, |
| version: Version, |
| id: PackageId, |
| license: Option<String>, |
| license_file: Option<String>, |
| description: Option<String>, |
| source: SourceId, |
| dependencies: Vec<Dependency>, |
| targets: Vec<Target>, |
| features: BTreeMap<InternedString, Vec<InternedString>>, |
| manifest_path: PathBuf, |
| metadata: Option<toml::Value>, |
| publish: Option<Vec<String>>, |
| authors: Vec<String>, |
| categories: Vec<String>, |
| keywords: Vec<String>, |
| readme: Option<String>, |
| repository: Option<String>, |
| homepage: Option<String>, |
| documentation: Option<String>, |
| edition: String, |
| links: Option<String>, |
| #[serde(skip_serializing_if = "Option::is_none")] |
| metabuild: Option<Vec<String>>, |
| default_run: Option<String>, |
| rust_version: Option<RustVersion>, |
| } |
| |
| impl Package { |
| /// Creates a package from a manifest and its location. |
| pub fn new(manifest: Manifest, manifest_path: &Path) -> Package { |
| Package { |
| inner: Rc::new(PackageInner { |
| manifest, |
| manifest_path: manifest_path.to_path_buf(), |
| }), |
| } |
| } |
| |
| /// Gets the manifest dependencies. |
| pub fn dependencies(&self) -> &[Dependency] { |
| self.manifest().dependencies() |
| } |
| /// Gets the manifest. |
| pub fn manifest(&self) -> &Manifest { |
| &self.inner.manifest |
| } |
| /// Gets the manifest mutably, cloning the shared inner data if needed. |
| pub fn manifest_mut(&mut self) -> &mut Manifest { |
| &mut Rc::make_mut(&mut self.inner).manifest |
| } |
| /// Gets the path to the manifest. |
| pub fn manifest_path(&self) -> &Path { |
| &self.inner.manifest_path |
| } |
| /// Gets the name of the package. |
| pub fn name(&self) -> InternedString { |
| self.package_id().name() |
| } |
| /// Gets the `PackageId` object for the package (fully defines a package). |
| pub fn package_id(&self) -> PackageId { |
| self.manifest().package_id() |
| } |
| /// Gets the root folder of the package. |
| pub fn root(&self) -> &Path { |
| self.manifest_path().parent().unwrap() |
| } |
| /// Gets the summary for the package. |
| pub fn summary(&self) -> &Summary { |
| self.manifest().summary() |
| } |
| /// Gets the targets specified in the manifest. |
| pub fn targets(&self) -> &[Target] { |
| self.manifest().targets() |
| } |
| /// Gets the library crate for this package, if it exists. |
| pub fn library(&self) -> Option<&Target> { |
| self.targets().iter().find(|t| t.is_lib()) |
| } |
| /// Gets the current package version. |
| pub fn version(&self) -> &Version { |
| self.package_id().version() |
| } |
| /// Gets the package authors. |
| pub fn authors(&self) -> &Vec<String> { |
| &self.manifest().metadata().authors |
| } |
| |
| /// Returns `None` if the package is set to publish. |
| /// Returns `Some(allowed_registries)` if publishing is limited to specified |
| /// registries or if package is set to not publish. |
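| /// |
| /// As a sketch of the usual manifest mapping (assumed here; the parsing |
| /// happens elsewhere): `publish = false` yields `Some(vec![])`, |
| /// `publish = ["my-registry"]` yields `Some(vec!["my-registry".to_string()])`, |
| /// and `publish = true` or an omitted field yields `None`. |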
| pub fn publish(&self) -> &Option<Vec<String>> { |
| self.manifest().publish() |
| } |
| /// Returns `true` if this package is a proc-macro. |
| pub fn proc_macro(&self) -> bool { |
| self.targets().iter().any(|target| target.proc_macro()) |
| } |
| /// Gets the package's minimum Rust version. |
| pub fn rust_version(&self) -> Option<&RustVersion> { |
| self.manifest().rust_version() |
| } |
| |
| /// Returns `true` if the package uses a custom build script for any target. |
| pub fn has_custom_build(&self) -> bool { |
| self.targets().iter().any(|t| t.is_custom_build()) |
| } |
| |
| pub fn map_source(self, to_replace: SourceId, replace_with: SourceId) -> Package { |
| Package { |
| inner: Rc::new(PackageInner { |
| manifest: self.manifest().clone().map_source(to_replace, replace_with), |
| manifest_path: self.manifest_path().to_owned(), |
| }), |
| } |
| } |
| |
| pub fn to_registry_toml(&self, ws: &Workspace<'_>) -> CargoResult<String> { |
| let manifest = self |
| .manifest() |
| .original() |
| .prepare_for_publish(ws, self.root())?; |
| let toml = toml::to_string_pretty(&manifest)?; |
| Ok(format!("{}\n{}", MANIFEST_PREAMBLE, toml)) |
| } |
| |
| /// Returns whether the package should include `Cargo.lock`. |
| pub fn include_lockfile(&self) -> bool { |
| self.targets().iter().any(|t| t.is_example() || t.is_bin()) |
| } |
| |
| pub fn serialized(&self) -> SerializedPackage { |
| let summary = self.manifest().summary(); |
| let package_id = summary.package_id(); |
| let manmeta = self.manifest().metadata(); |
| // Filter out metabuild targets. They are an internal implementation |
| // detail that is probably not relevant externally. There's also not a |
| // real path to show in `src_path`, and this avoids changing the format. |
| let targets: Vec<Target> = self |
| .manifest() |
| .targets() |
| .iter() |
| .filter(|t| t.src_path().is_path()) |
| .cloned() |
| .collect(); |
| // Convert Vec<FeatureValue> to Vec<InternedString> |
| let features = summary |
| .features() |
| .iter() |
| .map(|(k, v)| { |
| ( |
| *k, |
| v.iter() |
| .map(|fv| InternedString::new(&fv.to_string())) |
| .collect(), |
| ) |
| }) |
| .collect(); |
| |
| SerializedPackage { |
| name: package_id.name(), |
| version: package_id.version().clone(), |
| id: package_id, |
| license: manmeta.license.clone(), |
| license_file: manmeta.license_file.clone(), |
| description: manmeta.description.clone(), |
| source: summary.source_id(), |
| dependencies: summary.dependencies().to_vec(), |
| targets, |
| features, |
| manifest_path: self.manifest_path().to_path_buf(), |
| metadata: self.manifest().custom_metadata().cloned(), |
| authors: manmeta.authors.clone(), |
| categories: manmeta.categories.clone(), |
| keywords: manmeta.keywords.clone(), |
| readme: manmeta.readme.clone(), |
| repository: manmeta.repository.clone(), |
| homepage: manmeta.homepage.clone(), |
| documentation: manmeta.documentation.clone(), |
| edition: self.manifest().edition().to_string(), |
| links: self.manifest().links().map(|s| s.to_owned()), |
| metabuild: self.manifest().metabuild().cloned(), |
| publish: self.publish().as_ref().cloned(), |
| default_run: self.manifest().default_run().map(|s| s.to_owned()), |
| rust_version: self.rust_version().cloned(), |
| } |
| } |
| } |
| |
| impl fmt::Display for Package { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| write!(f, "{}", self.summary().package_id()) |
| } |
| } |
| |
| impl fmt::Debug for Package { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.debug_struct("Package") |
| .field("id", &self.summary().package_id()) |
| .field("..", &"..") |
| .finish() |
| } |
| } |
| |
| impl PartialEq for Package { |
| fn eq(&self, other: &Package) -> bool { |
| self.package_id() == other.package_id() |
| } |
| } |
| |
| impl Eq for Package {} |
| |
| impl hash::Hash for Package { |
| fn hash<H: hash::Hasher>(&self, into: &mut H) { |
| self.package_id().hash(into) |
| } |
| } |
| |
| /// A set of packages, with the intent to download. |
| /// |
| /// This is primarily used to convert a set of `PackageId`s to `Package`s. It |
| /// will download as needed, or use the cached download if available. |
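| /// |
| /// A minimal usage sketch (illustrative only, not compiled as a doctest): |
| /// |
| /// ```ignore |
| /// let set = PackageSet::new(&package_ids, sources, config)?; |
| /// // Downloads (or pulls from cache) every package in the set. |
| /// let packages = set.get_many(set.package_ids())?; |
| /// ``` |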
| pub struct PackageSet<'cfg> { |
| packages: HashMap<PackageId, LazyCell<Package>>, |
| sources: RefCell<SourceMap<'cfg>>, |
| config: &'cfg Config, |
| multi: Multi, |
| /// Used to prevent reusing the PackageSet to download twice. |
| downloading: Cell<bool>, |
| /// Whether or not to use curl HTTP/2 multiplexing. |
| multiplexing: bool, |
| } |
| |
| /// Helper for downloading crates. |
| pub struct Downloads<'a, 'cfg> { |
| set: &'a PackageSet<'cfg>, |
| /// When a download is started, it is added to this map. The key is a |
| /// "token" (see `Download::token`). It is removed once the download is |
| /// finished. |
| pending: HashMap<usize, (Download<'cfg>, EasyHandle)>, |
| /// Set of packages currently being downloaded. This should stay in sync |
| /// with `pending`. |
| pending_ids: HashSet<PackageId>, |
| /// Downloads that have failed and are waiting to retry again later. |
| sleeping: SleepTracker<(Download<'cfg>, Easy)>, |
| /// The final result of each download. A pair `(token, result)`. This is a |
| /// temporary holding area, needed because curl can report multiple |
| /// downloads at once, but the main loop (`wait`) is written to only |
| /// handle one at a time. |
| results: Vec<(usize, Result<(), curl::Error>)>, |
| /// The next ID to use for creating a token (see `Download::token`). |
| next: usize, |
| /// Progress bar. |
| progress: RefCell<Option<Progress<'cfg>>>, |
| /// Number of downloads that have successfully finished. |
| downloads_finished: usize, |
| /// Total bytes for all successfully downloaded packages. |
| downloaded_bytes: u64, |
| /// Size (in bytes) and package name of the largest downloaded package. |
| largest: (u64, InternedString), |
| /// Time when downloading started. |
| start: Instant, |
| /// Indicates *all* downloads were successful. |
| success: bool, |
| |
| /// Timeout management, both of timeout thresholds as well as whether or not |
| /// our connection has timed out (and accompanying message if it has). |
| /// |
| /// Note that timeout management is done manually here instead of in libcurl |
| /// because we want to apply timeouts to an entire batch of operations, not |
| /// any one particular single operation. |
| timeout: HttpTimeout, |
| /// Last time bytes were received. |
| updated_at: Cell<Instant>, |
| /// This is a slow-speed check. It is reset to `now + timeout_duration` |
| /// every time at least `threshold` bytes are received. If the current |
| /// time ever exceeds `next_speed_check`, then give up and report a |
| /// timeout error. |
| next_speed_check: Cell<Instant>, |
| /// This is the slow-speed threshold byte count. It starts at the |
| /// configured threshold value (default 10), and is decremented by the |
| /// number of bytes received in each chunk. Once it reaches zero, the |
| /// threshold has been met and data is being received fast enough not to |
| /// trigger a timeout; reset `next_speed_check` and set this back to the |
| /// configured threshold. |
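| /// |
| /// For example, with a threshold of 10 bytes and a 30 second window (the |
| /// numbers are illustrative; the real values come from `HttpTimeout`): |
| /// receiving a 4-byte chunk leaves 6 bytes still required, and a later |
| /// chunk of 6 or more bytes meets the threshold, pushing `next_speed_check` |
| /// out by another 30 seconds and resetting this counter to 10. |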
| next_speed_check_bytes_threshold: Cell<u64>, |
| /// Global filesystem lock to ensure only one Cargo is downloading at a |
| /// time. |
| _lock: CacheLock<'cfg>, |
| } |
| |
| struct Download<'cfg> { |
| /// The token for this download, used as the key of the `Downloads::pending` map |
| /// and stored in `EasyHandle` as well. |
| token: usize, |
| |
| /// The package that we're downloading. |
| id: PackageId, |
| |
| /// Actual downloaded data, updated throughout the lifetime of this download. |
| data: RefCell<Vec<u8>>, |
| |
| /// HTTP headers for debugging. |
| headers: RefCell<Vec<String>>, |
| |
| /// The URL that we're downloading from, cached here for error messages and |
| /// reenqueuing. |
| url: String, |
| |
| /// A descriptive string to print when we've finished downloading this crate. |
| descriptor: String, |
| |
| /// Statistics updated from the progress callback in libcurl. |
| total: Cell<u64>, |
| current: Cell<u64>, |
| |
| /// The moment we started this transfer at. |
| start: Instant, |
| timed_out: Cell<Option<String>>, |
| |
| /// Logic used to track retrying this download if it's a spurious failure. |
| retry: Retry<'cfg>, |
| } |
| |
| impl<'cfg> PackageSet<'cfg> { |
| pub fn new( |
| package_ids: &[PackageId], |
| sources: SourceMap<'cfg>, |
| config: &'cfg Config, |
| ) -> CargoResult<PackageSet<'cfg>> { |
| // We've enabled the `http2` feature of `curl` in Cargo, so treat |
| // failures here as fatal, since they would indicate a build-time problem. |
| let mut multi = Multi::new(); |
| let multiplexing = config.http_config()?.multiplexing.unwrap_or(true); |
| multi |
| .pipelining(false, multiplexing) |
| .with_context(|| "failed to enable multiplexing/pipelining in curl")?; |
| |
| // let's not flood crates.io with connections |
| multi.set_max_host_connections(2)?; |
| |
| Ok(PackageSet { |
| packages: package_ids |
| .iter() |
| .map(|&id| (id, LazyCell::new())) |
| .collect(), |
| sources: RefCell::new(sources), |
| config, |
| multi, |
| downloading: Cell::new(false), |
| multiplexing, |
| }) |
| } |
| |
| pub fn package_ids(&self) -> impl Iterator<Item = PackageId> + '_ { |
| self.packages.keys().cloned() |
| } |
| |
| pub fn packages(&self) -> impl Iterator<Item = &Package> { |
| self.packages.values().filter_map(|p| p.borrow()) |
| } |
| |
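| /// Starts a download session, taking the download-exclusive package cache |
| /// lock for its duration. Panics if a session is already in progress for |
| /// this `PackageSet`. |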
| pub fn enable_download<'a>(&'a self) -> CargoResult<Downloads<'a, 'cfg>> { |
| assert!(!self.downloading.replace(true)); |
| let timeout = HttpTimeout::new(self.config)?; |
| Ok(Downloads { |
| start: Instant::now(), |
| set: self, |
| next: 0, |
| pending: HashMap::new(), |
| pending_ids: HashSet::new(), |
| sleeping: SleepTracker::new(), |
| results: Vec::new(), |
| progress: RefCell::new(Some(Progress::with_style( |
| "Downloading", |
| ProgressStyle::Ratio, |
| self.config, |
| ))), |
| downloads_finished: 0, |
| downloaded_bytes: 0, |
| largest: (0, InternedString::new("")), |
| success: false, |
| updated_at: Cell::new(Instant::now()), |
| timeout, |
| next_speed_check: Cell::new(Instant::now()), |
| next_speed_check_bytes_threshold: Cell::new(0), |
| _lock: self |
| .config |
| .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?, |
| }) |
| } |
| |
| pub fn get_one(&self, id: PackageId) -> CargoResult<&Package> { |
| if let Some(pkg) = self.packages.get(&id).and_then(|slot| slot.borrow()) { |
| return Ok(pkg); |
| } |
| Ok(self.get_many(Some(id))?.remove(0)) |
| } |
| |
| pub fn get_many(&self, ids: impl IntoIterator<Item = PackageId>) -> CargoResult<Vec<&Package>> { |
| let mut pkgs = Vec::new(); |
| let _lock = self |
| .config |
| .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?; |
| let mut downloads = self.enable_download()?; |
| for id in ids { |
| pkgs.extend(downloads.start(id)?); |
| } |
| while downloads.remaining() > 0 { |
| pkgs.push(downloads.wait()?); |
| } |
| downloads.success = true; |
| Ok(pkgs) |
| } |
| |
| /// Downloads any packages accessible from the given root ids. |
| pub fn download_accessible( |
| &self, |
| resolve: &Resolve, |
| root_ids: &[PackageId], |
| has_dev_units: HasDevUnits, |
| requested_kinds: &[CompileKind], |
| target_data: &RustcTargetData<'cfg>, |
| force_all_targets: ForceAllTargets, |
| ) -> CargoResult<()> { |
| fn collect_used_deps( |
| used: &mut BTreeSet<PackageId>, |
| resolve: &Resolve, |
| pkg_id: PackageId, |
| has_dev_units: HasDevUnits, |
| requested_kinds: &[CompileKind], |
| target_data: &RustcTargetData<'_>, |
| force_all_targets: ForceAllTargets, |
| ) -> CargoResult<()> { |
| if !used.insert(pkg_id) { |
| return Ok(()); |
| } |
| let filtered_deps = PackageSet::filter_deps( |
| pkg_id, |
| resolve, |
| has_dev_units, |
| requested_kinds, |
| target_data, |
| force_all_targets, |
| ); |
| for (pkg_id, _dep) in filtered_deps { |
| collect_used_deps( |
| used, |
| resolve, |
| pkg_id, |
| has_dev_units, |
| requested_kinds, |
| target_data, |
| force_all_targets, |
| )?; |
| } |
| Ok(()) |
| } |
| |
| // This is sorted by PackageId to get consistent behavior and error |
| // messages for Cargo's testsuite. Perhaps there is a better ordering |
| // that optimizes download time? |
| let mut to_download = BTreeSet::new(); |
| |
| for id in root_ids { |
| collect_used_deps( |
| &mut to_download, |
| resolve, |
| *id, |
| has_dev_units, |
| requested_kinds, |
| target_data, |
| force_all_targets, |
| )?; |
| } |
| self.get_many(to_download.into_iter())?; |
| Ok(()) |
| } |
| |
| /// Checks if any dependency packages violate artifact constraints (to instantly |
| /// abort) or do not have any lib targets (which only results in warnings). |
| pub(crate) fn warn_no_lib_packages_and_artifact_libs_overlapping_deps( |
| &self, |
| ws: &Workspace<'cfg>, |
| resolve: &Resolve, |
| root_ids: &[PackageId], |
| has_dev_units: HasDevUnits, |
| requested_kinds: &[CompileKind], |
| target_data: &RustcTargetData<'_>, |
| force_all_targets: ForceAllTargets, |
| ) -> CargoResult<()> { |
| let no_lib_pkgs: BTreeMap<PackageId, Vec<(&Package, &HashSet<Dependency>)>> = root_ids |
| .iter() |
| .map(|&root_id| { |
| let dep_pkgs_to_deps: Vec<_> = PackageSet::filter_deps( |
| root_id, |
| resolve, |
| has_dev_units, |
| requested_kinds, |
| target_data, |
| force_all_targets, |
| ) |
| .collect(); |
| |
| let dep_pkgs_and_deps = dep_pkgs_to_deps |
| .into_iter() |
| .filter(|(_id, deps)| deps.iter().any(|dep| dep.maybe_lib())) |
| .filter_map(|(dep_package_id, deps)| { |
| self.get_one(dep_package_id).ok().and_then(|dep_pkg| { |
| (!dep_pkg.targets().iter().any(|t| t.is_lib())).then(|| (dep_pkg, deps)) |
| }) |
| }) |
| .collect(); |
| (root_id, dep_pkgs_and_deps) |
| }) |
| .collect(); |
| |
| for (pkg_id, dep_pkgs) in no_lib_pkgs { |
| for (_dep_pkg_without_lib_target, deps) in dep_pkgs { |
| for dep in deps.iter().filter(|dep| { |
| dep.artifact() |
| .map(|artifact| artifact.is_lib()) |
| .unwrap_or(true) |
| }) { |
| ws.config().shell().warn(&format!( |
| "{} ignoring invalid dependency `{}` which is missing a lib target", |
| pkg_id, |
| dep.name_in_toml(), |
| ))?; |
| } |
| } |
| } |
| Ok(()) |
| } |
| |
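| /// Returns the dependency edges of `pkg_id` from `resolve` that are |
| /// actually active: dev-dependencies are skipped when `has_dev_units` is |
| /// `No`, and platform-specific dependencies are skipped when they aren't |
| /// activated for any requested kind (unless `force_all_targets` forces |
| /// them in). |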
| fn filter_deps<'a>( |
| pkg_id: PackageId, |
| resolve: &'a Resolve, |
| has_dev_units: HasDevUnits, |
| requested_kinds: &'a [CompileKind], |
| target_data: &'a RustcTargetData<'_>, |
| force_all_targets: ForceAllTargets, |
| ) -> impl Iterator<Item = (PackageId, &'a HashSet<Dependency>)> + 'a { |
| resolve |
| .deps(pkg_id) |
| .filter(move |&(_id, deps)| { |
| deps.iter().any(|dep| { |
| if dep.kind() == DepKind::Development && has_dev_units == HasDevUnits::No { |
| return false; |
| } |
| if force_all_targets == ForceAllTargets::No { |
| let activated = requested_kinds |
| .iter() |
| .chain(Some(&CompileKind::Host)) |
| .any(|kind| target_data.dep_platform_activated(dep, *kind)); |
| if !activated { |
| return false; |
| } |
| } |
| true |
| }) |
| }) |
| .into_iter() |
| } |
| |
| pub fn sources(&self) -> Ref<'_, SourceMap<'cfg>> { |
| self.sources.borrow() |
| } |
| |
| pub fn sources_mut(&self) -> RefMut<'_, SourceMap<'cfg>> { |
| self.sources.borrow_mut() |
| } |
| |
| /// Merge the given set into self. |
| pub fn add_set(&mut self, set: PackageSet<'cfg>) { |
| assert!(!self.downloading.get()); |
| assert!(!set.downloading.get()); |
| for (pkg_id, p_cell) in set.packages { |
| self.packages.entry(pkg_id).or_insert(p_cell); |
| } |
| let mut sources = self.sources.borrow_mut(); |
| let other_sources = set.sources.into_inner(); |
| sources.add_source_map(other_sources); |
| } |
| } |
| |
| impl<'a, 'cfg> Downloads<'a, 'cfg> { |
| /// Starts downloading the package for the `id` specified. |
| /// |
| /// Returns `None` if the package is queued up for download and will |
| /// eventually be returned from `wait`. Returns `Some(pkg)` if the |
| /// package is ready and doesn't need to be downloaded. |
| pub fn start(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> { |
| self.start_inner(id) |
| .with_context(|| format!("failed to download `{}`", id)) |
| } |
| |
| fn start_inner(&mut self, id: PackageId) -> CargoResult<Option<&'a Package>> { |
| // First up see if we've already cached this package, in which case |
| // there's nothing to do. |
| let slot = self |
| .set |
| .packages |
| .get(&id) |
| .ok_or_else(|| internal(format!("couldn't find `{}` in package set", id)))?; |
| if let Some(pkg) = slot.borrow() { |
| return Ok(Some(pkg)); |
| } |
| |
| // Ask the original source for this `PackageId` for the corresponding |
| // package. That may immediately come back and tell us that the package |
| // is ready, or it could tell us that it needs to be downloaded. |
| let mut sources = self.set.sources.borrow_mut(); |
| let source = sources |
| .get_mut(id.source_id()) |
| .ok_or_else(|| internal(format!("couldn't find source for `{}`", id)))?; |
| let pkg = source |
| .download(id) |
| .with_context(|| "unable to get packages from source")?; |
| let (url, descriptor, authorization) = match pkg { |
| MaybePackage::Ready(pkg) => { |
| debug!("{} doesn't need a download", id); |
| assert!(slot.fill(pkg).is_ok()); |
| return Ok(Some(slot.borrow().unwrap())); |
| } |
| MaybePackage::Download { |
| url, |
| descriptor, |
| authorization, |
| } => (url, descriptor, authorization), |
| }; |
| |
| // Ok we're going to download this crate, so let's set up all our |
| // internal state and hand off an `Easy` handle to our libcurl `Multi` |
| // handle. This won't actually start the transfer, but later it'll |
| // happen during `wait`. |
| let token = self.next; |
| self.next += 1; |
| debug!(target: "network", "downloading {} as {}", id, token); |
| assert!(self.pending_ids.insert(id)); |
| |
| let (mut handle, _timeout) = http_handle_and_timeout(self.set.config)?; |
| handle.get(true)?; |
| handle.url(&url)?; |
| handle.follow_location(true)?; // follow redirects |
| |
| // Add authorization header. |
| if let Some(authorization) = authorization { |
| let mut headers = curl::easy::List::new(); |
| headers.append(&format!("Authorization: {}", authorization))?; |
| handle.http_headers(headers)?; |
| } |
| |
| // Enable HTTP/2 if possible. |
| crate::try_old_curl_http2_pipewait!(self.set.multiplexing, handle); |
| |
| handle.write_function(move |buf| { |
| debug!(target: "network", "{} - {} bytes of data", token, buf.len()); |
| tls::with(|downloads| { |
| if let Some(downloads) = downloads { |
| downloads.pending[&token] |
| .0 |
| .data |
| .borrow_mut() |
| .extend_from_slice(buf); |
| } |
| }); |
| Ok(buf.len()) |
| })?; |
| handle.header_function(move |data| { |
| tls::with(|downloads| { |
| if let Some(downloads) = downloads { |
| // Headers contain trailing \r\n, trim them to make it easier |
| // to work with. |
| let h = String::from_utf8_lossy(data).trim().to_string(); |
| downloads.pending[&token].0.headers.borrow_mut().push(h); |
| } |
| }); |
| true |
| })?; |
| |
| handle.progress(true)?; |
| handle.progress_function(move |dl_total, dl_cur, _, _| { |
| tls::with(|downloads| match downloads { |
| Some(d) => d.progress(token, dl_total as u64, dl_cur as u64), |
| None => false, |
| }) |
| })?; |
| |
| // If the progress bar isn't enabled then it may be a while before the |
| // first crate finishes downloading, so we inform immediately that we're |
| // downloading crates here. |
| if self.downloads_finished == 0 |
| && self.pending.is_empty() |
| && !self.progress.borrow().as_ref().unwrap().is_enabled() |
| { |
| self.set |
| .config |
| .shell() |
| .status("Downloading", "crates ...")?; |
| } |
| |
| let dl = Download { |
| token, |
| data: RefCell::new(Vec::new()), |
| headers: RefCell::new(Vec::new()), |
| id, |
| url, |
| descriptor, |
| total: Cell::new(0), |
| current: Cell::new(0), |
| start: Instant::now(), |
| timed_out: Cell::new(None), |
| retry: Retry::new(self.set.config)?, |
| }; |
| self.enqueue(dl, handle)?; |
| self.tick(WhyTick::DownloadStarted)?; |
| |
| Ok(None) |
| } |
| |
| /// Returns the number of crates that are still downloading. |
| pub fn remaining(&self) -> usize { |
| self.pending.len() + self.sleeping.len() |
| } |
| |
| /// Blocks the current thread waiting for a package to finish downloading. |
| /// |
| /// This method will wait for a previously enqueued package to finish |
| /// downloading and return a reference to it once the download completes. |
| /// |
| /// # Panics |
| /// |
| /// This function will panic if there are no remaining downloads. |
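| /// |
| /// A sketch of the intended call pattern (illustrative; `get_many` above is |
| /// the in-tree caller that does exactly this): |
| /// |
| /// ```ignore |
| /// let mut downloads = package_set.enable_download()?; |
| /// let mut pkgs = Vec::new(); |
| /// for id in ids { |
| ///     pkgs.extend(downloads.start(id)?); // `Some` if already cached |
| /// } |
| /// while downloads.remaining() > 0 { |
| ///     pkgs.push(downloads.wait()?); // blocks until one download finishes |
| /// } |
| /// downloads.success = true; |
| /// ``` |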
| pub fn wait(&mut self) -> CargoResult<&'a Package> { |
| let (dl, data) = loop { |
| assert_eq!(self.pending.len(), self.pending_ids.len()); |
| let (token, result) = self.wait_for_curl()?; |
| debug!(target: "network", "{} finished with {:?}", token, result); |
| |
| let (mut dl, handle) = self |
| .pending |
| .remove(&token) |
| .expect("got a token for a non-in-progress transfer"); |
| let data = mem::take(&mut *dl.data.borrow_mut()); |
| let headers = mem::take(&mut *dl.headers.borrow_mut()); |
| let mut handle = self.set.multi.remove(handle)?; |
| self.pending_ids.remove(&dl.id); |
| |
| // Check if this was a spurious error. If it was a spurious error |
| // then we want to re-enqueue our request for another attempt and |
| // then we wait for another request to finish. |
| let ret = { |
| let timed_out = &dl.timed_out; |
| let url = &dl.url; |
| dl.retry.r#try(|| { |
| if let Err(e) = result { |
| // If this error is "aborted by callback" then that's |
| // probably because our progress callback aborted due to |
| // a timeout. We'll find out by looking at the |
| // `timed_out` field, looking for a descriptive message. |
| // If one is found we switch the error code (to ensure |
| // it's flagged as spurious) and then attach our extra |
| // information to the error. |
| if !e.is_aborted_by_callback() { |
| return Err(e.into()); |
| } |
| |
| return Err(match timed_out.replace(None) { |
| Some(msg) => { |
| let code = curl_sys::CURLE_OPERATION_TIMEDOUT; |
| let mut err = curl::Error::new(code); |
| err.set_extra(msg); |
| err |
| } |
| None => e, |
| } |
| .into()); |
| } |
| |
| let code = handle.response_code()?; |
| if code != 200 && code != 0 { |
| return Err(HttpNotSuccessful::new_from_handle( |
| &mut handle, |
| &url, |
| data, |
| headers, |
| ) |
| .into()); |
| } |
| Ok(data) |
| }) |
| }; |
| match ret { |
| RetryResult::Success(data) => break (dl, data), |
| RetryResult::Err(e) => { |
| return Err(e.context(format!("failed to download from `{}`", dl.url))) |
| } |
| RetryResult::Retry(sleep) => { |
| debug!(target: "network", "download retry {} for {sleep}ms", dl.url); |
| self.sleeping.push(sleep, (dl, handle)); |
| } |
| } |
| }; |
| |
| // If the progress bar isn't enabled then we still want to provide some |
| // semblance of progress of how we're downloading crates, and if the |
| // progress bar is enabled this provides a good log of what's happening. |
| self.progress.borrow_mut().as_mut().unwrap().clear(); |
| self.set |
| .config |
| .shell() |
| .status("Downloaded", &dl.descriptor)?; |
| |
| self.downloads_finished += 1; |
| self.downloaded_bytes += dl.total.get(); |
| if dl.total.get() > self.largest.0 { |
| self.largest = (dl.total.get(), dl.id.name()); |
| } |
| |
| // We're about to synchronously extract the crate below. While we're |
| // doing that our download progress won't actually be updated, nor do we |
| // have a great view into the progress of the extraction. Let's prepare |
| // the user for this CPU-heavy step if it looks like it'll take some |
| // time to do so. |
| if dl.total.get() < ByteSize::kb(400).0 { |
| self.tick(WhyTick::DownloadFinished)?; |
| } else { |
| self.tick(WhyTick::Extracting(&dl.id.name()))?; |
| } |
| |
| // Inform the original source that the download is finished which |
| // should allow us to actually get the package and fill it in now. |
| let mut sources = self.set.sources.borrow_mut(); |
| let source = sources |
| .get_mut(dl.id.source_id()) |
| .ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?; |
| let start = Instant::now(); |
| let pkg = source.finish_download(dl.id, data)?; |
| |
| // Assume that no time has passed while we were calling |
| // `finish_download`; update all speed checks and timeout limits of all |
| // active downloads to make sure they don't fire because of a slowly |
| // extracted tarball. |
| let finish_dur = start.elapsed(); |
| self.updated_at.set(self.updated_at.get() + finish_dur); |
| self.next_speed_check |
| .set(self.next_speed_check.get() + finish_dur); |
| |
| let slot = &self.set.packages[&dl.id]; |
| assert!(slot.fill(pkg).is_ok()); |
| Ok(slot.borrow().unwrap()) |
| } |
| |
| fn enqueue(&mut self, dl: Download<'cfg>, handle: Easy) -> CargoResult<()> { |
| let mut handle = self.set.multi.add(handle)?; |
| let now = Instant::now(); |
| handle.set_token(dl.token)?; |
| self.updated_at.set(now); |
| self.next_speed_check.set(now + self.timeout.dur); |
| self.next_speed_check_bytes_threshold |
| .set(u64::from(self.timeout.low_speed_limit)); |
| dl.timed_out.set(None); |
| dl.current.set(0); |
| dl.total.set(0); |
| self.pending.insert(dl.token, (dl, handle)); |
| Ok(()) |
| } |
| |
| /// Block, waiting for curl. Returns a token and a `Result` for that token |
| /// (`Ok` means the download successfully finished). |
| fn wait_for_curl(&mut self) -> CargoResult<(usize, Result<(), curl::Error>)> { |
| // This is the main workhorse loop. We use libcurl's portable `wait` |
| // method to actually perform blocking. This isn't necessarily too |
| // efficient in terms of fd management, but we should only be juggling |
| // a few anyway. |
| // |
| // Here we start off by asking the `multi` handle to do some work via |
| // the `perform` method. This will actually do I/O work (non-blocking) |
| // and attempt to make progress. Afterwards we ask about the `messages` |
| // contained in the handle which will inform us if anything has finished |
| // transferring. |
| // |
| // If we've got a finished transfer after all that work we break out |
| // and process the finished transfer at the end. Otherwise we need to |
| // actually block waiting for I/O to happen, which we achieve with the |
| // `wait` method on `multi`. |
| loop { |
| self.add_sleepers()?; |
| let n = tls::set(self, || { |
| self.set |
| .multi |
| .perform() |
| .with_context(|| "failed to perform http requests") |
| })?; |
| debug!(target: "network", "handles remaining: {}", n); |
| let results = &mut self.results; |
| let pending = &self.pending; |
| self.set.multi.messages(|msg| { |
| let token = msg.token().expect("failed to read token"); |
| let handle = &pending[&token].1; |
| if let Some(result) = msg.result_for(handle) { |
| results.push((token, result)); |
| } else { |
| debug!(target: "network", "message without a result (?)"); |
| } |
| }); |
| |
| if let Some(pair) = results.pop() { |
| break Ok(pair); |
| } |
| assert_ne!(self.remaining(), 0); |
| if self.pending.is_empty() { |
| let delay = self.sleeping.time_to_next().unwrap(); |
| debug!(target: "network", "sleeping main thread for {delay:?}"); |
| std::thread::sleep(delay); |
| } else { |
| let min_timeout = Duration::new(1, 0); |
| let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout); |
| let timeout = timeout.min(min_timeout); |
| self.set |
| .multi |
| .wait(&mut [], timeout) |
| .with_context(|| "failed to wait on curl `Multi`")?; |
| } |
| } |
| } |
| |
| fn add_sleepers(&mut self) -> CargoResult<()> { |
| for (dl, handle) in self.sleeping.to_retry() { |
| self.pending_ids.insert(dl.id); |
| self.enqueue(dl, handle)?; |
| } |
| Ok(()) |
| } |
| |
| fn progress(&self, token: usize, total: u64, cur: u64) -> bool { |
| let dl = &self.pending[&token].0; |
| dl.total.set(total); |
| let now = Instant::now(); |
| if cur > dl.current.get() { |
| let delta = cur - dl.current.get(); |
| let threshold = self.next_speed_check_bytes_threshold.get(); |
| |
| dl.current.set(cur); |
| self.updated_at.set(now); |
| |
| if delta >= threshold { |
| self.next_speed_check.set(now + self.timeout.dur); |
| self.next_speed_check_bytes_threshold |
| .set(u64::from(self.timeout.low_speed_limit)); |
| } else { |
| self.next_speed_check_bytes_threshold.set(threshold - delta); |
| } |
| } |
| if self.tick(WhyTick::DownloadUpdate).is_err() { |
| return false; |
| } |
| |
| // If we've spent too long not actually receiving any data we time out. |
| if now > self.updated_at.get() + self.timeout.dur { |
| self.updated_at.set(now); |
| let msg = format!( |
| "failed to download any data for `{}` within {}s", |
| dl.id, |
| self.timeout.dur.as_secs() |
| ); |
| dl.timed_out.set(Some(msg)); |
| return false; |
| } |
| |
| // If we've reached the point in time where we need to check our speed |
| // limit, see if we've transferred enough data during this window. If |
| // this check fails then we error out because the download is going too |
| // slowly. |
| if now >= self.next_speed_check.get() { |
| self.next_speed_check.set(now + self.timeout.dur); |
| assert!(self.next_speed_check_bytes_threshold.get() > 0); |
| let msg = format!( |
| "download of `{}` failed to transfer more \ |
| than {} bytes in {}s", |
| dl.id, |
| self.timeout.low_speed_limit, |
| self.timeout.dur.as_secs() |
| ); |
| dl.timed_out.set(Some(msg)); |
| return false; |
| } |
| |
| true |
| } |
| |
| fn tick(&self, why: WhyTick<'_>) -> CargoResult<()> { |
| let mut progress = self.progress.borrow_mut(); |
| let progress = progress.as_mut().unwrap(); |
| |
| if let WhyTick::DownloadUpdate = why { |
| if !progress.update_allowed() { |
| return Ok(()); |
| } |
| } |
| let pending = self.remaining(); |
| let mut msg = if pending == 1 { |
| format!("{} crate", pending) |
| } else { |
| format!("{} crates", pending) |
| }; |
| match why { |
| WhyTick::Extracting(krate) => { |
| msg.push_str(&format!(", extracting {} ...", krate)); |
| } |
| _ => { |
| let mut dur = Duration::new(0, 0); |
| let mut remaining = 0; |
| for (dl, _) in self.pending.values() { |
| dur += dl.start.elapsed(); |
| // If the total/current look weird, just throw out the data |
| // point; sounds like curl has more to learn before we have |
| // the true information. |
| if dl.total.get() >= dl.current.get() { |
| remaining += dl.total.get() - dl.current.get(); |
| } |
| } |
| if remaining > 0 && dur > Duration::from_millis(500) { |
| msg.push_str(&format!(", remaining bytes: {}", ByteSize(remaining))); |
| } |
| } |
| } |
| progress.print_now(&msg) |
| } |
| } |
| |
| #[derive(Copy, Clone)] |
| enum WhyTick<'a> { |
| DownloadStarted, |
| DownloadUpdate, |
| DownloadFinished, |
| Extracting(&'a str), |
| } |
| |
| impl<'a, 'cfg> Drop for Downloads<'a, 'cfg> { |
| fn drop(&mut self) { |
| self.set.downloading.set(false); |
| let progress = self.progress.get_mut().take().unwrap(); |
| // Don't print a download summary if we're not using a progress bar; |
| // we've already printed lots of `Downloading...` items. |
| if !progress.is_enabled() { |
| return; |
| } |
| // If we didn't download anything, no need for a summary. |
| if self.downloads_finished == 0 { |
| return; |
| } |
| // If an error happened, let's not clutter up the output. |
| if !self.success { |
| return; |
| } |
| // pick the correct plural of crate(s) |
| let crate_string = if self.downloads_finished == 1 { |
| "crate" |
| } else { |
| "crates" |
| }; |
| let mut status = format!( |
| "{} {} ({}) in {}", |
| self.downloads_finished, |
| crate_string, |
| ByteSize(self.downloaded_bytes), |
| util::elapsed(self.start.elapsed()) |
| ); |
| // Print the size of the largest crate if it was >1mb, |
| // but don't print it if only a single crate was downloaded, |
| // because then it is obviously the largest. |
| if self.largest.0 > ByteSize::mb(1).0 && self.downloads_finished > 1 { |
| status.push_str(&format!( |
| " (largest was `{}` at {})", |
| self.largest.1, |
| ByteSize(self.largest.0), |
| )); |
| } |
| // Clear progress before displaying final summary. |
| drop(progress); |
| drop(self.set.config.shell().status("Downloaded", status)); |
| } |
| } |
| |
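| /// Thread-local stash of a raw pointer to the current [`Downloads`]. |
| /// |
| /// The callbacks registered on each curl `Easy` handle (see `write_function`, |
| /// `header_function`, and `progress_function` above) must be `'static`, so |
| /// they cannot borrow the `Downloads` directly. Instead, `wait_for_curl` |
| /// installs a pointer with `tls::set` around `Multi::perform`, and the |
| /// callbacks read it back with `tls::with`. |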
| mod tls { |
| use std::cell::Cell; |
| |
| use super::Downloads; |
| |
| thread_local!(static PTR: Cell<usize> = Cell::new(0)); |
| |
| pub(crate) fn with<R>(f: impl FnOnce(Option<&Downloads<'_, '_>>) -> R) -> R { |
| let ptr = PTR.with(|p| p.get()); |
| if ptr == 0 { |
| f(None) |
| } else { |
| unsafe { f(Some(&*(ptr as *const Downloads<'_, '_>))) } |
| } |
| } |
| |
| pub(crate) fn set<R>(dl: &Downloads<'_, '_>, f: impl FnOnce() -> R) -> R { |
| struct Reset<'a, T: Copy>(&'a Cell<T>, T); |
| |
| impl<'a, T: Copy> Drop for Reset<'a, T> { |
| fn drop(&mut self) { |
| self.0.set(self.1); |
| } |
| } |
| |
| PTR.with(|p| { |
| let _reset = Reset(p, p.get()); |
| p.set(dl as *const Downloads<'_, '_> as usize); |
| f() |
| }) |
| } |
| } |