blob: 88f1bca4b9de08a7be0eab5010427be3932c35cb [file] [log] [blame]
use super::*;
use super::store::Resolve;
use frame::{Reason, StreamId};
use codec::UserError;
use codec::UserError::*;
use bytes::buf::Take;
use std::{cmp, fmt, mem};
use std::io;
/// # Warning
/// Queued streams are ordered by stream ID, as we need to ensure that
/// lower-numbered streams are sent headers before higher-numbered ones.
/// This is because "idle" stream IDs – those which have been initiated but
/// have yet to receive frames – will be implicitly closed on receipt of a
/// frame on a higher stream ID. If these queues was not ordered by stream
/// IDs, some mechanism would be necessary to ensure that the lowest-numberedh]
/// idle stream is opened first.
pub(super) struct Prioritize {
/// Queue of streams waiting for socket capacity to send a frame.
pending_send: store::Queue<stream::NextSend>,
/// Queue of streams waiting for window capacity to produce data.
pending_capacity: store::Queue<stream::NextSendCapacity>,
/// Streams waiting for capacity due to max concurrency
/// The `SendRequest` handle is `Clone`. This enables initiating requests
/// from many tasks. However, offering this capability while supporting
/// backpressure at some level is tricky. If there are many `SendRequest`
/// handles and a single stream becomes available, which handle gets
/// assigned that stream? Maybe that handle is no longer ready to send a
/// request.
/// The strategy used is to allow each `SendRequest` handle one buffered
/// request. A `SendRequest` handle is ready to send a request if it has no
/// associated buffered requests. This is the same strategy as `mpsc` in the
/// futures library.
pending_open: store::Queue<stream::NextOpen>,
/// Connection level flow control governing sent data
flow: FlowControl,
/// Stream ID of the last stream opened.
last_opened_id: StreamId,
/// What `DATA` frame is currently being sent in the codec.
in_flight_data_frame: InFlightData,
#[derive(Debug, Eq, PartialEq)]
enum InFlightData {
/// There is no `DATA` frame in flight.
/// There is a `DATA` frame in flight belonging to the given stream.
/// There was a `DATA` frame, but the stream's queue was since cleared.
pub(crate) struct Prioritized<B> {
// The buffer
inner: Take<B>,
end_of_stream: bool,
// The stream that this is associated with
stream: store::Key,
// ===== impl Prioritize =====
impl Prioritize {
pub fn new(config: &Config) -> Prioritize {
let mut flow = FlowControl::new();
.expect("invalid initial window size");
trace!("Prioritize::new; flow={:?}", flow);
Prioritize {
pending_send: store::Queue::new(),
pending_capacity: store::Queue::new(),
pending_open: store::Queue::new(),
flow: flow,
last_opened_id: StreamId::ZERO,
in_flight_data_frame: InFlightData::Nothing,
/// Queue a frame to be sent to the remote
pub fn queue_frame<B>(
&mut self,
frame: Frame<B>,
buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
task: &mut Option<Task>,
) {
// Queue the frame in the buffer
stream.pending_send.push_back(buffer, frame);
self.schedule_send(stream, task);
pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Task>) {
// If the stream is waiting to be opened, nothing more to do.
if !stream.is_pending_open {
trace!("schedule_send; {:?}",;
// Queue the stream
// Notify the connection.
if let Some(task) = task.take() {
pub fn queue_open(&mut self, stream: &mut store::Ptr) {
/// Send a data frame
pub fn send_data<B>(
&mut self,
frame: frame::Data<B>,
buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
counts: &mut Counts,
task: &mut Option<Task>,
) -> Result<(), UserError>
B: Buf,
let sz = frame.payload().remaining();
if sz > MAX_WINDOW_SIZE as usize {
return Err(UserError::PayloadTooBig);
let sz = sz as WindowSize;
if !stream.state.is_send_streaming() {
if stream.state.is_closed() {
return Err(InactiveStreamId);
} else {
return Err(UnexpectedFrameType);
// Update the buffered data counter
stream.buffered_send_data += sz;
"send_data; sz={}; buffered={}; requested={}",
// Implicitly request more send capacity if not enough has been
// requested yet.
if stream.requested_send_capacity < stream.buffered_send_data {
// Update the target requested capacity
stream.requested_send_capacity = stream.buffered_send_data;
if frame.is_end_stream() {
self.reserve_capacity(0, stream, counts);
"send_data (2); available={}; buffered={}",
// The `stream.buffered_send_data == 0` check is here so that, if a zero
// length data frame is queued to the front (there is no previously
// queued data), it gets sent out immediately even if there is no
// available send window.
// Sending out zero length data frames can be done to signal
// end-of-stream.
if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 {
// The stream currently has capacity to send the data frame, so
// queue it up and notify the connection task.
self.queue_frame(frame.into(), buffer, stream, task);
} else {
// The stream has no capacity to send the frame now, save it but
// don't notify the connection task. Once additional capacity
// becomes available, the frame will be flushed.
.push_back(buffer, frame.into());
/// Request capacity to send data
pub fn reserve_capacity(
&mut self,
capacity: WindowSize,
stream: &mut store::Ptr,
counts: &mut Counts) {
"reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}",,
capacity + stream.buffered_send_data,
// Actual capacity is `capacity` + the current amount of buffered data.
// If it were less, then we could never send out the buffered data.
let capacity = capacity + stream.buffered_send_data;
if capacity == stream.requested_send_capacity {
// Nothing to do
} else if capacity < stream.requested_send_capacity {
// Update the target requested capacity
stream.requested_send_capacity = capacity;
// Currently available capacity assigned to the stream
let available = stream.send_flow.available().as_size();
// If the stream has more assigned capacity than requested, reclaim
// some for the connection
if available > capacity {
let diff = available - capacity;
self.assign_connection_capacity(diff, stream, counts);
} else {
// If trying to *add* capacity, but the stream send side is closed,
// there's nothing to be done.
if stream.state.is_send_closed() {
// Update the target requested capacity
stream.requested_send_capacity = capacity;
// Try to assign additional capacity to the stream. If none is
// currently available, the stream will be queued to receive some
// when more becomes available.
pub fn recv_stream_window_update(
&mut self,
inc: WindowSize,
stream: &mut store::Ptr,
) -> Result<(), Reason> {
"recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}",,
if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
// We can't send any data, so don't bother doing anything else.
return Ok(());
// Update the stream level flow control.
// If the stream is waiting on additional capacity, then this will
// assign it (if available on the connection) and notify the producer
pub fn recv_connection_window_update(
&mut self,
inc: WindowSize,
store: &mut Store,
counts: &mut Counts,
) -> Result<(), Reason> {
// Update the connection's window
self.assign_connection_capacity(inc, store, counts);
/// Reclaim all capacity assigned to the stream and re-assign it to the
/// connection
pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
let available = stream.send_flow.available().as_size();
// Re-assign all capacity to the connection
self.assign_connection_capacity(available, stream, counts);
/// Reclaim just reserved capacity, not buffered capacity, and re-assign
/// it to the connection
pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
// only reclaim requested capacity that isn't already buffered
if stream.requested_send_capacity > stream.buffered_send_data {
let reserved = stream.requested_send_capacity - stream.buffered_send_data;
self.assign_connection_capacity(reserved, stream, counts);
pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
while let Some(stream) = self.pending_capacity.pop(store) {
counts.transition(stream, |_, stream| {
trace!("clear_pending_capacity; stream={:?}",;
pub fn assign_connection_capacity<R>(
&mut self,
inc: WindowSize,
store: &mut R,
counts: &mut Counts)
R: Resolve,
trace!("assign_connection_capacity; inc={}", inc);
// Assign newly acquired capacity to streams pending capacity.
while self.flow.available() > 0 {
let stream = match self.pending_capacity.pop(store) {
Some(stream) => stream,
None => return,
// Streams pending capacity may have been reset before capacity
// became available. In that case, the stream won't want any
// capacity, and so we shouldn't "transition" on it, but just evict
// it and continue the loop.
if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
counts.transition(stream, |_, mut stream| {
// Try to assign capacity to the stream. This will also re-queue the
// stream if there isn't enough connection level capacity to fulfill
// the capacity request.
self.try_assign_capacity(&mut stream);
/// Request capacity to send data
fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
let total_requested = stream.requested_send_capacity;
// Total requested should never go below actual assigned
// (Note: the window size can go lower than assigned)
debug_assert!(total_requested >= stream.send_flow.available());
// The amount of additional capacity that the stream requests.
// Don't assign more than the window has available!
let additional = cmp::min(
total_requested - stream.send_flow.available().as_size(),
// Can't assign more than what is available
stream.send_flow.window_size() - stream.send_flow.available().as_size(),
"try_assign_capacity; stream={:?}, requested={}; additional={}; buffered={}; window={}; conn={}",,
if additional == 0 {
// Nothing more to do
// If the stream has requested capacity, then it must be in the
// streaming state (more data could be sent) or there is buffered data
// waiting to be sent.
stream.state.is_send_streaming() || stream.buffered_send_data > 0,
// The amount of currently available capacity on the connection
let conn_available = self.flow.available().as_size();
// First check if capacity is immediately available
if conn_available > 0 {
// The amount of capacity to assign to the stream
// TODO: Should prioritization factor into this?
let assign = cmp::min(conn_available, additional);
" assigning; stream={:?}, capacity={}",,
// Assign the capacity to the stream
// Claim the capacity from the connection
"try_assign_capacity(2); available={}; requested={}; buffered={}; has_unavailable={:?}",
if stream.send_flow.available() < stream.requested_send_capacity {
if stream.send_flow.has_unavailable() {
// The stream requires additional capacity and the stream's
// window has available capacity, but the connection window
// does not.
// In this case, the stream needs to be queued up for when the
// connection has more capacity.
// If data is buffered and the stream is not pending open, then
// schedule the stream for execution
// Why do we not push into pending_send when the stream is in pending_open?
// We allow users to call send_request() which schedules a stream to be pending_open
// if there is no room according to the concurrency limit (max_send_streams), and we
// also allow data to be buffered for send with send_data() if there is no capacity for
// the stream to send the data, which attempts to place the stream in pending_send.
// If the stream is not open, we don't want the stream to be scheduled for
// execution (pending_send). Note that if the stream is in pending_open, it will be
// pushed to pending_send when there is room for an open stream.
if stream.buffered_send_data > 0 && !stream.is_pending_open {
// TODO: This assertion isn't *exactly* correct. There can still be
// buffered send data while the stream's pending send queue is
// empty. This can happen when a large data frame is in the process
// of being **partially** sent. Once the window has been sent, the
// data frame will be returned to the prioritization layer to be
// re-scheduled.
// That said, it would be nice to figure out how to make this
// assertion correctly.
// debug_assert!(!stream.pending_send.is_empty());
pub fn poll_complete<T, B>(
&mut self,
buffer: &mut Buffer<Frame<B>>,
store: &mut Store,
counts: &mut Counts,
dst: &mut Codec<T, Prioritized<B>>,
) -> Poll<(), io::Error>
T: AsyncWrite,
B: Buf,
// Ensure codec is ready
// Reclaim any frame that has previously been written
self.reclaim_frame(buffer, store, dst);
// The max frame length
let max_frame_len = dst.max_send_frame_size();
loop {
self.schedule_pending_open(store, counts);
match self.pop_frame(buffer, store, max_frame_len, counts) {
Some(frame) => {
trace!("writing frame={:?}", frame);
debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
if let Frame::Data(ref frame) = frame {
self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream);
dst.buffer(frame).ok().expect("invalid frame");
// Ensure the codec is ready to try the loop again.
// Because, always try to reclaim...
self.reclaim_frame(buffer, store, dst);
None => {
// Try to flush the codec.
// This might release a data frame...
if !self.reclaim_frame(buffer, store, dst) {
return Ok(().into());
// No need to poll ready as poll_complete() does this for
// us...
/// Tries to reclaim a pending data frame from the codec.
/// Returns true if a frame was reclaimed.
/// When a data frame is written to the codec, it may not be written in its
/// entirety (large chunks are split up into potentially many data frames).
/// In this case, the stream needs to be reprioritized.
fn reclaim_frame<T, B>(
&mut self,
buffer: &mut Buffer<Frame<B>>,
store: &mut Store,
dst: &mut Codec<T, Prioritized<B>>,
) -> bool
B: Buf,
trace!("try reclaim frame");
// First check if there are any data chunks to take back
if let Some(frame) = dst.take_last_data_frame() {
" -> reclaimed; frame={:?}; sz={}",
let mut eos = false;
let key = frame.payload().stream;
match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
InFlightData::Drop => {
trace!("not reclaiming frame for cancelled stream");
return false;
InFlightData::DataFrame(k) => {
debug_assert_eq!(k, key);
let mut frame =|prioritized| {
// TODO: Ensure fully written
eos = prioritized.end_of_stream;
if frame.payload().has_remaining() {
let mut stream = store.resolve(key);
if eos {
self.push_back_frame(frame.into(), buffer, &mut stream);
return true;
/// Push the frame to the front of the stream's deque, scheduling the
/// stream if needed.
fn push_back_frame<B>(&mut self,
frame: Frame<B>,
buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr)
// Push the frame to the front of the stream's deque
stream.pending_send.push_front(buffer, frame);
// If needed, schedule the sender
if stream.send_flow.available() > 0 {
pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
trace!("clear_queue; stream={:?}",;
// TODO: make this more efficient?
while let Some(frame) = stream.pending_send.pop_front(buffer) {
trace!("dropping; frame={:?}", frame);
stream.buffered_send_data = 0;
stream.requested_send_capacity = 0;
if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
if stream.key() == key {
// This stream could get cleaned up now - don't allow the buffered frame to get reclaimed.
self.in_flight_data_frame = InFlightData::Drop;
pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
while let Some(stream) = self.pending_send.pop(store) {
let is_pending_reset = stream.is_pending_reset_expiration();
counts.transition_after(stream, is_pending_reset);
pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
while let Some(stream) = self.pending_open.pop(store) {
let is_pending_reset = stream.is_pending_reset_expiration();
counts.transition_after(stream, is_pending_reset);
fn pop_frame<B>(
&mut self,
buffer: &mut Buffer<Frame<B>>,
store: &mut Store,
max_len: usize,
counts: &mut Counts,
) -> Option<Frame<Prioritized<B>>>
B: Buf,
loop {
match self.pending_send.pop(store) {
Some(mut stream) => {
trace!("pop_frame; stream={:?}; stream.state={:?}",, stream.state);
// It's possible that this stream, besides having data to send,
// is also queued to send a reset, and thus is already in the queue
// to wait for "some time" after a reset.
// To be safe, we just always ask the stream.
let is_pending_reset = stream.is_pending_reset_expiration();
trace!(" --> stream={:?}; is_pending_reset={:?};",, is_pending_reset);
let frame = match stream.pending_send.pop_front(buffer) {
Some(Frame::Data(mut frame)) => {
// Get the amount of capacity remaining for stream's
// window.
let stream_capacity = stream.send_flow.available();
let sz = frame.payload().remaining();
" --> data frame; stream={:?}; sz={}; eos={:?}; window={}; \
available={}; requested={}; buffered={};",
// Zero length data frames always have capacity to
// be sent.
if sz > 0 && stream_capacity == 0 {
" --> stream capacity is 0; requested={}",
// Ensure that the stream is waiting for
// connection level capacity
// TODO: uncomment
// debug_assert!(stream.is_pending_send_capacity);
// The stream has no more capacity, this can
// happen if the remote reduced the stream
// window. In this case, we need to buffer the
// frame and wait for a window update...
.push_front(buffer, frame.into());
// Only send up to the max frame length
let len = cmp::min(sz, max_len);
// Only send up to the stream's window capacity
let len = cmp::min(len, stream_capacity.as_size() as usize) as WindowSize;
// There *must* be be enough connection level
// capacity at this point.
debug_assert!(len <= self.flow.window_size());
trace!(" --> sending data frame; len={}", len);
// Update the flow control
trace!(" -- updating stream flow --");
// Decrement the stream's buffered data counter
debug_assert!(stream.buffered_send_data >= len);
stream.buffered_send_data -= len;
stream.requested_send_capacity -= len;
// Assign the capacity back to the connection that
// was just consumed from the stream in the previous
// line.
trace!(" -- updating connection flow --");
// Wrap the frame's data payload to ensure that the
// correct amount of data gets written.
let eos = frame.is_end_stream();
let len = len as usize;
if frame.payload().remaining() > len {
Frame::Data(|buf| {
Prioritized {
inner: buf.take(len),
end_of_stream: eos,
stream: stream.key(),
Some(frame) =>|_|
"Frame::map closure will only be called \
on DATA frames."
None => {
if let Some(reason) = stream.state.get_scheduled_reset() {
let frame = frame::Reset::new(, reason);
} else {
// If the stream receives a RESET from the peer, it may have
// had data buffered to be sent, but all the frames are cleared
// in clear_queue(). Instead of doing O(N) traversal through queue
// to remove, lets just ignore the stream here.
trace!("removing dangling stream from pending_send");
// Since this should only happen as a consequence of `clear_queue`,
// we must be in a closed state of some kind.
counts.transition_after(stream, is_pending_reset);
trace!("pop_frame; frame={:?}", frame);
if cfg!(debug_assertions) && stream.state.is_idle() {
debug_assert!( > self.last_opened_id);
self.last_opened_id =;
if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() {
// TODO: Only requeue the sender IF it is ready to send
// the next frame. i.e. don't requeue it if the next
// frame is a data frame and the stream does not have
// any more capacity.
self.pending_send.push(&mut stream);
counts.transition_after(stream, is_pending_reset);
return Some(frame);
None => return None,
fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
// check for any pending open streams
while counts.can_inc_num_send_streams() {
if let Some(mut stream) = self.pending_open.pop(store) {
trace!("schedule_pending_open; stream={:?}",;
counts.inc_num_send_streams(&mut stream);
self.pending_send.push(&mut stream);
} else {
// ===== impl Prioritized =====
impl<B> Buf for Prioritized<B>
B: Buf,
fn remaining(&self) -> usize {
fn bytes(&self) -> &[u8] {
fn advance(&mut self, cnt: usize) {
impl<B: Buf> fmt::Debug for Prioritized<B> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
.field("remaining", &self.inner.get_ref().remaining())
.field("end_of_stream", &self.end_of_stream)
.field("stream", &