Upgrade tokio-stream to 0.1.12 am: a78006577e am: a9cba4aff2 am: 0e4b52ea4f am: 18447d24b8

Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio-stream/+/2513497

Change-Id: If465eafdbf90be7d951ffa80c2fabc00bac77ee7
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 94e3fd4..d866201 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,6 @@
 {
   "git": {
-    "sha1": "ae0d49d59c0c63efafde73306af5d0d94046b50d"
+    "sha1": "46f974d8cfcb56c251d80cf1dc4a6bcf9fd1d7a0"
   },
   "path_in_vcs": "tokio-stream"
 }
\ No newline at end of file
diff --git a/Android.bp b/Android.bp
index 08e9901..d80fdf7 100644
--- a/Android.bp
+++ b/Android.bp
@@ -23,7 +23,7 @@
     host_supported: true,
     crate_name: "tokio_stream",
     cargo_env_compat: true,
-    cargo_pkg_version: "0.1.11",
+    cargo_pkg_version: "0.1.12",
     srcs: ["src/lib.rs"],
     edition: "2018",
     features: [
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 05c2b18..c475c7c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,11 @@
+# 0.1.12 (January 20, 2022)
+
+- time: remove `Unpin` bound on `Throttle` methods ([#5105])
+- time: document that `throttle` operates on ms granularity ([#5101])
+
+[#5105]: https://github.com/tokio-rs/tokio/pull/5105
+[#5101]: https://github.com/tokio-rs/tokio/pull/5101
+
 # 0.1.11 (October 11, 2022)
 
 - time: allow `StreamExt::chunks_timeout` outside of a runtime ([#5036])
diff --git a/Cargo.toml b/Cargo.toml
index df1dc53..5a3542e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@
 edition = "2018"
 rust-version = "1.49"
 name = "tokio-stream"
-version = "0.1.11"
+version = "0.1.12"
 authors = ["Tokio Contributors <team@tokio.rs>"]
 description = """
 Utilities to work with `Stream` and `tokio`.
@@ -76,6 +76,3 @@
     "tokio-util",
 ]
 time = ["tokio/time"]
-
-[target."cfg(not(target_arch = \"wasm32\"))".dev-dependencies.proptest]
-version = "1"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 6dfa978..f87b59a 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -4,7 +4,7 @@
 # - Remove path dependencies
 # - Update CHANGELOG.md.
 # - Create "tokio-stream-0.1.x" git tag.
-version = "0.1.11"
+version = "0.1.12"
 edition = "2018"
 rust-version = "1.49"
 authors = ["Tokio Contributors <team@tokio.rs>"]
@@ -38,9 +38,6 @@
 tokio-test = { path = "../tokio-test" }
 futures = { version = "0.3", default-features = false }
 
-[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
-proptest = "1"
-
 [package.metadata.docs.rs]
 all-features = true
 rustdoc-args = ["--cfg", "docsrs"]
diff --git a/LICENSE b/LICENSE
index 8af5baf..8bdf6bd 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2022 Tokio Contributors
+Copyright (c) 2023 Tokio Contributors
 
 Permission is hereby granted, free of charge, to any
 person obtaining a copy of this software and associated
diff --git a/METADATA b/METADATA
index 1bf6cf9..390ea84 100644
--- a/METADATA
+++ b/METADATA
@@ -11,13 +11,13 @@
   }
   url {
     type: ARCHIVE
-    value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.11.crate"
+    value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.12.crate"
   }
-  version: "0.1.11"
+  version: "0.1.12"
   license_type: NOTICE
   last_upgrade_date {
-    year: 2022
-    month: 12
-    day: 12
+    year: 2023
+    month: 3
+    day: 30
   }
 }
diff --git a/src/stream_ext.rs b/src/stream_ext.rs
index 6cea7b5..52d3202 100644
--- a/src/stream_ext.rs
+++ b/src/stream_ext.rs
@@ -982,6 +982,8 @@
 
     /// Slows down a stream by enforcing a delay between items.
     ///
+    /// The underlying timer behind this utility has a granularity of one millisecond.
+    ///
     /// # Example
     ///
     /// Create a throttled stream.
diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs
index 4b157a9..8548b74 100644
--- a/src/stream_ext/collect.rs
+++ b/src/stream_ext/collect.rs
@@ -195,11 +195,7 @@
         } else {
             let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0))));
 
-            if let Err(err) = res {
-                Err(err)
-            } else {
-                unreachable!();
-            }
+            Err(res.map(drop).unwrap_err())
         }
     }
 }
diff --git a/src/stream_ext/then.rs b/src/stream_ext/then.rs
index 7f6b5a2..cc7caa7 100644
--- a/src/stream_ext/then.rs
+++ b/src/stream_ext/then.rs
@@ -72,7 +72,7 @@
     }
 
     fn size_hint(&self) -> (usize, Option<usize>) {
-        let future_len = if self.future.is_some() { 1 } else { 0 };
+        let future_len = usize::from(self.future.is_some());
         let (lower, upper) = self.stream.size_hint();
 
         let lower = lower.saturating_add(future_len);
diff --git a/src/stream_ext/throttle.rs b/src/stream_ext/throttle.rs
index f36c66a..5000139 100644
--- a/src/stream_ext/throttle.rs
+++ b/src/stream_ext/throttle.rs
@@ -4,7 +4,6 @@
 use tokio::time::{Duration, Instant, Sleep};
 
 use std::future::Future;
-use std::marker::Unpin;
 use std::pin::Pin;
 use std::task::{self, Poll};
 
@@ -41,8 +40,7 @@
     }
 }
 
-// XXX: are these safe if `T: !Unpin`?
-impl<T: Unpin> Throttle<T> {
+impl<T> Throttle<T> {
     /// Acquires a reference to the underlying stream that this combinator is
     /// pulling from.
     pub fn get_ref(&self) -> &T {
diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs
index 98d7cd5..a440d20 100644
--- a/src/stream_ext/timeout.rs
+++ b/src/stream_ext/timeout.rs
@@ -24,7 +24,7 @@
 }
 
 /// Error returned by `Timeout`.
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Eq)]
 pub struct Elapsed(());
 
 impl<S: Stream> Timeout<S> {
diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs
index 10184bf..7110664 100644
--- a/src/wrappers/broadcast.rs
+++ b/src/wrappers/broadcast.rs
@@ -18,7 +18,7 @@
 }
 
 /// An error returned from the inner stream of a [`BroadcastStream`].
-#[derive(Debug, PartialEq, Clone)]
+#[derive(Debug, PartialEq, Eq, Clone)]
 pub enum BroadcastStreamRecvError {
     /// The receiver lagged too far behind. Attempting to receive again will
     /// return the oldest message still retained by the channel.
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs
index c682c9c..ec8ead0 100644
--- a/src/wrappers/watch.rs
+++ b/src/wrappers/watch.rs
@@ -10,8 +10,9 @@
 
 /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
 ///
-/// This stream will always start by yielding the current value when the WatchStream is polled,
-/// regardless of whether it was the initial value or sent afterwards.
+/// This stream will start by yielding the current value when the WatchStream is polled,
+/// regardless of whether it was the initial value or sent afterwards,
+/// unless you use [`WatchStream<T>::from_changes`].
 ///
 /// # Examples
 ///
@@ -40,6 +41,28 @@
 /// let (tx, rx) = watch::channel("hello");
 /// let mut rx = WatchStream::new(rx);
 ///
+/// // existing rx output with "hello" is ignored here
+///
+/// tx.send("goodbye").unwrap();
+/// assert_eq!(rx.next().await, Some("goodbye"));
+/// # }
+/// ```
+///
+/// Example with [`WatchStream<T>::from_changes`]:
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use futures::future::FutureExt;
+/// use tokio::sync::watch;
+/// use tokio_stream::{StreamExt, wrappers::WatchStream};
+///
+/// let (tx, rx) = watch::channel("hello");
+/// let mut rx = WatchStream::from_changes(rx);
+///
+/// // no output from rx is available at this point - let's check this:
+/// assert!(rx.next().now_or_never().is_none());
+///
 /// tx.send("goodbye").unwrap();
 /// assert_eq!(rx.next().await, Some("goodbye"));
 /// # }
@@ -66,6 +89,13 @@
             inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
         }
     }
+
+    /// Create a new `WatchStream` that waits for the value to be changed.
+    pub fn from_changes(rx: Receiver<T>) -> Self {
+        Self {
+            inner: ReusableBoxFuture::new(make_future(rx)),
+        }
+    }
 }
 
 impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {
diff --git a/tests/stream_stream_map.rs b/tests/stream_stream_map.rs
index ffc489b..b6b87e9 100644
--- a/tests/stream_stream_map.rs
+++ b/tests/stream_stream_map.rs
@@ -325,63 +325,6 @@
     }
 }
 
-#[cfg(not(target_os = "wasi"))]
-proptest::proptest! {
-    #[test]
-    fn fuzz_pending_complete_mix(kinds: Vec<bool>) {
-        use std::task::{Context, Poll};
-
-        struct DidPoll<T> {
-            did_poll: bool,
-            inner: T,
-        }
-
-        impl<T: Stream + Unpin> Stream for DidPoll<T> {
-            type Item = T::Item;
-
-            fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-                -> Poll<Option<T::Item>>
-            {
-                self.did_poll = true;
-                Pin::new(&mut self.inner).poll_next(cx)
-            }
-        }
-
-        for _ in 0..10 {
-            let mut map = task::spawn(StreamMap::new());
-            let mut expect = 0;
-
-            for (i, &is_empty) in kinds.iter().enumerate() {
-                let inner = if is_empty {
-                    pin_box(stream::empty::<()>())
-                } else {
-                    expect += 1;
-                    pin_box(stream::pending::<()>())
-                };
-
-                let stream = DidPoll {
-                    did_poll: false,
-                    inner,
-                };
-
-                map.insert(i, stream);
-            }
-
-            if expect == 0 {
-                assert_ready_none!(map.poll_next());
-            } else {
-                assert_pending!(map.poll_next());
-
-                assert_eq!(expect, map.values().count());
-
-                for stream in map.values() {
-                    assert!(stream.did_poll);
-                }
-            }
-        }
-    }
-}
-
 fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> {
     Box::pin(s)
 }
diff --git a/tests/watch.rs b/tests/watch.rs
index a56254e..3a39aaf 100644
--- a/tests/watch.rs
+++ b/tests/watch.rs
@@ -3,9 +3,11 @@
 use tokio::sync::watch;
 use tokio_stream::wrappers::WatchStream;
 use tokio_stream::StreamExt;
+use tokio_test::assert_pending;
+use tokio_test::task::spawn;
 
 #[tokio::test]
-async fn message_not_twice() {
+async fn watch_stream_message_not_twice() {
     let (tx, rx) = watch::channel("hello");
 
     let mut counter = 0;
@@ -27,3 +29,29 @@
     drop(tx);
     task.await.unwrap();
 }
+
+#[tokio::test]
+async fn watch_stream_from_rx() {
+    let (tx, rx) = watch::channel("hello");
+
+    let mut stream = WatchStream::from(rx);
+
+    assert_eq!(stream.next().await.unwrap(), "hello");
+
+    tx.send("bye").unwrap();
+
+    assert_eq!(stream.next().await.unwrap(), "bye");
+}
+
+#[tokio::test]
+async fn watch_stream_from_changes() {
+    let (tx, rx) = watch::channel("hello");
+
+    let mut stream = WatchStream::from_changes(rx);
+
+    assert_pending!(spawn(&mut stream).poll_next());
+
+    tx.send("bye").unwrap();
+
+    assert_eq!(stream.next().await.unwrap(), "bye");
+}