| #[path = "../support.rs"] |
| mod support; |
| use tokio_test::{assert_pending, assert_ready, assert_ready_ok}; |
| use tower::limit::concurrency::ConcurrencyLimitLayer; |
| use tower_test::{assert_request_eq, mock}; |
| |
| #[tokio::test(flavor = "current_thread")] |
| async fn basic_service_limit_functionality_with_poll_ready() { |
| let _t = support::trace_init(); |
| let limit = ConcurrencyLimitLayer::new(2); |
| let (mut service, mut handle) = mock::spawn_layer(limit); |
| |
| assert_ready_ok!(service.poll_ready()); |
| let r1 = service.call("hello 1"); |
| |
| assert_ready_ok!(service.poll_ready()); |
| let r2 = service.call("hello 2"); |
| |
| assert_pending!(service.poll_ready()); |
| |
| assert!(!service.is_woken()); |
| |
| // The request gets passed through |
| assert_request_eq!(handle, "hello 1").send_response("world 1"); |
| |
| // The next request gets passed through |
| assert_request_eq!(handle, "hello 2").send_response("world 2"); |
| |
| // There are no more requests |
| assert_pending!(handle.poll_request()); |
| |
| assert_eq!(r1.await.unwrap(), "world 1"); |
| |
| assert!(service.is_woken()); |
| |
| // Another request can be sent |
| assert_ready_ok!(service.poll_ready()); |
| |
| let r3 = service.call("hello 3"); |
| |
| assert_pending!(service.poll_ready()); |
| |
| assert_eq!(r2.await.unwrap(), "world 2"); |
| |
| // The request gets passed through |
| assert_request_eq!(handle, "hello 3").send_response("world 3"); |
| |
| assert_eq!(r3.await.unwrap(), "world 3"); |
| } |
| |
| #[tokio::test(flavor = "current_thread")] |
| async fn basic_service_limit_functionality_without_poll_ready() { |
| let _t = support::trace_init(); |
| let limit = ConcurrencyLimitLayer::new(2); |
| let (mut service, mut handle) = mock::spawn_layer(limit); |
| |
| assert_ready_ok!(service.poll_ready()); |
| let r1 = service.call("hello 1"); |
| |
| assert_ready_ok!(service.poll_ready()); |
| let r2 = service.call("hello 2"); |
| |
| assert_pending!(service.poll_ready()); |
| |
| // The request gets passed through |
| assert_request_eq!(handle, "hello 1").send_response("world 1"); |
| |
| assert!(!service.is_woken()); |
| |
| // The next request gets passed through |
| assert_request_eq!(handle, "hello 2").send_response("world 2"); |
| |
| assert!(!service.is_woken()); |
| |
| // There are no more requests |
| assert_pending!(handle.poll_request()); |
| |
| assert_eq!(r1.await.unwrap(), "world 1"); |
| |
| assert!(service.is_woken()); |
| |
| // One more request can be sent |
| assert_ready_ok!(service.poll_ready()); |
| let r4 = service.call("hello 4"); |
| |
| assert_pending!(service.poll_ready()); |
| |
| assert_eq!(r2.await.unwrap(), "world 2"); |
| assert!(service.is_woken()); |
| |
| // The request gets passed through |
| assert_request_eq!(handle, "hello 4").send_response("world 4"); |
| |
| assert_eq!(r4.await.unwrap(), "world 4"); |
| } |
| |
| #[tokio::test(flavor = "current_thread")] |
| async fn request_without_capacity() { |
| let _t = support::trace_init(); |
| let limit = ConcurrencyLimitLayer::new(0); |
| let (mut service, _) = mock::spawn_layer::<(), (), _>(limit); |
| |
| assert_pending!(service.poll_ready()); |
| } |
| |
| #[tokio::test(flavor = "current_thread")] |
| async fn reserve_capacity_without_sending_request() { |
| let _t = support::trace_init(); |
| let limit = ConcurrencyLimitLayer::new(1); |
| let (mut s1, mut handle) = mock::spawn_layer(limit); |
| |
| let mut s2 = s1.clone(); |
| |
| // Reserve capacity in s1 |
| assert_ready_ok!(s1.poll_ready()); |
| |
| // Service 2 cannot get capacity |
| assert_pending!(s2.poll_ready()); |
| |
| // s1 sends the request, then s2 is able to get capacity |
| let r1 = s1.call("hello"); |
| |
| assert_request_eq!(handle, "hello").send_response("world"); |
| |
| assert_pending!(s2.poll_ready()); |
| |
| r1.await.unwrap(); |
| |
| assert_ready_ok!(s2.poll_ready()); |
| } |
| |
| #[tokio::test(flavor = "current_thread")] |
| async fn service_drop_frees_capacity() { |
| let _t = support::trace_init(); |
| let limit = ConcurrencyLimitLayer::new(1); |
| let (mut s1, _handle) = mock::spawn_layer::<(), (), _>(limit); |
| |
| let mut s2 = s1.clone(); |
| |
| // Reserve capacity in s1 |
| assert_ready_ok!(s1.poll_ready()); |
| |
| // Service 2 cannot get capacity |
| assert_pending!(s2.poll_ready()); |
| |
| drop(s1); |
| |
| assert!(s2.is_woken()); |
| assert_ready_ok!(s2.poll_ready()); |
| } |
| |
| #[tokio::test(flavor = "current_thread")] |
| async fn response_error_releases_capacity() { |
| let _t = support::trace_init(); |
| let limit = ConcurrencyLimitLayer::new(1); |
| let (mut s1, mut handle) = mock::spawn_layer::<_, (), _>(limit); |
| |
| let mut s2 = s1.clone(); |
| |
| // Reserve capacity in s1 |
| assert_ready_ok!(s1.poll_ready()); |
| |
| // s1 sends the request, then s2 is able to get capacity |
| let r1 = s1.call("hello"); |
| |
| assert_request_eq!(handle, "hello").send_error("boom"); |
| |
| r1.await.unwrap_err(); |
| |
| assert_ready_ok!(s2.poll_ready()); |
| } |
| |
| #[tokio::test(flavor = "current_thread")] |
| async fn response_future_drop_releases_capacity() { |
| let _t = support::trace_init(); |
| let limit = ConcurrencyLimitLayer::new(1); |
| let (mut s1, _handle) = mock::spawn_layer::<_, (), _>(limit); |
| |
| let mut s2 = s1.clone(); |
| |
| // Reserve capacity in s1 |
| assert_ready_ok!(s1.poll_ready()); |
| |
| // s1 sends the request, then s2 is able to get capacity |
| let r1 = s1.call("hello"); |
| |
| assert_pending!(s2.poll_ready()); |
| |
| drop(r1); |
| |
| assert_ready_ok!(s2.poll_ready()); |
| } |
| |
| #[tokio::test(flavor = "current_thread")] |
| async fn multi_waiters() { |
| let _t = support::trace_init(); |
| let limit = ConcurrencyLimitLayer::new(1); |
| let (mut s1, _handle) = mock::spawn_layer::<(), (), _>(limit); |
| let mut s2 = s1.clone(); |
| let mut s3 = s1.clone(); |
| |
| // Reserve capacity in s1 |
| assert_ready_ok!(s1.poll_ready()); |
| |
| // s2 and s3 are not ready |
| assert_pending!(s2.poll_ready()); |
| assert_pending!(s3.poll_ready()); |
| |
| drop(s1); |
| |
| assert!(s2.is_woken()); |
| assert!(!s3.is_woken()); |
| |
| drop(s2); |
| |
| assert!(s3.is_woken()); |
| } |