blob: 3d31110c38e92e15435439cb8a4e1eb9498fed2f [file] [log] [blame]
//! Access to a HTTP-based crate registry. See [`HttpRegistry`] for details.
use crate::core::{PackageId, SourceId};
use crate::sources::registry::download;
use crate::sources::registry::MaybeLock;
use crate::sources::registry::{LoadResponse, RegistryConfig, RegistryData};
use crate::util::cache_lock::CacheLockMode;
use crate::util::errors::{CargoResult, HttpNotSuccessful};
use crate::util::network::http::http_handle;
use crate::util::network::retry::{Retry, RetryResult};
use crate::util::network::sleep::SleepTracker;
use crate::util::{auth, Config, Filesystem, IntoUrl, Progress, ProgressStyle};
use anyhow::Context;
use cargo_credential::Operation;
use cargo_util::paths;
use curl::easy::{Easy, List};
use curl::multi::{EasyHandle, Multi};
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::fs::{self, File};
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::str;
use std::task::{ready, Poll};
use std::time::Duration;
use tracing::{debug, trace};
use url::Url;
// HTTP headers
const ETAG: &'static str = "etag";
const LAST_MODIFIED: &'static str = "last-modified";
const WWW_AUTHENTICATE: &'static str = "www-authenticate";
const IF_NONE_MATCH: &'static str = "if-none-match";
const IF_MODIFIED_SINCE: &'static str = "if-modified-since";
const UNKNOWN: &'static str = "Unknown";
/// A registry served by the HTTP-based registry API.
///
/// This type is primarily accessed through the [`RegistryData`] trait.
///
/// `HttpRegistry` implements the HTTP-based registry API outlined in [RFC 2789]. Read the RFC for
/// the complete protocol, but _roughly_ the implementation loads each index file (e.g.,
/// config.json or re/ge/regex) from an HTTP service rather than from a locally cloned git
/// repository. The remote service can more or less be a static file server that simply serves the
/// contents of the origin git repository.
///
/// Implemented naively, this leads to a significant amount of network traffic, as a lookup of any
/// index file would need to check with the remote backend if the index file has changed. This
/// cost is somewhat mitigated by the use of HTTP conditional fetches (`If-Modified-Since` and
/// `If-None-Match` for `ETag`s) which can be efficiently handled by HTTP/2.
///
/// [RFC 2789]: https://github.com/rust-lang/rfcs/pull/2789
pub struct HttpRegistry<'cfg> {
/// Path to the registry index (`$CARGO_HOME/registry/index/$REG-HASH`).
///
/// To be fair, `HttpRegistry` doesn't store the registry index it
/// downloads on the file system, but other cached data like registry
/// configuration could be stored here.
index_path: Filesystem,
/// Path to the cache of `.crate` files (`$CARGO_HOME/registry/cache/$REG-HASH`).
cache_path: Filesystem,
/// The unique identifier of this registry source.
source_id: SourceId,
config: &'cfg Config,
/// Store the server URL without the protocol prefix (sparse+)
url: Url,
/// HTTP multi-handle for asynchronous/parallel requests.
multi: Multi,
/// Has the client requested a cache update?
///
/// Only if they have do we double-check the freshness of each locally-stored index file.
requested_update: bool,
/// State for currently pending index downloads.
downloads: Downloads<'cfg>,
/// Does the config say that we can use HTTP multiplexing?
multiplexing: bool,
/// What paths have we already fetched since the last index update?
///
/// We do not need to double-check any of these index files since we have already done so.
fresh: HashSet<PathBuf>,
/// Have we started to download any index files?
fetch_started: bool,
/// Cached registry configuration.
registry_config: Option<RegistryConfig>,
/// Should we include the authorization header?
auth_required: bool,
/// Url to get a token for the registry.
login_url: Option<Url>,
/// Headers received with an HTTP 401.
auth_error_headers: Vec<String>,
/// Disables status messages.
quiet: bool,
}
/// State for currently pending index file downloads.
struct Downloads<'cfg> {
/// When a download is started, it is added to this map. The key is a
/// "token" (see [`Download::token`]). It is removed once the download is
/// finished.
pending: HashMap<usize, (Download<'cfg>, EasyHandle)>,
/// Set of paths currently being downloaded.
/// This should stay in sync with the `pending` field.
pending_paths: HashSet<PathBuf>,
/// Downloads that have failed and are waiting to retry again later.
sleeping: SleepTracker<(Download<'cfg>, Easy)>,
/// The final result of each download.
results: HashMap<PathBuf, CargoResult<CompletedDownload>>,
/// The next ID to use for creating a token (see [`Download::token`]).
next: usize,
/// Progress bar.
progress: RefCell<Option<Progress<'cfg>>>,
/// Number of downloads that have successfully finished.
downloads_finished: usize,
/// Number of times the caller has requested blocking. This is used for
/// an estimate of progress.
blocking_calls: usize,
}
/// Represents a single index file download, including its progress and retry.
struct Download<'cfg> {
/// The token for this download, used as the key of the
/// [`Downloads::pending`] map and stored in [`EasyHandle`] as well.
token: usize,
/// The path of the package that we're downloading.
path: PathBuf,
/// Actual downloaded data, updated throughout the lifetime of this download.
data: RefCell<Vec<u8>>,
/// HTTP headers.
header_map: RefCell<Headers>,
/// Logic used to track retrying this download if it's a spurious failure.
retry: Retry<'cfg>,
}
/// HTTPS headers [`HttpRegistry`] cares about.
#[derive(Default)]
struct Headers {
last_modified: Option<String>,
etag: Option<String>,
www_authenticate: Vec<String>,
/// All headers, including explicit headers above.
all: Vec<String>,
}
/// HTTP status code [`HttpRegistry`] cares about.
enum StatusCode {
Success,
NotModified,
NotFound,
Unauthorized,
}
/// Represents a complete [`Download`] from an HTTP request.
///
/// Usually it is constructed in [`HttpRegistry::handle_completed_downloads`],
/// and then returns to the caller of [`HttpRegistry::load()`].
struct CompletedDownload {
response_code: StatusCode,
data: Vec<u8>,
header_map: Headers,
}
impl<'cfg> HttpRegistry<'cfg> {
/// Creates a HTTP-rebased remote registry for `source_id`.
///
/// * `name` --- Name of a path segment where `.crate` tarballs and the
/// registry index are stored. Expect to be unique.
pub fn new(
source_id: SourceId,
config: &'cfg Config,
name: &str,
) -> CargoResult<HttpRegistry<'cfg>> {
let url = source_id.url().as_str();
// Ensure the url ends with a slash so we can concatenate paths.
if !url.ends_with('/') {
anyhow::bail!("sparse registry url must end in a slash `/`: {url}")
}
assert!(source_id.is_sparse());
let url = url
.strip_prefix("sparse+")
.expect("sparse registry needs sparse+ prefix")
.into_url()
.expect("a url with the sparse+ stripped should still be valid");
Ok(HttpRegistry {
index_path: config.registry_index_path().join(name),
cache_path: config.registry_cache_path().join(name),
source_id,
config,
url,
multi: Multi::new(),
multiplexing: false,
downloads: Downloads {
next: 0,
pending: HashMap::new(),
pending_paths: HashSet::new(),
sleeping: SleepTracker::new(),
results: HashMap::new(),
progress: RefCell::new(Some(Progress::with_style(
"Fetch",
ProgressStyle::Indeterminate,
config,
))),
downloads_finished: 0,
blocking_calls: 0,
},
fresh: HashSet::new(),
requested_update: false,
fetch_started: false,
registry_config: None,
auth_required: false,
login_url: None,
auth_error_headers: vec![],
quiet: false,
})
}
/// Splits HTTP `HEADER: VALUE` to a tuple.
fn handle_http_header(buf: &[u8]) -> Option<(&str, &str)> {
if buf.is_empty() {
return None;
}
let buf = std::str::from_utf8(buf).ok()?.trim_end();
// Don't let server sneak extra lines anywhere.
if buf.contains('\n') {
return None;
}
let (tag, value) = buf.split_once(':')?;
let value = value.trim();
Some((tag, value))
}
/// Setup the necessary works before the first fetch gets started.
///
/// This is a no-op if called more than one time.
fn start_fetch(&mut self) -> CargoResult<()> {
if self.fetch_started {
// We only need to run the setup code once.
return Ok(());
}
self.fetch_started = true;
// We've enabled the `http2` feature of `curl` in Cargo, so treat
// failures here as fatal as it would indicate a build-time problem.
self.multiplexing = self.config.http_config()?.multiplexing.unwrap_or(true);
self.multi
.pipelining(false, self.multiplexing)
.with_context(|| "failed to enable multiplexing/pipelining in curl")?;
// let's not flood the server with connections
self.multi.set_max_host_connections(2)?;
if !self.quiet {
self.config
.shell()
.status("Updating", self.source_id.display_index())?;
}
Ok(())
}
/// Checks the results inside the [`HttpRegistry::multi`] handle, and
/// updates relevant state in [`HttpRegistry::downloads`] accordingly.
fn handle_completed_downloads(&mut self) -> CargoResult<()> {
assert_eq!(
self.downloads.pending.len(),
self.downloads.pending_paths.len()
);
// Collect the results from the Multi handle.
let results = {
let mut results = Vec::new();
let pending = &mut self.downloads.pending;
self.multi.messages(|msg| {
let token = msg.token().expect("failed to read token");
let (_, handle) = &pending[&token];
if let Some(result) = msg.result_for(handle) {
results.push((token, result));
};
});
results
};
for (token, result) in results {
let (mut download, handle) = self.downloads.pending.remove(&token).unwrap();
let was_present = self.downloads.pending_paths.remove(&download.path);
assert!(
was_present,
"expected pending_paths to contain {:?}",
download.path
);
let mut handle = self.multi.remove(handle)?;
let data = download.data.take();
let url = self.full_url(&download.path);
let result = match download.retry.r#try(|| {
result.with_context(|| format!("failed to download from `{}`", url))?;
let code = handle.response_code()?;
// Keep this list of expected status codes in sync with the codes handled in `load`
let code = match code {
200 => StatusCode::Success,
304 => StatusCode::NotModified,
401 => StatusCode::Unauthorized,
404 | 410 | 451 => StatusCode::NotFound,
_ => {
return Err(HttpNotSuccessful::new_from_handle(
&mut handle,
&url,
data,
download.header_map.take().all,
)
.into());
}
};
Ok((data, code))
}) {
RetryResult::Success((data, code)) => Ok(CompletedDownload {
response_code: code,
data,
header_map: download.header_map.take(),
}),
RetryResult::Err(e) => Err(e),
RetryResult::Retry(sleep) => {
debug!(target: "network", "download retry {:?} for {sleep}ms", download.path);
self.downloads.sleeping.push(sleep, (download, handle));
continue;
}
};
self.downloads.results.insert(download.path, result);
self.downloads.downloads_finished += 1;
}
self.downloads.tick()?;
Ok(())
}
/// Constructs the full URL to download a index file.
fn full_url(&self, path: &Path) -> String {
// self.url always ends with a slash.
format!("{}{}", self.url, path.display())
}
/// Check if an index file of `path` is up-to-date.
///
/// The `path` argument is the same as in [`RegistryData::load`].
fn is_fresh(&self, path: &Path) -> bool {
if !self.requested_update {
trace!(
"using local {} as user did not request update",
path.display()
);
true
} else if self.config.cli_unstable().no_index_update {
trace!("using local {} in no_index_update mode", path.display());
true
} else if self.config.offline() {
trace!("using local {} in offline mode", path.display());
true
} else if self.fresh.contains(path) {
trace!("using local {} as it was already fetched", path.display());
true
} else {
debug!("checking freshness of {}", path.display());
false
}
}
/// Get the cached registry configuration, if it exists.
fn config_cached(&mut self) -> CargoResult<Option<&RegistryConfig>> {
if self.registry_config.is_some() {
return Ok(self.registry_config.as_ref());
}
let config_json_path = self
.assert_index_locked(&self.index_path)
.join(RegistryConfig::NAME);
match fs::read(&config_json_path) {
Ok(raw_data) => match serde_json::from_slice(&raw_data) {
Ok(json) => {
self.registry_config = Some(json);
}
Err(e) => tracing::debug!("failed to decode cached config.json: {}", e),
},
Err(e) => {
if e.kind() != ErrorKind::NotFound {
tracing::debug!("failed to read config.json cache: {}", e)
}
}
}
Ok(self.registry_config.as_ref())
}
/// Get the registry configuration from either cache or remote.
fn config(&mut self) -> Poll<CargoResult<&RegistryConfig>> {
debug!("loading config");
let index_path = self.assert_index_locked(&self.index_path);
let config_json_path = index_path.join(RegistryConfig::NAME);
if self.is_fresh(Path::new(RegistryConfig::NAME)) && self.config_cached()?.is_some() {
return Poll::Ready(Ok(self.registry_config.as_ref().unwrap()));
}
match ready!(self.load(Path::new(""), Path::new(RegistryConfig::NAME), None)?) {
LoadResponse::Data {
raw_data,
index_version: _,
} => {
trace!("config loaded");
self.registry_config = Some(serde_json::from_slice(&raw_data)?);
if paths::create_dir_all(&config_json_path.parent().unwrap()).is_ok() {
if let Err(e) = fs::write(&config_json_path, &raw_data) {
tracing::debug!("failed to write config.json cache: {}", e);
}
}
Poll::Ready(Ok(self.registry_config.as_ref().unwrap()))
}
LoadResponse::NotFound => {
Poll::Ready(Err(anyhow::anyhow!("config.json not found in registry")))
}
LoadResponse::CacheValid => Poll::Ready(Err(crate::util::internal(
"config.json is never stored in the index cache",
))),
}
}
/// Moves failed [`Download`]s that are ready to retry to the pending queue.
fn add_sleepers(&mut self) -> CargoResult<()> {
for (dl, handle) in self.downloads.sleeping.to_retry() {
let mut handle = self.multi.add(handle)?;
handle.set_token(dl.token)?;
let is_new = self.downloads.pending_paths.insert(dl.path.to_path_buf());
assert!(is_new, "path queued for download more than once");
let previous = self.downloads.pending.insert(dl.token, (dl, handle));
assert!(previous.is_none(), "dl token queued more than once");
}
Ok(())
}
}
impl<'cfg> RegistryData for HttpRegistry<'cfg> {
fn prepare(&self) -> CargoResult<()> {
Ok(())
}
fn index_path(&self) -> &Filesystem {
&self.index_path
}
fn assert_index_locked<'a>(&self, path: &'a Filesystem) -> &'a Path {
self.config
.assert_package_cache_locked(CacheLockMode::DownloadExclusive, path)
}
fn is_updated(&self) -> bool {
self.requested_update
}
fn load(
&mut self,
_root: &Path,
path: &Path,
index_version: Option<&str>,
) -> Poll<CargoResult<LoadResponse>> {
trace!("load: {}", path.display());
if let Some(_token) = self.downloads.pending_paths.get(path) {
debug!("dependency is still pending: {}", path.display());
return Poll::Pending;
}
if let Some(index_version) = index_version {
trace!(
"local cache of {} is available at version `{}`",
path.display(),
index_version
);
if self.is_fresh(path) {
return Poll::Ready(Ok(LoadResponse::CacheValid));
}
} else if self.fresh.contains(path) {
// We have no cached copy of this file, and we already downloaded it.
debug!(
"cache did not contain previously downloaded file {}",
path.display()
);
return Poll::Ready(Ok(LoadResponse::NotFound));
}
if self.config.offline() || self.config.cli_unstable().no_index_update {
// Return NotFound in offline mode when the file doesn't exist in the cache.
// If this results in resolution failure, the resolver will suggest
// removing the --offline flag.
return Poll::Ready(Ok(LoadResponse::NotFound));
}
if let Some(result) = self.downloads.results.remove(path) {
let result =
result.with_context(|| format!("download of {} failed", path.display()))?;
let is_new = self.fresh.insert(path.to_path_buf());
assert!(
is_new,
"downloaded the index file `{}` twice",
path.display()
);
// The status handled here need to be kept in sync with the codes handled
// in `handle_completed_downloads`
match result.response_code {
StatusCode::Success => {
let response_index_version = if let Some(etag) = result.header_map.etag {
format!("{}: {}", ETAG, etag)
} else if let Some(lm) = result.header_map.last_modified {
format!("{}: {}", LAST_MODIFIED, lm)
} else {
UNKNOWN.to_string()
};
trace!("index file version: {}", response_index_version);
return Poll::Ready(Ok(LoadResponse::Data {
raw_data: result.data,
index_version: Some(response_index_version),
}));
}
StatusCode::NotModified => {
// Not Modified: the data in the cache is still the latest.
if index_version.is_none() {
return Poll::Ready(Err(anyhow::anyhow!(
"server said not modified (HTTP 304) when no local cache exists"
)));
}
return Poll::Ready(Ok(LoadResponse::CacheValid));
}
StatusCode::NotFound => {
// The crate was not found or deleted from the registry.
return Poll::Ready(Ok(LoadResponse::NotFound));
}
StatusCode::Unauthorized
if !self.auth_required && path == Path::new(RegistryConfig::NAME) =>
{
debug!(target: "network", "re-attempting request for config.json with authorization included.");
self.fresh.remove(path);
self.auth_required = true;
// Look for a `www-authenticate` header with the `Cargo` scheme.
for header in &result.header_map.www_authenticate {
for challenge in http_auth::ChallengeParser::new(header) {
match challenge {
Ok(challenge) if challenge.scheme.eq_ignore_ascii_case("Cargo") => {
// Look for the `login_url` parameter.
for (param, value) in challenge.params {
if param.eq_ignore_ascii_case("login_url") {
self.login_url = Some(value.to_unescaped().into_url()?);
}
}
}
Ok(challenge) => {
debug!(target: "network", "ignoring non-Cargo challenge: {}", challenge.scheme)
}
Err(e) => {
debug!(target: "network", "failed to parse challenge: {}", e)
}
}
}
}
self.auth_error_headers = result.header_map.all;
}
StatusCode::Unauthorized => {
let err = Err(HttpNotSuccessful {
code: 401,
body: result.data,
url: self.full_url(path),
ip: None,
headers: result.header_map.all,
}
.into());
if self.auth_required {
let auth_error = auth::AuthorizationError::new(
self.config,
self.source_id,
self.login_url.clone(),
auth::AuthorizationErrorReason::TokenRejected,
)?;
return Poll::Ready(err.context(auth_error));
} else {
return Poll::Ready(err);
}
}
}
}
if path != Path::new(RegistryConfig::NAME) {
self.auth_required = ready!(self.config()?).auth_required;
} else if !self.auth_required {
// Check if there's a cached config that says auth is required.
// This allows avoiding the initial unauthenticated request to probe.
if let Some(config) = self.config_cached()? {
self.auth_required = config.auth_required;
}
}
// Looks like we're going to have to do a network request.
self.start_fetch()?;
let mut handle = http_handle(self.config)?;
let full_url = self.full_url(path);
debug!(target: "network", "fetch {}", full_url);
handle.get(true)?;
handle.url(&full_url)?;
handle.follow_location(true)?;
// Enable HTTP/2 if possible.
crate::try_old_curl_http2_pipewait!(self.multiplexing, handle);
let mut headers = List::new();
// Include a header to identify the protocol. This allows the server to
// know that Cargo is attempting to use the sparse protocol.
headers.append("cargo-protocol: version=1")?;
headers.append("accept: text/plain")?;
// If we have a cached copy of the file, include IF_NONE_MATCH or IF_MODIFIED_SINCE header.
if let Some(index_version) = index_version {
if let Some((key, value)) = index_version.split_once(':') {
match key {
ETAG => headers.append(&format!("{}: {}", IF_NONE_MATCH, value.trim()))?,
LAST_MODIFIED => {
headers.append(&format!("{}: {}", IF_MODIFIED_SINCE, value.trim()))?
}
_ => debug!("unexpected index version: {}", index_version),
}
}
}
if self.auth_required {
let authorization = auth::auth_token(
self.config,
&self.source_id,
self.login_url.as_ref(),
Operation::Read,
self.auth_error_headers.clone(),
true,
)?;
headers.append(&format!("Authorization: {}", authorization))?;
trace!(target: "network", "including authorization for {}", full_url);
}
handle.http_headers(headers)?;
// We're going to have a bunch of downloads all happening "at the same time".
// So, we need some way to track what headers/data/responses are for which request.
// We do that through this token. Each request (and associated response) gets one.
let token = self.downloads.next;
self.downloads.next += 1;
debug!(target: "network", "downloading {} as {}", path.display(), token);
let is_new = self.downloads.pending_paths.insert(path.to_path_buf());
assert!(is_new, "path queued for download more than once");
// Each write should go to self.downloads.pending[&token].data.
// Since the write function must be 'static, we access downloads through a thread-local.
// That thread-local is set up in `block_until_ready` when it calls self.multi.perform,
// which is what ultimately calls this method.
handle.write_function(move |buf| {
trace!(target: "network", "{} - {} bytes of data", token, buf.len());
tls::with(|downloads| {
if let Some(downloads) = downloads {
downloads.pending[&token]
.0
.data
.borrow_mut()
.extend_from_slice(buf);
}
});
Ok(buf.len())
})?;
// And ditto for the header function.
handle.header_function(move |buf| {
if let Some((tag, value)) = Self::handle_http_header(buf) {
tls::with(|downloads| {
if let Some(downloads) = downloads {
let mut header_map = downloads.pending[&token].0.header_map.borrow_mut();
header_map.all.push(format!("{tag}: {value}"));
match tag.to_ascii_lowercase().as_str() {
LAST_MODIFIED => header_map.last_modified = Some(value.to_string()),
ETAG => header_map.etag = Some(value.to_string()),
WWW_AUTHENTICATE => header_map.www_authenticate.push(value.to_string()),
_ => {}
}
}
});
}
true
})?;
let dl = Download {
token,
path: path.to_path_buf(),
data: RefCell::new(Vec::new()),
header_map: Default::default(),
retry: Retry::new(self.config)?,
};
// Finally add the request we've lined up to the pool of requests that cURL manages.
let mut handle = self.multi.add(handle)?;
handle.set_token(token)?;
self.downloads.pending.insert(dl.token, (dl, handle));
Poll::Pending
}
fn config(&mut self) -> Poll<CargoResult<Option<RegistryConfig>>> {
let cfg = ready!(self.config()?).clone();
Poll::Ready(Ok(Some(cfg)))
}
fn invalidate_cache(&mut self) {
// Actually updating the index is more or less a no-op for this implementation.
// All it does is ensure that a subsequent load will double-check files with the
// server rather than rely on a locally cached copy of the index files.
debug!("invalidated index cache");
self.fresh.clear();
self.requested_update = true;
}
fn set_quiet(&mut self, quiet: bool) {
self.quiet = quiet;
self.downloads.progress.replace(None);
}
fn download(&mut self, pkg: PackageId, checksum: &str) -> CargoResult<MaybeLock> {
let registry_config = loop {
match self.config()? {
Poll::Pending => self.block_until_ready()?,
Poll::Ready(cfg) => break cfg.to_owned(),
}
};
download::download(
&self.cache_path,
&self.config,
pkg,
checksum,
registry_config,
)
}
fn finish_download(
&mut self,
pkg: PackageId,
checksum: &str,
data: &[u8],
) -> CargoResult<File> {
download::finish_download(&self.cache_path, &self.config, pkg, checksum, data)
}
fn is_crate_downloaded(&self, pkg: PackageId) -> bool {
download::is_crate_downloaded(&self.cache_path, &self.config, pkg)
}
fn block_until_ready(&mut self) -> CargoResult<()> {
trace!(target: "network",
"block_until_ready: {} transfers pending",
self.downloads.pending.len()
);
self.downloads.blocking_calls += 1;
loop {
self.handle_completed_downloads()?;
self.add_sleepers()?;
let remaining_in_multi = tls::set(&self.downloads, || {
self.multi
.perform()
.with_context(|| "failed to perform http requests")
})?;
trace!(target: "network", "{} transfers remaining", remaining_in_multi);
if remaining_in_multi + self.downloads.sleeping.len() as u32 == 0 {
return Ok(());
}
if self.downloads.pending.is_empty() {
let delay = self.downloads.sleeping.time_to_next().unwrap();
debug!(target: "network", "sleeping main thread for {delay:?}");
std::thread::sleep(delay);
} else {
// We have no more replies to provide the caller with,
// so we need to wait until cURL has something new for us.
let timeout = self
.multi
.get_timeout()?
.unwrap_or_else(|| Duration::new(1, 0));
self.multi
.wait(&mut [], timeout)
.with_context(|| "failed to wait on curl `Multi`")?;
}
}
}
}
impl<'cfg> Downloads<'cfg> {
/// Updates the state of the progress bar for downloads.
fn tick(&self) -> CargoResult<()> {
let mut progress = self.progress.borrow_mut();
let Some(progress) = progress.as_mut() else {
return Ok(());
};
// Since the sparse protocol discovers dependencies as it goes,
// it's not possible to get an accurate progress indication.
//
// As an approximation, we assume that the depth of the dependency graph
// is fixed, and base the progress on how many times the caller has asked
// for blocking. If there are actually additional dependencies, the progress
// bar will get stuck. If there are fewer dependencies, it will disappear
// early. It will never go backwards.
//
// The status text also contains the number of completed & pending requests, which
// gives an better indication of forward progress.
let approximate_tree_depth = 10;
progress.tick(
self.blocking_calls.min(approximate_tree_depth),
approximate_tree_depth + 1,
&format!(
" {} complete; {} pending",
self.downloads_finished,
self.pending.len() + self.sleeping.len()
),
)
}
}
mod tls {
use super::Downloads;
use std::cell::Cell;
thread_local!(static PTR: Cell<usize> = Cell::new(0));
pub(super) fn with<R>(f: impl FnOnce(Option<&Downloads<'_>>) -> R) -> R {
let ptr = PTR.with(|p| p.get());
if ptr == 0 {
f(None)
} else {
// Safety: * `ptr` is only set by `set` below which ensures the type is correct.
let ptr = unsafe { &*(ptr as *const Downloads<'_>) };
f(Some(ptr))
}
}
pub(super) fn set<R>(dl: &Downloads<'_>, f: impl FnOnce() -> R) -> R {
struct Reset<'a, T: Copy>(&'a Cell<T>, T);
impl<'a, T: Copy> Drop for Reset<'a, T> {
fn drop(&mut self) {
self.0.set(self.1);
}
}
PTR.with(|p| {
let _reset = Reset(p, p.get());
p.set(dl as *const Downloads<'_> as usize);
f()
})
}
}