use triomphe::Arc; | |
use crate::{DatabaseKeyIndex, RuntimeId}; | |
use parking_lot::{Condvar, MutexGuard}; | |
use rustc_hash::FxHashMap; | |
use smallvec::SmallVec; | |
use super::{ActiveQuery, WaitResult}; | |
type QueryStack = Vec<ActiveQuery>; | |
#[derive(Debug, Default)] | |
pub(super) struct DependencyGraph { | |
/// A `(K -> V)` pair in this map indicates that the the runtime | |
/// `K` is blocked on some query executing in the runtime `V`. | |
/// This encodes a graph that must be acyclic (or else deadlock | |
/// will result). | |
edges: FxHashMap<RuntimeId, Edge>, | |
/// Encodes the `RuntimeId` that are blocked waiting for the result | |
/// of a given query. | |
query_dependents: FxHashMap<DatabaseKeyIndex, SmallVec<[RuntimeId; 4]>>, | |
/// When a key K completes which had dependent queries Qs blocked on it, | |
/// it stores its `WaitResult` here. As they wake up, each query Q in Qs will | |
/// come here to fetch their results. | |
wait_results: FxHashMap<RuntimeId, (QueryStack, WaitResult)>, | |
} | |
#[derive(Debug)] | |
struct Edge { | |
blocked_on_id: RuntimeId, | |
blocked_on_key: DatabaseKeyIndex, | |
stack: QueryStack, | |
/// Signalled whenever a query with dependents completes. | |
/// Allows those dependents to check if they are ready to unblock. | |
condvar: Arc<parking_lot::Condvar>, | |
} | |
impl DependencyGraph { | |
/// True if `from_id` depends on `to_id`. | |
/// | |
/// (i.e., there is a path from `from_id` to `to_id` in the graph.) | |
pub(super) fn depends_on(&mut self, from_id: RuntimeId, to_id: RuntimeId) -> bool { | |
let mut p = from_id; | |
while let Some(q) = self.edges.get(&p).map(|edge| edge.blocked_on_id) { | |
if q == to_id { | |
return true; | |
} | |
p = q; | |
} | |
p == to_id | |
} | |
/// Invokes `closure` with a `&mut ActiveQuery` for each query that participates in the cycle. | |
/// The cycle runs as follows: | |
/// | |
/// 1. The runtime `from_id`, which has the stack `from_stack`, would like to invoke `database_key`... | |
/// 2. ...but `database_key` is already being executed by `to_id`... | |
/// 3. ...and `to_id` is transitively dependent on something which is present on `from_stack`. | |
pub(super) fn for_each_cycle_participant( | |
&mut self, | |
from_id: RuntimeId, | |
from_stack: &mut QueryStack, | |
database_key: DatabaseKeyIndex, | |
to_id: RuntimeId, | |
mut closure: impl FnMut(&mut [ActiveQuery]), | |
) { | |
debug_assert!(self.depends_on(to_id, from_id)); | |
// To understand this algorithm, consider this [drawing](https://is.gd/TGLI9v): | |
// | |
// database_key = QB2 | |
// from_id = A | |
// to_id = B | |
// from_stack = [QA1, QA2, QA3] | |
// | |
// self.edges[B] = { C, QC2, [QB1..QB3] } | |
// self.edges[C] = { A, QA2, [QC1..QC3] } | |
// | |
// The cyclic | |
// edge we have | |
// failed to add. | |
// : | |
// A : B C | |
// : | |
// QA1 v QB1 QC1 | |
// ┌► QA2 ┌──► QB2 ┌─► QC2 | |
// │ QA3 ───┘ QB3 ──┘ QC3 ───┐ | |
// │ │ | |
// └───────────────────────────────┘ | |
// | |
// Final output: [QB2, QB3, QC2, QC3, QA2, QA3] | |
let mut id = to_id; | |
let mut key = database_key; | |
while id != from_id { | |
// Looking at the diagram above, the idea is to | |
// take the edge from `to_id` starting at `key` | |
// (inclusive) and down to the end. We can then | |
// load up the next thread (i.e., we start at B/QB2, | |
// and then load up the dependency on C/QC2). | |
let edge = self.edges.get_mut(&id).unwrap(); | |
let prefix = edge | |
.stack | |
.iter_mut() | |
.take_while(|p| p.database_key_index != key) | |
.count(); | |
closure(&mut edge.stack[prefix..]); | |
id = edge.blocked_on_id; | |
key = edge.blocked_on_key; | |
} | |
// Finally, we copy in the results from `from_stack`. | |
let prefix = from_stack | |
.iter_mut() | |
.take_while(|p| p.database_key_index != key) | |
.count(); | |
closure(&mut from_stack[prefix..]); | |
} | |
/// Unblock each blocked runtime (excluding the current one) if some | |
/// query executing in that runtime is participating in cycle fallback. | |
/// | |
/// Returns a boolean (Current, Others) where: | |
/// * Current is true if the current runtime has cycle participants | |
/// with fallback; | |
/// * Others is true if other runtimes were unblocked. | |
pub(super) fn maybe_unblock_runtimes_in_cycle( | |
&mut self, | |
from_id: RuntimeId, | |
from_stack: &QueryStack, | |
database_key: DatabaseKeyIndex, | |
to_id: RuntimeId, | |
) -> (bool, bool) { | |
// See diagram in `for_each_cycle_participant`. | |
let mut id = to_id; | |
let mut key = database_key; | |
let mut others_unblocked = false; | |
while id != from_id { | |
let edge = self.edges.get(&id).unwrap(); | |
let prefix = edge | |
.stack | |
.iter() | |
.take_while(|p| p.database_key_index != key) | |
.count(); | |
let next_id = edge.blocked_on_id; | |
let next_key = edge.blocked_on_key; | |
if let Some(cycle) = edge.stack[prefix..] | |
.iter() | |
.rev() | |
.find_map(|aq| aq.cycle.clone()) | |
{ | |
// Remove `id` from the list of runtimes blocked on `next_key`: | |
self.query_dependents | |
.get_mut(&next_key) | |
.unwrap() | |
.retain(|r| *r != id); | |
// Unblock runtime so that it can resume execution once lock is released: | |
self.unblock_runtime(id, WaitResult::Cycle(cycle)); | |
others_unblocked = true; | |
} | |
id = next_id; | |
key = next_key; | |
} | |
let prefix = from_stack | |
.iter() | |
.take_while(|p| p.database_key_index != key) | |
.count(); | |
let this_unblocked = from_stack[prefix..].iter().any(|aq| aq.cycle.is_some()); | |
(this_unblocked, others_unblocked) | |
} | |
/// Modifies the graph so that `from_id` is blocked | |
/// on `database_key`, which is being computed by | |
/// `to_id`. | |
/// | |
/// For this to be reasonable, the lock on the | |
/// results table for `database_key` must be held. | |
/// This ensures that computing `database_key` doesn't | |
/// complete before `block_on` executes. | |
/// | |
/// Preconditions: | |
/// * No path from `to_id` to `from_id` | |
/// (i.e., `me.depends_on(to_id, from_id)` is false) | |
/// * `held_mutex` is a read lock (or stronger) on `database_key` | |
pub(super) fn block_on<QueryMutexGuard>( | |
mut me: MutexGuard<'_, Self>, | |
from_id: RuntimeId, | |
database_key: DatabaseKeyIndex, | |
to_id: RuntimeId, | |
from_stack: QueryStack, | |
query_mutex_guard: QueryMutexGuard, | |
) -> (QueryStack, WaitResult) { | |
let condvar = me.add_edge(from_id, database_key, to_id, from_stack); | |
// Release the mutex that prevents `database_key` | |
// from completing, now that the edge has been added. | |
drop(query_mutex_guard); | |
loop { | |
if let Some(stack_and_result) = me.wait_results.remove(&from_id) { | |
debug_assert!(!me.edges.contains_key(&from_id)); | |
return stack_and_result; | |
} | |
condvar.wait(&mut me); | |
} | |
} | |
/// Helper for `block_on`: performs actual graph modification | |
/// to add a dependency edge from `from_id` to `to_id`, which is | |
/// computing `database_key`. | |
fn add_edge( | |
&mut self, | |
from_id: RuntimeId, | |
database_key: DatabaseKeyIndex, | |
to_id: RuntimeId, | |
from_stack: QueryStack, | |
) -> Arc<parking_lot::Condvar> { | |
assert_ne!(from_id, to_id); | |
debug_assert!(!self.edges.contains_key(&from_id)); | |
debug_assert!(!self.depends_on(to_id, from_id)); | |
let condvar = Arc::new(Condvar::new()); | |
self.edges.insert( | |
from_id, | |
Edge { | |
blocked_on_id: to_id, | |
blocked_on_key: database_key, | |
stack: from_stack, | |
condvar: condvar.clone(), | |
}, | |
); | |
self.query_dependents | |
.entry(database_key) | |
.or_default() | |
.push(from_id); | |
condvar | |
} | |
/// Invoked when runtime `to_id` completes executing | |
/// `database_key`. | |
pub(super) fn unblock_runtimes_blocked_on( | |
&mut self, | |
database_key: DatabaseKeyIndex, | |
wait_result: WaitResult, | |
) { | |
let dependents = self | |
.query_dependents | |
.remove(&database_key) | |
.unwrap_or_default(); | |
for from_id in dependents { | |
self.unblock_runtime(from_id, wait_result.clone()); | |
} | |
} | |
/// Unblock the runtime with the given id with the given wait-result. | |
/// This will cause it resume execution (though it will have to grab | |
/// the lock on this data structure first, to recover the wait result). | |
fn unblock_runtime(&mut self, id: RuntimeId, wait_result: WaitResult) { | |
let edge = self.edges.remove(&id).expect("not blocked"); | |
self.wait_results.insert(id, (edge.stack, wait_result)); | |
// Now that we have inserted the `wait_results`, | |
// notify the thread. | |
edge.condvar.notify_one(); | |
} | |
} |