From 689f8fd6dea1277ea40ec6c4de8e3a22b8e72be0 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 30 Sep 2020 22:15:44 +0000 Subject: [PATCH] request-filter: Allow altering request type The request filter takes ownership of the request but does not support changing its type. Furthermore, the trait requires that the error type be named even though it always coerced to an `Error`. This change cleans up the request-filter module as follows: - The `RequestFilter` trait is now named `FilterRequest` (traits generally are verbs). This type now has a `Request` type attribute instead of an `Error` type attribute. - The `Service` has been renamed to `RequestFilter` and the `RequstFilterLayer` type has been eliminated. - The manual future implementation can be eliminated with `Either`. - The stack helper has been removed, as it's only used in one place. --- Cargo.lock | 2 +- linkerd/app/core/src/svc.rs | 6 +-- linkerd/app/src/dst/mod.rs | 15 +++--- linkerd/app/src/dst/permit.rs | 12 +++-- linkerd/request-filter/Cargo.toml | 4 +- linkerd/request-filter/src/lib.rs | 86 +++++++++---------------------- 6 files changed, 42 insertions(+), 83 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f07e75bfff..95620dbc14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1462,7 +1462,7 @@ version = "0.1.0" dependencies = [ "futures 0.3.5", "linkerd2-error", - "pin-project", + "linkerd2-stack", "tower", "tracing", ] diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs index 489e43e34b..8a21c6913e 100644 --- a/linkerd/app/core/src/svc.rs +++ b/linkerd/app/core/src/svc.rs @@ -2,7 +2,7 @@ pub use crate::proxy::http; use crate::transport::Connect; -use crate::{cache, request_filter, Error}; +use crate::{cache, Error}; pub use linkerd2_buffer as buffer; use linkerd2_concurrency_limit as concurrency_limit; pub use linkerd2_stack::{self as stack, layer, NewService}; @@ -267,10 +267,6 @@ impl Stack { self.push(stack::FallbackLayer::new(fallback).with_predicate(predicate)) } - pub fn push_request_filter(self, filter: F) -> Stack> { - self.push(request_filter::RequestFilterLayer::new(filter)) - } - // pub fn box_http_request(self) -> Stack> // where // B: hyper::body::HttpBody + 'static, diff --git a/linkerd/app/src/dst/mod.rs b/linkerd/app/src/dst/mod.rs index 4a4a4981b2..5b1802c53d 100644 --- a/linkerd/app/src/dst/mod.rs +++ b/linkerd/app/src/dst/mod.rs @@ -3,7 +3,7 @@ mod resolve; use indexmap::IndexSet; use linkerd2_app_core::{ - control, dns, profiles, proxy::identity, request_filter, svc, transport::tls, + control, dns, profiles, proxy::identity, request_filter::RequestFilter, svc, transport::tls, ControlHttpMetrics, Error, }; use permit::PermitConfiguredDsts; @@ -26,12 +26,11 @@ pub struct Config { /// The addr is preserved for logging. pub struct Dst { pub addr: control::ControlAddr, - pub profiles: request_filter::Service< + pub profiles: RequestFilter< PermitConfiguredDsts, profiles::Client, resolve::BackoffUnlessInvalidArgument>, >, - pub resolve: - request_filter::Service>>, + pub resolve: RequestFilter>>, } impl Config { @@ -45,10 +44,10 @@ impl Config { let backoff = self.control.connect.backoff.clone(); let svc = self.control.build(dns, metrics, identity); let resolve = svc::stack(resolve::new(svc.clone(), &self.context, backoff)) - .push_request_filter(PermitConfiguredDsts::new( + .push(RequestFilter::layer(PermitConfiguredDsts::new( self.get_suffixes, self.get_networks, - )) + ))) .into_inner(); let profiles = svc::stack(profiles::Client::new( @@ -57,10 +56,10 @@ impl Config { self.initial_profile_timeout, self.context, )) - .push_request_filter( + .push(RequestFilter::layer( PermitConfiguredDsts::new(self.profile_suffixes, self.profile_networks) .with_error::(), - ) + )) .into_inner(); Ok(Dst { diff --git a/linkerd/app/src/dst/permit.rs b/linkerd/app/src/dst/permit.rs index dcba096ce0..cc33caad33 100644 --- a/linkerd/app/src/dst/permit.rs +++ b/linkerd/app/src/dst/permit.rs @@ -1,5 +1,7 @@ use ipnet::{Contains, IpNet}; -use linkerd2_app_core::{dns::Suffix, request_filter, Addr, DiscoveryRejected, Error}; +use linkerd2_app_core::{ + dns::Suffix, request_filter::FilterRequest, Addr, DiscoveryRejected, Error, +}; use std::marker::PhantomData; use std::net::IpAddr; use std::sync::Arc; @@ -48,14 +50,14 @@ impl Clone for PermitConfiguredDsts { } } -impl request_filter::RequestFilter for PermitConfiguredDsts +impl FilterRequest for PermitConfiguredDsts where E: Into + From, for<'t> &'t T: Into, { - type Error = E; + type Request = T; - fn filter(&self, t: T) -> Result { + fn filter(&self, t: T) -> Result { let addr = (&t).into(); let permitted = match addr { Addr::Name(ref name) => self @@ -72,7 +74,7 @@ where if permitted { Ok(t) } else { - Err(E::from(addr.clone())) + Err(E::from(addr.clone()).into()) } } } diff --git a/linkerd/request-filter/Cargo.toml b/linkerd/request-filter/Cargo.toml index dedac261e2..3b1cb12a38 100644 --- a/linkerd/request-filter/Cargo.toml +++ b/linkerd/request-filter/Cargo.toml @@ -7,7 +7,7 @@ publish = false [dependencies] futures = "0.3" +linkerd2-error = { path = "../error" } +linkerd2-stack = { path = "../stack" } tower = { version = "0.3", default-features = false } tracing = "0.1.19" -linkerd2-error = { path = "../error" } -pin-project = "0.4" \ No newline at end of file diff --git a/linkerd/request-filter/src/lib.rs b/linkerd/request-filter/src/lib.rs index d2eba8437f..47dacf2dbc 100644 --- a/linkerd/request-filter/src/lib.rs +++ b/linkerd/request-filter/src/lib.rs @@ -3,71 +3,51 @@ #![deny(warnings, rust_2018_idioms)] +use futures::{future, prelude::*}; use linkerd2_error::Error; -use pin_project::pin_project; -use std::future::Future; -use std::pin::Pin; +use linkerd2_stack::layer; use std::task::{Context, Poll}; -pub trait RequestFilter { - type Error: Into; +pub trait FilterRequest { + type Request; - fn filter(&self, request: T) -> Result; + fn filter(&self, request: Req) -> Result; } #[derive(Clone, Debug)] -pub struct RequestFilterLayer { - filter: T, -} - -#[derive(Clone, Debug)] -pub struct Service { +pub struct RequestFilter { filter: I, service: S, } -#[pin_project(project = ResponseFutureProj)] -#[derive(Debug)] -pub enum ResponseFuture { - Future(#[pin] F), - Rejected(Option), -} - -// === impl Layer === - -impl RequestFilterLayer { - pub fn new(filter: T) -> Self { - Self { filter } - } -} - -impl tower::Layer for RequestFilterLayer { - type Service = Service; - - fn layer(&self, inner: S) -> Self::Service { - Service::new(self.filter.clone(), inner) - } -} +// === impl RequestFilter === -// === impl Service === - -impl Service { +impl RequestFilter { pub fn new(filter: I, service: S) -> Self { Self { filter, service } } + + pub fn layer(filter: I) -> impl layer::Layer + Clone + where + I: Clone, + { + layer::mk(move |inner| Self::new(filter.clone(), inner)) + } } -impl tower::Service for Service +impl tower::Service for RequestFilter where - I: RequestFilter, - S: tower::Service, + F: FilterRequest, + S: tower::Service, S::Error: Into, { type Response = S::Response; type Error = Error; - type Future = ResponseFuture; + type Future = future::Either< + future::ErrInto, + future::Ready>, + >; - #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.service.poll_ready(cx).map_err(Into::into) } @@ -76,30 +56,12 @@ where match self.filter.filter(request) { Ok(req) => { tracing::trace!("accepted"); - let f = self.service.call(req); - ResponseFuture::Future(f) + future::Either::Left(self.service.call(req).err_into::()) } Err(e) => { tracing::trace!("rejected"); - ResponseFuture::Rejected(Some(e.into())) + future::Either::Right(future::err(e)) } } } } - -// === impl ResponseFuture === - -impl Future for ResponseFuture -where - F: Future>, - E: Into, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.project() { - ResponseFutureProj::Future(f) => f.poll(cx).map(|r| r.map_err(Into::into)), - ResponseFutureProj::Rejected(e) => Poll::Ready(Err(e.take().unwrap())), - } - } -}