blob: ff354821c7fb05846cec8c481576301121707717 [file] [log] [blame]
//! Reconnect services when they fail.
//!
//! Reconnect takes some [`MakeService`] and transforms it into a
//! [`Service`]. It then attempts to lazily connect and
//! reconnect on failure. The `Reconnect` service becomes unavailable
//! when the inner `MakeService::poll_ready` returns an error. When the
//! connection future returned from `MakeService::call` fails, the error is
//! returned by the next call to `Reconnect::call`. This allows the user to
//! call the service again even if the inner `MakeService` was unable to
//! connect on the last call.
//!
//! [`MakeService`]: crate::make::MakeService
//! [`Service`]: crate::Service
mod future;
pub use future::ResponseFuture;
use crate::make::MakeService;
use std::fmt;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower_service::Service;
use tracing::trace;
/// Reconnect to failed services.
///
/// Holds a connection-producing service `M`, the `Target` it connects to, and
/// the current connection state.
pub struct Reconnect<M, Target>
where
M: Service<Target>,
{
/// Service used to establish a new connection; it is invoked with a clone of `target`.
mk_service: M,
/// Current position in the connection lifecycle (idle, connecting or connected).
state: State<M::Future, M::Response>,
/// Connection target, cloned and handed to `mk_service` for every connection attempt.
target: Target,
/// Error produced by a failed connection attempt, held until the next `call` takes and returns it.
error: Option<M::Error>,
}
/// Connection lifecycle tracked by `Reconnect`.
///
/// `F` is the future that resolves to a connection and `S` is the connected service.
#[derive(Debug)]
enum State<F, S> {
/// No connection exists and no attempt is in flight.
Idle,
/// A connection attempt is in flight; holds the pending connect future.
Connecting(F),
/// A connection is established; holds the connected service.
Connected(S),
}
impl<M, Target> Reconnect<M, Target>
where
    M: Service<Target>,
{
    /// Creates a `Reconnect` that starts out disconnected.
    ///
    /// Nothing is connected here: the value begins in the idle state with no
    /// stored error. `mk_service` is the service that produces connections
    /// and `target` is what it will be asked to connect to.
    pub fn new<S, Request>(mk_service: M, target: Target) -> Self {
        Self {
            state: State::Idle,
            error: None,
            mk_service,
            target,
        }
    }

    /// Creates a `Reconnect` around an already connected [`Service`].
    ///
    /// `init_conn` becomes the initial connection, so the value begins in the
    /// connected state; `mk_service` and `target` are kept for establishing
    /// replacement connections.
    pub fn with_connection(init_conn: M::Response, mk_service: M, target: Target) -> Self {
        Self {
            state: State::Connected(init_conn),
            error: None,
            mk_service,
            target,
        }
    }
}
impl<M, Target, S, Request> Service<Request> for Reconnect<M, Target>
where
M: Service<Target, Response = S>,
S: Service<Request>,
M::Future: Unpin,
crate::BoxError: From<M::Error> + From<S::Error>,
Target: Clone,
{
type Response = S::Response;
type Error = crate::BoxError;
type Future = ResponseFuture<S::Future, M::Error>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
match &mut self.state {
State::Idle => {
trace!("poll_ready; idle");
match self.mk_service.poll_ready(cx) {
Poll::Ready(r) => r?,
Poll::Pending => {
trace!("poll_ready; MakeService not ready");
return Poll::Pending;
}
}
let fut = self.mk_service.make_service(self.target.clone());
self.state = State::Connecting(fut);
continue;
}
State::Connecting(ref mut f) => {
trace!("poll_ready; connecting");
match Pin::new(f).poll(cx) {
Poll::Ready(Ok(service)) => {
self.state = State::Connected(service);
}
Poll::Pending => {
trace!("poll_ready; not ready");
return Poll::Pending;
}
Poll::Ready(Err(e)) => {
trace!("poll_ready; error");
self.state = State::Idle;
self.error = Some(e);
break;
}
}
}
State::Connected(ref mut inner) => {
trace!("poll_ready; connected");
match inner.poll_ready(cx) {
Poll::Ready(Ok(())) => {
trace!("poll_ready; ready");
return Poll::Ready(Ok(()));
}
Poll::Pending => {
trace!("poll_ready; not ready");
return Poll::Pending;
}
Poll::Ready(Err(_)) => {
trace!("poll_ready; error");
self.state = State::Idle;
}
}
}
}
}
Poll::Ready(Ok(()))
}
fn call(&mut self, request: Request) -> Self::Future {
if let Some(error) = self.error.take() {
return ResponseFuture::error(error);
}
let service = match self.state {
State::Connected(ref mut service) => service,
_ => panic!("service not ready; poll_ready must be called first"),
};
let fut = service.call(request);
ResponseFuture::new(fut)
}
}
impl<M, Target> fmt::Debug for Reconnect<M, Target>
where
    M: Service<Target> + fmt::Debug,
    M::Future: fmt::Debug,
    M::Response: fmt::Debug,
    Target: fmt::Debug,
{
    /// Formats the maker service, connection state and target.
    ///
    /// The stored `error` field is not included in the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Reconnect");
        out.field("mk_service", &self.mk_service);
        out.field("state", &self.state);
        out.field("target", &self.target);
        out.finish()
    }
}