| use std::io; |
| |
| use crate::codec::UserError; |
| use crate::frame::{self, Reason, StreamId}; |
| use crate::proto::{self, Error, Initiator, PollReset}; |
| |
| use self::Inner::*; |
| use self::Peer::*; |
| |
| /// Represents the state of an H2 stream |
| /// |
| /// ```not_rust |
| /// +--------+ |
| /// send PP | | recv PP |
| /// ,--------| idle |--------. |
| /// / | | \ |
| /// v +--------+ v |
| /// +----------+ | +----------+ |
| /// | | | send H / | | |
| /// ,------| reserved | | recv H | reserved |------. |
| /// | | (local) | | | (remote) | | |
| /// | +----------+ v +----------+ | |
| /// | | +--------+ | | |
| /// | | recv ES | | send ES | | |
| /// | send H | ,-------| open |-------. | recv H | |
| /// | | / | | \ | | |
| /// | v v +--------+ v v | |
| /// | +----------+ | +----------+ | |
| /// | | half | | | half | | |
| /// | | closed | | send R / | closed | | |
| /// | | (remote) | | recv R | (local) | | |
| /// | +----------+ | +----------+ | |
| /// | | | | | |
| /// | | send ES / | recv ES / | | |
| /// | | send R / v send R / | | |
| /// | | recv R +--------+ recv R | | |
| /// | send R / `----------->| |<-----------' send R / | |
| /// | recv R | closed | recv R | |
| /// `----------------------->| |<----------------------' |
| /// +--------+ |
| /// |
| /// send: endpoint sends this frame |
| /// recv: endpoint receives this frame |
| /// |
| /// H: HEADERS frame (with implied CONTINUATIONs) |
| /// PP: PUSH_PROMISE frame (with implied CONTINUATIONs) |
| /// ES: END_STREAM flag |
| /// R: RST_STREAM frame |
| /// ``` |
| #[derive(Debug, Clone)] |
| pub struct State { |
| inner: Inner, |
| } |
| |
| #[derive(Debug, Clone)] |
| enum Inner { |
| Idle, |
| // TODO: these states shouldn't count against concurrency limits: |
| ReservedLocal, |
| ReservedRemote, |
| Open { local: Peer, remote: Peer }, |
| HalfClosedLocal(Peer), // TODO: explicitly name this value |
| HalfClosedRemote(Peer), |
| Closed(Cause), |
| } |
| |
| #[derive(Debug, Copy, Clone)] |
| enum Peer { |
| AwaitingHeaders, |
| Streaming, |
| } |
| |
| #[derive(Debug, Clone)] |
| enum Cause { |
| EndStream, |
| Error(Error), |
| |
| /// This indicates to the connection that a reset frame must be sent out |
| /// once the send queue has been flushed. |
| /// |
| /// Examples of when this could happen: |
| /// - User drops all references to a stream, so we want to CANCEL the it. |
| /// - Header block size was too large, so we want to REFUSE, possibly |
| /// after sending a 431 response frame. |
| ScheduledLibraryReset(Reason), |
| } |
| |
| impl State { |
| /// Opens the send-half of a stream if it is not already open. |
| pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> { |
| let local = Streaming; |
| |
| self.inner = match self.inner { |
| Idle => { |
| if eos { |
| HalfClosedLocal(AwaitingHeaders) |
| } else { |
| Open { |
| local, |
| remote: AwaitingHeaders, |
| } |
| } |
| } |
| Open { |
| local: AwaitingHeaders, |
| remote, |
| } => { |
| if eos { |
| HalfClosedLocal(remote) |
| } else { |
| Open { local, remote } |
| } |
| } |
| HalfClosedRemote(AwaitingHeaders) | ReservedLocal => { |
| if eos { |
| Closed(Cause::EndStream) |
| } else { |
| HalfClosedRemote(local) |
| } |
| } |
| _ => { |
| // All other transitions result in a protocol error |
| return Err(UserError::UnexpectedFrameType); |
| } |
| }; |
| |
| Ok(()) |
| } |
| |
| /// Opens the receive-half of the stream when a HEADERS frame is received. |
| /// |
| /// Returns true if this transitions the state to Open. |
| pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error> { |
| let mut initial = false; |
| let eos = frame.is_end_stream(); |
| |
| self.inner = match self.inner { |
| Idle => { |
| initial = true; |
| |
| if eos { |
| HalfClosedRemote(AwaitingHeaders) |
| } else { |
| Open { |
| local: AwaitingHeaders, |
| remote: if frame.is_informational() { |
| tracing::trace!("skipping 1xx response headers"); |
| AwaitingHeaders |
| } else { |
| Streaming |
| }, |
| } |
| } |
| } |
| ReservedRemote => { |
| initial = true; |
| |
| if eos { |
| Closed(Cause::EndStream) |
| } else if frame.is_informational() { |
| tracing::trace!("skipping 1xx response headers"); |
| ReservedRemote |
| } else { |
| HalfClosedLocal(Streaming) |
| } |
| } |
| Open { |
| local, |
| remote: AwaitingHeaders, |
| } => { |
| if eos { |
| HalfClosedRemote(local) |
| } else { |
| Open { |
| local, |
| remote: if frame.is_informational() { |
| tracing::trace!("skipping 1xx response headers"); |
| AwaitingHeaders |
| } else { |
| Streaming |
| }, |
| } |
| } |
| } |
| HalfClosedLocal(AwaitingHeaders) => { |
| if eos { |
| Closed(Cause::EndStream) |
| } else if frame.is_informational() { |
| tracing::trace!("skipping 1xx response headers"); |
| HalfClosedLocal(AwaitingHeaders) |
| } else { |
| HalfClosedLocal(Streaming) |
| } |
| } |
| ref state => { |
| // All other transitions result in a protocol error |
| proto_err!(conn: "recv_open: in unexpected state {:?}", state); |
| return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); |
| } |
| }; |
| |
| Ok(initial) |
| } |
| |
| /// Transition from Idle -> ReservedRemote |
| pub fn reserve_remote(&mut self) -> Result<(), Error> { |
| match self.inner { |
| Idle => { |
| self.inner = ReservedRemote; |
| Ok(()) |
| } |
| ref state => { |
| proto_err!(conn: "reserve_remote: in unexpected state {:?}", state); |
| Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) |
| } |
| } |
| } |
| |
| /// Transition from Idle -> ReservedLocal |
| pub fn reserve_local(&mut self) -> Result<(), UserError> { |
| match self.inner { |
| Idle => { |
| self.inner = ReservedLocal; |
| Ok(()) |
| } |
| _ => Err(UserError::UnexpectedFrameType), |
| } |
| } |
| |
| /// Indicates that the remote side will not send more data to the local. |
| pub fn recv_close(&mut self) -> Result<(), Error> { |
| match self.inner { |
| Open { local, .. } => { |
| // The remote side will continue to receive data. |
| tracing::trace!("recv_close: Open => HalfClosedRemote({:?})", local); |
| self.inner = HalfClosedRemote(local); |
| Ok(()) |
| } |
| HalfClosedLocal(..) => { |
| tracing::trace!("recv_close: HalfClosedLocal => Closed"); |
| self.inner = Closed(Cause::EndStream); |
| Ok(()) |
| } |
| ref state => { |
| proto_err!(conn: "recv_close: in unexpected state {:?}", state); |
| Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) |
| } |
| } |
| } |
| |
| /// The remote explicitly sent a RST_STREAM. |
| /// |
| /// # Arguments |
| /// - `frame`: the received RST_STREAM frame. |
| /// - `queued`: true if this stream has frames in the pending send queue. |
| pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) { |
| match self.inner { |
| // If the stream is already in a `Closed` state, do nothing, |
| // provided that there are no frames still in the send queue. |
| Closed(..) if !queued => {} |
| // A notionally `Closed` stream may still have queued frames in |
| // the following cases: |
| // |
| // - if the cause is `Cause::Scheduled(..)` (i.e. we have not |
| // actually closed the stream yet). |
| // - if the cause is `Cause::EndStream`: we transition to this |
| // state when an EOS frame is *enqueued* (so that it's invalid |
| // to enqueue more frames), not when the EOS frame is *sent*; |
| // therefore, there may still be frames ahead of the EOS frame |
| // in the send queue. |
| // |
| // In either of these cases, we want to overwrite the stream's |
| // previous state with the received RST_STREAM, so that the queue |
| // will be cleared by `Prioritize::pop_frame`. |
| ref state => { |
| tracing::trace!( |
| "recv_reset; frame={:?}; state={:?}; queued={:?}", |
| frame, |
| state, |
| queued |
| ); |
| self.inner = Closed(Cause::Error(Error::remote_reset( |
| frame.stream_id(), |
| frame.reason(), |
| ))); |
| } |
| } |
| } |
| |
| /// Handle a connection-level error. |
| pub fn handle_error(&mut self, err: &proto::Error) { |
| match self.inner { |
| Closed(..) => {} |
| _ => { |
| tracing::trace!("handle_error; err={:?}", err); |
| self.inner = Closed(Cause::Error(err.clone())); |
| } |
| } |
| } |
| |
| pub fn recv_eof(&mut self) { |
| match self.inner { |
| Closed(..) => {} |
| ref state => { |
| tracing::trace!("recv_eof; state={:?}", state); |
| self.inner = Closed(Cause::Error( |
| io::Error::new( |
| io::ErrorKind::BrokenPipe, |
| "stream closed because of a broken pipe", |
| ) |
| .into(), |
| )); |
| } |
| } |
| } |
| |
| /// Indicates that the local side will not send more data to the local. |
| pub fn send_close(&mut self) { |
| match self.inner { |
| Open { remote, .. } => { |
| // The remote side will continue to receive data. |
| tracing::trace!("send_close: Open => HalfClosedLocal({:?})", remote); |
| self.inner = HalfClosedLocal(remote); |
| } |
| HalfClosedRemote(..) => { |
| tracing::trace!("send_close: HalfClosedRemote => Closed"); |
| self.inner = Closed(Cause::EndStream); |
| } |
| ref state => panic!("send_close: unexpected state {:?}", state), |
| } |
| } |
| |
| /// Set the stream state to reset locally. |
| pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) { |
| self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator))); |
| } |
| |
| /// Set the stream state to a scheduled reset. |
| pub fn set_scheduled_reset(&mut self, reason: Reason) { |
| debug_assert!(!self.is_closed()); |
| self.inner = Closed(Cause::ScheduledLibraryReset(reason)); |
| } |
| |
| pub fn get_scheduled_reset(&self) -> Option<Reason> { |
| match self.inner { |
| Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason), |
| _ => None, |
| } |
| } |
| |
| pub fn is_scheduled_reset(&self) -> bool { |
| matches!(self.inner, Closed(Cause::ScheduledLibraryReset(..))) |
| } |
| |
| pub fn is_local_error(&self) -> bool { |
| match self.inner { |
| Closed(Cause::Error(ref e)) => e.is_local(), |
| Closed(Cause::ScheduledLibraryReset(..)) => true, |
| _ => false, |
| } |
| } |
| |
| pub fn is_remote_reset(&self) -> bool { |
| match self.inner { |
| Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote))) => true, |
| _ => false, |
| } |
| } |
| |
| /// Returns true if the stream is already reset. |
| pub fn is_reset(&self) -> bool { |
| match self.inner { |
| Closed(Cause::EndStream) => false, |
| Closed(_) => true, |
| _ => false, |
| } |
| } |
| |
| pub fn is_send_streaming(&self) -> bool { |
| matches!( |
| self.inner, |
| Open { |
| local: Streaming, |
| .. |
| } | HalfClosedRemote(Streaming) |
| ) |
| } |
| |
| /// Returns true when the stream is in a state to receive headers |
| pub fn is_recv_headers(&self) -> bool { |
| matches!( |
| self.inner, |
| Idle | Open { |
| remote: AwaitingHeaders, |
| .. |
| } | HalfClosedLocal(AwaitingHeaders) |
| | ReservedRemote |
| ) |
| } |
| |
| pub fn is_recv_streaming(&self) -> bool { |
| matches!( |
| self.inner, |
| Open { |
| remote: Streaming, |
| .. |
| } | HalfClosedLocal(Streaming) |
| ) |
| } |
| |
| pub fn is_closed(&self) -> bool { |
| matches!(self.inner, Closed(_)) |
| } |
| |
| pub fn is_recv_closed(&self) -> bool { |
| matches!( |
| self.inner, |
| Closed(..) | HalfClosedRemote(..) | ReservedLocal |
| ) |
| } |
| |
| pub fn is_send_closed(&self) -> bool { |
| matches!( |
| self.inner, |
| Closed(..) | HalfClosedLocal(..) | ReservedRemote |
| ) |
| } |
| |
| pub fn is_idle(&self) -> bool { |
| matches!(self.inner, Idle) |
| } |
| |
| pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> { |
| // TODO: Is this correct? |
| match self.inner { |
| Closed(Cause::Error(ref e)) => Err(e.clone()), |
| Closed(Cause::ScheduledLibraryReset(reason)) => { |
| Err(proto::Error::library_go_away(reason)) |
| } |
| Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false), |
| _ => Ok(true), |
| } |
| } |
| |
| /// Returns a reason if the stream has been reset. |
| pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> { |
| match self.inner { |
| Closed(Cause::Error(Error::Reset(_, reason, _))) |
| | Closed(Cause::Error(Error::GoAway(_, reason, _))) |
| | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)), |
| Closed(Cause::Error(ref e)) => Err(e.clone().into()), |
| Open { |
| local: Streaming, .. |
| } |
| | HalfClosedRemote(Streaming) => match mode { |
| PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()), |
| PollReset::Streaming => Ok(None), |
| }, |
| _ => Ok(None), |
| } |
| } |
| } |
| |
| impl Default for State { |
| fn default() -> State { |
| State { inner: Inner::Idle } |
| } |
| } |
| |
| impl Default for Peer { |
| fn default() -> Self { |
| AwaitingHeaders |
| } |
| } |