diff --git a/Cargo.lock b/Cargo.lock index f07e75bfff..95620dbc14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1462,7 +1462,7 @@ version = "0.1.0" dependencies = [ "futures 0.3.5", "linkerd2-error", - "pin-project", + "linkerd2-stack", "tower", "tracing", ] diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs index 489e43e34b..8a21c6913e 100644 --- a/linkerd/app/core/src/svc.rs +++ b/linkerd/app/core/src/svc.rs @@ -2,7 +2,7 @@ pub use crate::proxy::http; use crate::transport::Connect; -use crate::{cache, request_filter, Error}; +use crate::{cache, Error}; pub use linkerd2_buffer as buffer; use linkerd2_concurrency_limit as concurrency_limit; pub use linkerd2_stack::{self as stack, layer, NewService}; @@ -267,10 +267,6 @@ impl Stack { self.push(stack::FallbackLayer::new(fallback).with_predicate(predicate)) } - pub fn push_request_filter(self, filter: F) -> Stack> { - self.push(request_filter::RequestFilterLayer::new(filter)) - } - // pub fn box_http_request(self) -> Stack> // where // B: hyper::body::HttpBody + 'static, diff --git a/linkerd/app/src/dst/mod.rs b/linkerd/app/src/dst/mod.rs index 4a4a4981b2..5b1802c53d 100644 --- a/linkerd/app/src/dst/mod.rs +++ b/linkerd/app/src/dst/mod.rs @@ -3,7 +3,7 @@ mod resolve; use indexmap::IndexSet; use linkerd2_app_core::{ - control, dns, profiles, proxy::identity, request_filter, svc, transport::tls, + control, dns, profiles, proxy::identity, request_filter::RequestFilter, svc, transport::tls, ControlHttpMetrics, Error, }; use permit::PermitConfiguredDsts; @@ -26,12 +26,11 @@ pub struct Config { /// The addr is preserved for logging. pub struct Dst { pub addr: control::ControlAddr, - pub profiles: request_filter::Service< + pub profiles: RequestFilter< PermitConfiguredDsts, profiles::Client, resolve::BackoffUnlessInvalidArgument>, >, - pub resolve: - request_filter::Service>>, + pub resolve: RequestFilter>>, } impl Config { @@ -45,10 +44,10 @@ impl Config { let backoff = self.control.connect.backoff.clone(); let svc = self.control.build(dns, metrics, identity); let resolve = svc::stack(resolve::new(svc.clone(), &self.context, backoff)) - .push_request_filter(PermitConfiguredDsts::new( + .push(RequestFilter::layer(PermitConfiguredDsts::new( self.get_suffixes, self.get_networks, - )) + ))) .into_inner(); let profiles = svc::stack(profiles::Client::new( @@ -57,10 +56,10 @@ impl Config { self.initial_profile_timeout, self.context, )) - .push_request_filter( + .push(RequestFilter::layer( PermitConfiguredDsts::new(self.profile_suffixes, self.profile_networks) .with_error::(), - ) + )) .into_inner(); Ok(Dst { diff --git a/linkerd/app/src/dst/permit.rs b/linkerd/app/src/dst/permit.rs index dcba096ce0..cc33caad33 100644 --- a/linkerd/app/src/dst/permit.rs +++ b/linkerd/app/src/dst/permit.rs @@ -1,5 +1,7 @@ use ipnet::{Contains, IpNet}; -use linkerd2_app_core::{dns::Suffix, request_filter, Addr, DiscoveryRejected, Error}; +use linkerd2_app_core::{ + dns::Suffix, request_filter::FilterRequest, Addr, DiscoveryRejected, Error, +}; use std::marker::PhantomData; use std::net::IpAddr; use std::sync::Arc; @@ -48,14 +50,14 @@ impl Clone for PermitConfiguredDsts { } } -impl request_filter::RequestFilter for PermitConfiguredDsts +impl FilterRequest for PermitConfiguredDsts where E: Into + From, for<'t> &'t T: Into, { - type Error = E; + type Request = T; - fn filter(&self, t: T) -> Result { + fn filter(&self, t: T) -> Result { let addr = (&t).into(); let permitted = match addr { Addr::Name(ref name) => self @@ -72,7 +74,7 @@ where if permitted { Ok(t) } else { - Err(E::from(addr.clone())) + Err(E::from(addr.clone()).into()) } } } diff --git a/linkerd/request-filter/Cargo.toml b/linkerd/request-filter/Cargo.toml index dedac261e2..3b1cb12a38 100644 --- a/linkerd/request-filter/Cargo.toml +++ b/linkerd/request-filter/Cargo.toml @@ -7,7 +7,7 @@ publish = false [dependencies] futures = "0.3" +linkerd2-error = { path = "../error" } +linkerd2-stack = { path = "../stack" } tower = { version = "0.3", default-features = false } tracing = "0.1.19" -linkerd2-error = { path = "../error" } -pin-project = "0.4" \ No newline at end of file diff --git a/linkerd/request-filter/src/lib.rs b/linkerd/request-filter/src/lib.rs index d2eba8437f..47dacf2dbc 100644 --- a/linkerd/request-filter/src/lib.rs +++ b/linkerd/request-filter/src/lib.rs @@ -3,71 +3,51 @@ #![deny(warnings, rust_2018_idioms)] +use futures::{future, prelude::*}; use linkerd2_error::Error; -use pin_project::pin_project; -use std::future::Future; -use std::pin::Pin; +use linkerd2_stack::layer; use std::task::{Context, Poll}; -pub trait RequestFilter { - type Error: Into; +pub trait FilterRequest { + type Request; - fn filter(&self, request: T) -> Result; + fn filter(&self, request: Req) -> Result; } #[derive(Clone, Debug)] -pub struct RequestFilterLayer { - filter: T, -} - -#[derive(Clone, Debug)] -pub struct Service { +pub struct RequestFilter { filter: I, service: S, } -#[pin_project(project = ResponseFutureProj)] -#[derive(Debug)] -pub enum ResponseFuture { - Future(#[pin] F), - Rejected(Option), -} - -// === impl Layer === - -impl RequestFilterLayer { - pub fn new(filter: T) -> Self { - Self { filter } - } -} - -impl tower::Layer for RequestFilterLayer { - type Service = Service; - - fn layer(&self, inner: S) -> Self::Service { - Service::new(self.filter.clone(), inner) - } -} +// === impl RequestFilter === -// === impl Service === - -impl Service { +impl RequestFilter { pub fn new(filter: I, service: S) -> Self { Self { filter, service } } + + pub fn layer(filter: I) -> impl layer::Layer + Clone + where + I: Clone, + { + layer::mk(move |inner| Self::new(filter.clone(), inner)) + } } -impl tower::Service for Service +impl tower::Service for RequestFilter where - I: RequestFilter, - S: tower::Service, + F: FilterRequest, + S: tower::Service, S::Error: Into, { type Response = S::Response; type Error = Error; - type Future = ResponseFuture; + type Future = future::Either< + future::ErrInto, + future::Ready>, + >; - #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.service.poll_ready(cx).map_err(Into::into) } @@ -76,30 +56,12 @@ where match self.filter.filter(request) { Ok(req) => { tracing::trace!("accepted"); - let f = self.service.call(req); - ResponseFuture::Future(f) + future::Either::Left(self.service.call(req).err_into::()) } Err(e) => { tracing::trace!("rejected"); - ResponseFuture::Rejected(Some(e.into())) + future::Either::Right(future::err(e)) } } } } - -// === impl ResponseFuture === - -impl Future for ResponseFuture -where - F: Future>, - E: Into, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.project() { - ResponseFutureProj::Future(f) => f.poll(cx).map(|r| r.map_err(Into::into)), - ResponseFutureProj::Rejected(e) => Poll::Ready(Err(e.take().unwrap())), - } - } -}