Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,7 @@ dependencies = [
"linkerd-app-test",
"linkerd-distribute",
"linkerd-http-classify",
"linkerd-http-prom",
"linkerd-http-retry",
"linkerd-http-route",
"linkerd-identity",
Expand Down Expand Up @@ -1493,6 +1494,24 @@ dependencies = [
"tracing",
]

[[package]]
name = "linkerd-http-prom"
version = "0.1.0"
dependencies = [
"futures",
"http",
"http-body",
"linkerd-error",
"linkerd-http-box",
"linkerd-metrics",
"linkerd-stack",
"parking_lot",
"pin-project",
"prometheus-client",
"thiserror",
"tokio",
]

[[package]]
name = "linkerd-http-retry"
version = "0.1.0"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ members = [
"linkerd/http/classify",
"linkerd/http/h2",
"linkerd/http/metrics",
"linkerd/http/prom",
"linkerd/http/retry",
"linkerd/http/route",
"linkerd/identity",
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ linkerd-app-core = { path = "../core" }
linkerd-app-test = { path = "../test", optional = true }
linkerd-distribute = { path = "../../distribute" }
linkerd-http-classify = { path = "../../http/classify" }
linkerd-http-prom = { path = "../../http/prom" }
linkerd-http-retry = { path = "../../http/retry" }
linkerd-http-route = { path = "../../http/route" }
linkerd-identity = { path = "../../identity" }
Expand All @@ -49,6 +50,7 @@ linkerd-tonic-watch = { path = "../../tonic-watch" }
[dev-dependencies]
hyper = { version = "0.14", features = ["http1", "http2"] }
linkerd-app-test = { path = "../test", features = ["client-policy"] }
linkerd-http-prom = { path = "../../http/prom", features = ["test-util"] }
linkerd-io = { path = "../../io", features = ["tokio-test"] }
linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] }
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [
Expand Down
8 changes: 4 additions & 4 deletions linkerd/app/outbound/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ pub struct Http<T>(T);
#[derive(Clone, Debug, Default)]
pub struct HttpMetrics {
balancer: concrete::BalancerMetrics,
http_route: policy::RouteMetrics,
grpc_route: policy::RouteMetrics,
http_route: policy::HttpRouteMetrics,
grpc_route: policy::GrpcRouteMetrics,
}

pub fn spawn_routes<T>(
Expand Down Expand Up @@ -132,12 +132,12 @@ where
impl HttpMetrics {
pub fn register(registry: &mut prom::Registry) -> Self {
let http = registry.sub_registry_with_prefix("http");
let http_route = policy::RouteMetrics::register(http.sub_registry_with_prefix("route"));
let http_route = policy::HttpRouteMetrics::register(http.sub_registry_with_prefix("route"));
let balancer =
concrete::BalancerMetrics::register(http.sub_registry_with_prefix("balancer"));

let grpc = registry.sub_registry_with_prefix("grpc");
let grpc_route = policy::RouteMetrics::register(grpc.sub_registry_with_prefix("route"));
let grpc_route = policy::GrpcRouteMetrics::register(grpc.sub_registry_with_prefix("route"));

Self {
balancer,
Expand Down
6 changes: 3 additions & 3 deletions linkerd/app/outbound/src/http/logical/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod router;
mod tests;

pub use self::{
route::{errors, RouteMetrics},
route::{errors, GrpcRouteMetrics, HttpRouteMetrics},
router::{GrpcParams, HttpParams},
};
pub use linkerd_proxy_client_policy::{ClientPolicy, FailureAccrual};
Expand Down Expand Up @@ -50,8 +50,8 @@ where
/// routing configurations to route requests over cached inner backend
/// services.
pub(super) fn layer<N, S>(
http_metrics: route::RouteMetrics,
grpc_metrics: route::RouteMetrics,
http_metrics: route::HttpRouteMetrics,
grpc_metrics: route::GrpcRouteMetrics,
) -> impl svc::Layer<N, Service = svc::ArcNewCloneHttp<Self>> + Clone
where
// Inner stack.
Expand Down
101 changes: 64 additions & 37 deletions linkerd/app/outbound/src/http/logical/policy/route.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
use super::super::Concrete;
use crate::RouteRef;
use linkerd_app_core::{classify, metrics::prom, proxy::http, svc, Addr, Error, Result};
use crate::{ParentRef, RouteRef};
use linkerd_app_core::{classify, proxy::http, svc, Addr, Error, Result};
use linkerd_distribute as distribute;
use linkerd_http_route as http_route;
use linkerd_proxy_client_policy as policy;
use std::{fmt::Debug, hash::Hash, sync::Arc};

pub(crate) mod backend;
pub(crate) mod filters;
pub(crate) mod metrics;

pub(crate) use self::backend::{Backend, MatchedBackend};
pub use self::filters::errors;
use self::metrics::labels::Route as RouteLabels;

#[derive(Clone, Debug, Default)]
pub struct RouteMetrics {
backend: backend::RouteBackendMetrics,
}
pub use self::metrics::{GrpcRouteMetrics, HttpRouteMetrics};

/// A target type that includes a summary of exactly how a request was matched.
/// This match state is required to apply route filters.
Expand All @@ -31,6 +30,7 @@ pub(crate) struct Matched<M, P> {
pub(crate) struct Route<T, F, E> {
pub(super) parent: T,
pub(super) addr: Addr,
pub(super) parent_ref: ParentRef,
pub(super) route_ref: RouteRef,
pub(super) filters: Arc<[F]>,
pub(super) distribution: BackendDistribution<T, F>,
Expand All @@ -55,6 +55,11 @@ pub(crate) type Grpc<T> = MatchedRoute<
pub(crate) type BackendDistribution<T, F> = distribute::Distribution<Backend<T, F>>;
pub(crate) type NewDistribute<T, F, N> = distribute::NewDistribute<Backend<T, F>, (), N>;

pub type Metrics<R, B> = metrics::RouteMetrics<
<R as metrics::MkStreamLabel>::StreamLabel,
<B as metrics::MkStreamLabel>::StreamLabel,
>;

/// Wraps errors with route metadata.
#[derive(Debug, thiserror::Error)]
#[error("route {}: {source}", route.0)]
Expand All @@ -64,28 +69,6 @@ struct RouteError {
source: Error,
}

// === impl RouteMetrics ===

impl RouteMetrics {
pub fn register(reg: &mut prom::Registry) -> Self {
Self {
backend: backend::RouteBackendMetrics::register(
reg.sub_registry_with_prefix("backend"),
),
}
}

#[cfg(test)]
pub(crate) fn request_count(
&self,
p: crate::ParentRef,
r: RouteRef,
b: crate::BackendRef,
) -> backend::RequestCount {
self.backend.request_count(p, r, b)
}
}

// === impl MatchedRoute ===

impl<T, M, F, E> MatchedRoute<T, M, F, E>
Expand All @@ -103,13 +86,15 @@ where
// Assert that filters can be applied.
Self: filters::Apply,
Self: svc::Param<classify::Request>,
Self: metrics::MkStreamLabel,
MatchedBackend<T, M, F>: filters::Apply,
MatchedBackend<T, M, F>: metrics::MkStreamLabel,
{
/// Builds a route stack that applies policy filters to requests and
/// distributes requests over each route's backends. These [`Concrete`]
/// backends are expected to be cached/shared by the inner stack.
pub(crate) fn layer<N, S>(
metrics: RouteMetrics,
metrics: Metrics<Self, MatchedBackend<T, M, F>>,
) -> impl svc::Layer<N, Service = svc::ArcNewCloneHttp<Self>> + Clone
where
// Inner stack.
Expand All @@ -134,10 +119,11 @@ where
// consideration, so we must eagerly fail requests to prevent
// leaking tasks onto the runtime.
.push_on_service(svc::LoadShed::layer())
// TODO(ver) attach the `E` typed failure policy to requests.
.push(filters::NewApplyFilters::<Self, _, _>::layer())
// Sets an optional request timeout.
.push(http::NewTimeout::layer())
.push(metrics::layer(&metrics.requests))
// Configure a classifier to use in the endpoint stack.
// FIXME(ver) move this into NewSetExtensions
.push(classify::NewClassify::layer())
.push(svc::NewMapErr::layer_with(|rt: &Self| {
let route = rt.params.route_ref.clone();
Expand All @@ -152,18 +138,29 @@ where
}
}

impl<T: Clone, M, F, E> svc::Param<BackendDistribution<T, F>> for MatchedRoute<T, M, F, E> {
impl<T: Clone, M, F, P> svc::Param<BackendDistribution<T, F>> for MatchedRoute<T, M, F, P> {
fn param(&self) -> BackendDistribution<T, F> {
self.params.distribution.clone()
}
}

impl<T, M, F, E> svc::Param<http::timeout::ResponseTimeout> for MatchedRoute<T, M, F, E> {
impl<T: Clone, M, F, P> svc::Param<RouteLabels> for MatchedRoute<T, M, F, P> {
fn param(&self) -> RouteLabels {
RouteLabels(
self.params.parent_ref.clone(),
self.params.route_ref.clone(),
)
}
}

impl<T, M, F, P> svc::Param<http::timeout::ResponseTimeout> for MatchedRoute<T, M, F, P> {
fn param(&self) -> http::timeout::ResponseTimeout {
http::timeout::ResponseTimeout(self.params.request_timeout)
}
}

// === impl Http ===

impl<T> filters::Apply for Http<T> {
#[inline]
fn apply_request<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {
Expand All @@ -176,14 +173,30 @@ impl<T> filters::Apply for Http<T> {
}
}

impl<T> metrics::MkStreamLabel for Http<T> {
type StatusLabels = metrics::labels::HttpRouteRsp;
type DurationLabels = metrics::labels::Route;
type StreamLabel = metrics::LabelHttpRouteRsp;

fn mk_stream_labeler<B>(&self, _: &::http::Request<B>) -> Option<Self::StreamLabel> {
let parent = self.params.parent_ref.clone();
let route = self.params.route_ref.clone();
Some(metrics::LabelHttpRsp::from(metrics::labels::Route::from((
parent, route,
))))
}
}

impl<T> svc::Param<classify::Request> for Http<T> {
fn param(&self) -> classify::Request {
classify::Request::ClientPolicy(classify::ClientPolicy::Http(
self.params.failure_policy.clone(),
policy::http::StatusRanges::default(),
))
}
}

// === impl Grpc ===

impl<T> filters::Apply for Grpc<T> {
#[inline]
fn apply_request<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {
Expand All @@ -196,10 +209,24 @@ impl<T> filters::Apply for Grpc<T> {
}
}

impl<T> metrics::MkStreamLabel for Grpc<T> {
type StatusLabels = metrics::labels::GrpcRouteRsp;
type DurationLabels = metrics::labels::Route;
type StreamLabel = metrics::LabelGrpcRouteRsp;

fn mk_stream_labeler<B>(&self, _: &::http::Request<B>) -> Option<Self::StreamLabel> {
let parent = self.params.parent_ref.clone();
let route = self.params.route_ref.clone();
Some(metrics::LabelGrpcRsp::from(metrics::labels::Route::from((
parent, route,
))))
}
}

impl<T> svc::Param<classify::Request> for Grpc<T> {
fn param(&self) -> classify::Request {
classify::Request::ClientPolicy(classify::ClientPolicy::Grpc(
self.params.failure_policy.clone(),
))
classify::Request::ClientPolicy(
classify::ClientPolicy::Grpc(policy::grpc::Codes::default()),
)
}
}
45 changes: 37 additions & 8 deletions linkerd/app/outbound/src/http/logical/policy/route/backend.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use super::{super::Concrete, filters};
use crate::{BackendRef, ParentRef, RouteRef};
use linkerd_app_core::{proxy::http, svc, Error, Result};
use linkerd_http_prom::record_response::MkStreamLabel;
use linkerd_http_route as http_route;
use linkerd_proxy_client_policy as policy;
use std::{fmt::Debug, hash::Hash, sync::Arc};

mod count_reqs;
mod metrics;

pub use self::count_reqs::RequestCount;
pub use self::metrics::RouteBackendMetrics;
pub(super) mod metrics;

#[derive(Debug, PartialEq, Eq, Hash)]
pub(crate) struct Backend<T, F> {
Expand All @@ -25,6 +22,8 @@ pub(crate) type Http<T> =
pub(crate) type Grpc<T> =
MatchedBackend<T, http_route::grpc::r#match::RouteMatch, policy::grpc::Filter>;

pub type Metrics<T> = metrics::RouteBackendMetrics<<T as MkStreamLabel>::StreamLabel>;

/// Wraps errors with backend metadata.
#[derive(Debug, thiserror::Error)]
#[error("backend {}: {source}", backend.0)]
Expand Down Expand Up @@ -71,15 +70,15 @@ where
F: Clone + Send + Sync + 'static,
// Assert that filters can be applied.
Self: filters::Apply,
RouteBackendMetrics: svc::ExtractParam<RequestCount, Self>,
Self: metrics::MkStreamLabel,
{
/// Builds a stack that applies per-route-backend policy filters over an
/// inner [`Concrete`] stack.
///
/// This [`MatchedBackend`] must implement [`filters::Apply`] to apply these
/// filters.
pub(crate) fn layer<N, S>(
metrics: RouteBackendMetrics,
metrics: Metrics<Self>,
) -> impl svc::Layer<N, Service = svc::ArcNewCloneHttp<Self>> + Clone
where
// Inner stack.
Expand All @@ -103,7 +102,7 @@ where
)
.push(filters::NewApplyFilters::<Self, _, _>::layer())
.push(http::NewTimeout::layer())
.push(count_reqs::NewCountRequests::layer_via(metrics.clone()))
.push(metrics::layer(&metrics))
.push(svc::NewMapErr::layer_with(|t: &Self| {
let backend = t.params.concrete.backend_ref.clone();
move |source| {
Expand Down Expand Up @@ -155,6 +154,21 @@ impl<T> filters::Apply for Http<T> {
}
}

impl<T> metrics::MkStreamLabel for Http<T> {
type StatusLabels = metrics::labels::HttpRouteBackendRsp;
type DurationLabels = metrics::labels::RouteBackend;
type StreamLabel = metrics::LabelHttpRouteBackendRsp;

fn mk_stream_labeler<B>(&self, _: &::http::Request<B>) -> Option<Self::StreamLabel> {
let parent = self.params.concrete.parent_ref.clone();
let route = self.params.route_ref.clone();
let backend = self.params.concrete.backend_ref.clone();
Some(metrics::LabelHttpRsp::from(
metrics::labels::RouteBackend::from((parent, route, backend)),
))
}
}

impl<T> filters::Apply for Grpc<T> {
#[inline]
fn apply_request<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {
Expand All @@ -165,3 +179,18 @@ impl<T> filters::Apply for Grpc<T> {
filters::apply_grpc_response(&self.params.filters, rsp)
}
}

impl<T> metrics::MkStreamLabel for Grpc<T> {
type StatusLabels = metrics::labels::GrpcRouteBackendRsp;
type DurationLabels = metrics::labels::RouteBackend;
type StreamLabel = metrics::LabelGrpcRouteBackendRsp;

fn mk_stream_labeler<B>(&self, _: &::http::Request<B>) -> Option<Self::StreamLabel> {
let parent = self.params.concrete.parent_ref.clone();
let route = self.params.route_ref.clone();
let backend = self.params.concrete.backend_ref.clone();
Some(metrics::LabelGrpcRsp::from(
metrics::labels::RouteBackend::from((parent, route, backend)),
))
}
}
Loading