blob: 710d42c8d472aab4652d05312eeb89dbbe4a5ea2 [file] [log] [blame]
use super::*;
use std::usize;
#[derive(Debug)]
pub(super) struct Counts {
/// Acting as a client or server. This allows us to track which values to
/// inc / dec.
peer: peer::Dyn,
/// Maximum number of locally initiated streams
max_send_streams: usize,
/// Current number of remote initiated streams
num_send_streams: usize,
/// Maximum number of remote initiated streams
max_recv_streams: usize,
/// Current number of locally initiated streams
num_recv_streams: usize,
/// Maximum number of pending locally reset streams
max_local_reset_streams: usize,
/// Current number of pending locally reset streams
num_local_reset_streams: usize,
/// Max number of "pending accept" streams that were remotely reset
max_remote_reset_streams: usize,
/// Current number of "pending accept" streams that were remotely reset
num_remote_reset_streams: usize,
/// Maximum number of locally reset streams due to protocol error across
/// the lifetime of the connection.
///
/// When this gets exceeded, we issue GOAWAYs.
max_local_error_reset_streams: Option<usize>,
/// Total number of locally reset streams due to protocol error across the
/// lifetime of the connection.
num_local_error_reset_streams: usize,
}
impl Counts {
/// Create a new `Counts` using the provided configuration values.
pub fn new(peer: peer::Dyn, config: &Config) -> Self {
Counts {
peer,
max_send_streams: config.initial_max_send_streams,
num_send_streams: 0,
max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
num_recv_streams: 0,
max_local_reset_streams: config.local_reset_max,
num_local_reset_streams: 0,
max_remote_reset_streams: config.remote_reset_max,
num_remote_reset_streams: 0,
max_local_error_reset_streams: config.local_max_error_reset_streams,
num_local_error_reset_streams: 0,
}
}
/// Returns true when the next opened stream will reach capacity of outbound streams
///
/// The number of client send streams is incremented in prioritize; send_request has to guess if
/// it should wait before allowing another request to be sent.
pub fn next_send_stream_will_reach_capacity(&self) -> bool {
self.max_send_streams <= (self.num_send_streams + 1)
}
/// Returns the current peer
pub fn peer(&self) -> peer::Dyn {
self.peer
}
pub fn has_streams(&self) -> bool {
self.num_send_streams != 0 || self.num_recv_streams != 0
}
/// Returns true if we can issue another local reset due to protocol error.
pub fn can_inc_num_local_error_resets(&self) -> bool {
if let Some(max) = self.max_local_error_reset_streams {
max > self.num_local_error_reset_streams
} else {
true
}
}
pub fn inc_num_local_error_resets(&mut self) {
assert!(self.can_inc_num_local_error_resets());
// Increment the number of remote initiated streams
self.num_local_error_reset_streams += 1;
}
pub(crate) fn max_local_error_resets(&self) -> Option<usize> {
self.max_local_error_reset_streams
}
/// Returns true if the receive stream concurrency can be incremented
pub fn can_inc_num_recv_streams(&self) -> bool {
self.max_recv_streams > self.num_recv_streams
}
/// Increments the number of concurrent receive streams.
///
/// # Panics
///
/// Panics on failure as this should have been validated before hand.
pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
assert!(self.can_inc_num_recv_streams());
assert!(!stream.is_counted);
// Increment the number of remote initiated streams
self.num_recv_streams += 1;
stream.is_counted = true;
}
/// Returns true if the send stream concurrency can be incremented
pub fn can_inc_num_send_streams(&self) -> bool {
self.max_send_streams > self.num_send_streams
}
/// Increments the number of concurrent send streams.
///
/// # Panics
///
/// Panics on failure as this should have been validated before hand.
pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
assert!(self.can_inc_num_send_streams());
assert!(!stream.is_counted);
// Increment the number of remote initiated streams
self.num_send_streams += 1;
stream.is_counted = true;
}
/// Returns true if the number of pending reset streams can be incremented.
pub fn can_inc_num_reset_streams(&self) -> bool {
self.max_local_reset_streams > self.num_local_reset_streams
}
/// Increments the number of pending reset streams.
///
/// # Panics
///
/// Panics on failure as this should have been validated before hand.
pub fn inc_num_reset_streams(&mut self) {
assert!(self.can_inc_num_reset_streams());
self.num_local_reset_streams += 1;
}
pub(crate) fn max_remote_reset_streams(&self) -> usize {
self.max_remote_reset_streams
}
/// Returns true if the number of pending REMOTE reset streams can be
/// incremented.
pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
self.max_remote_reset_streams > self.num_remote_reset_streams
}
/// Increments the number of pending REMOTE reset streams.
///
/// # Panics
///
/// Panics on failure as this should have been validated before hand.
pub(crate) fn inc_num_remote_reset_streams(&mut self) {
assert!(self.can_inc_num_remote_reset_streams());
self.num_remote_reset_streams += 1;
}
pub(crate) fn dec_num_remote_reset_streams(&mut self) {
assert!(self.num_remote_reset_streams > 0);
self.num_remote_reset_streams -= 1;
}
pub fn apply_remote_settings(&mut self, settings: &frame::Settings) {
if let Some(val) = settings.max_concurrent_streams() {
self.max_send_streams = val as usize;
}
}
/// Run a block of code that could potentially transition a stream's state.
///
/// If the stream state transitions to closed, this function will perform
/// all necessary cleanup.
///
/// TODO: Is this function still needed?
pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
where
F: FnOnce(&mut Self, &mut store::Ptr) -> U,
{
// TODO: Does this need to be computed before performing the action?
let is_pending_reset = stream.is_pending_reset_expiration();
// Run the action
let ret = f(self, &mut stream);
self.transition_after(stream, is_pending_reset);
ret
}
// TODO: move this to macro?
pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
tracing::trace!(
"transition_after; stream={:?}; state={:?}; is_closed={:?}; \
pending_send_empty={:?}; buffered_send_data={}; \
num_recv={}; num_send={}",
stream.id,
stream.state,
stream.is_closed(),
stream.pending_send.is_empty(),
stream.buffered_send_data,
self.num_recv_streams,
self.num_send_streams
);
if stream.is_closed() {
if !stream.is_pending_reset_expiration() {
stream.unlink();
if is_reset_counted {
self.dec_num_reset_streams();
}
}
if stream.is_counted {
tracing::trace!("dec_num_streams; stream={:?}", stream.id);
// Decrement the number of active streams.
self.dec_num_streams(&mut stream);
}
}
// Release the stream if it requires releasing
if stream.is_released() {
stream.remove();
}
}
/// Returns the maximum number of streams that can be initiated by this
/// peer.
pub(crate) fn max_send_streams(&self) -> usize {
self.max_send_streams
}
/// Returns the maximum number of streams that can be initiated by the
/// remote peer.
pub(crate) fn max_recv_streams(&self) -> usize {
self.max_recv_streams
}
fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
assert!(stream.is_counted);
if self.peer.is_local_init(stream.id) {
assert!(self.num_send_streams > 0);
self.num_send_streams -= 1;
stream.is_counted = false;
} else {
assert!(self.num_recv_streams > 0);
self.num_recv_streams -= 1;
stream.is_counted = false;
}
}
fn dec_num_reset_streams(&mut self) {
assert!(self.num_local_reset_streams > 0);
self.num_local_reset_streams -= 1;
}
}
impl Drop for Counts {
fn drop(&mut self) {
use std::thread;
if !thread::panicking() {
debug_assert!(!self.has_streams());
}
}
}