blob: 866c11408d5db351af143df3e916bb36cf96286c [file] [log] [blame]
use bytes::{Bytes, BytesMut};
use futures_core::stream::Stream;
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncRead;
const DEFAULT_CAPACITY: usize = 4096;
pin_project! {
/// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks.
///
/// This stream is fused. It performs the inverse operation of
/// [`StreamReader`].
///
/// # Example
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
/// use tokio_stream::StreamExt;
/// use tokio_util::io::ReaderStream;
///
/// // Create a stream of data.
/// let data = b"hello, world!";
/// let mut stream = ReaderStream::new(&data[..]);
///
/// // Read all of the chunks into a vector.
/// let mut stream_contents = Vec::new();
/// while let Some(chunk) = stream.next().await {
/// stream_contents.extend_from_slice(&chunk?);
/// }
///
/// // Once the chunks are concatenated, we should have the
/// // original data.
/// assert_eq!(stream_contents, data);
/// # Ok(())
/// # }
/// ```
///
/// [`AsyncRead`]: tokio::io::AsyncRead
/// [`StreamReader`]: crate::io::StreamReader
/// [`Stream`]: futures_core::Stream
#[derive(Debug)]
pub struct ReaderStream<R> {
// Reader itself.
//
// This value is `None` if the stream has terminated.
#[pin]
reader: Option<R>,
// Working buffer, used to optimize allocations.
buf: BytesMut,
capacity: usize,
}
}
impl<R: AsyncRead> ReaderStream<R> {
/// Convert an [`AsyncRead`] into a [`Stream`] with item type
/// `Result<Bytes, std::io::Error>`.
///
/// [`AsyncRead`]: tokio::io::AsyncRead
/// [`Stream`]: futures_core::Stream
pub fn new(reader: R) -> Self {
ReaderStream {
reader: Some(reader),
buf: BytesMut::new(),
capacity: DEFAULT_CAPACITY,
}
}
/// Convert an [`AsyncRead`] into a [`Stream`] with item type
/// `Result<Bytes, std::io::Error>`,
/// with a specific read buffer initial capacity.
///
/// [`AsyncRead`]: tokio::io::AsyncRead
/// [`Stream`]: futures_core::Stream
pub fn with_capacity(reader: R, capacity: usize) -> Self {
ReaderStream {
reader: Some(reader),
buf: BytesMut::with_capacity(capacity),
capacity,
}
}
}
impl<R: AsyncRead> Stream for ReaderStream<R> {
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use crate::util::poll_read_buf;
let mut this = self.as_mut().project();
let reader = match this.reader.as_pin_mut() {
Some(r) => r,
None => return Poll::Ready(None),
};
if this.buf.capacity() == 0 {
this.buf.reserve(*this.capacity);
}
match poll_read_buf(reader, cx, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
self.project().reader.set(None);
Poll::Ready(Some(Err(err)))
}
Poll::Ready(Ok(0)) => {
self.project().reader.set(None);
Poll::Ready(None)
}
Poll::Ready(Ok(_)) => {
let chunk = this.buf.split();
Poll::Ready(Some(Ok(chunk.freeze())))
}
}
}
}