From a731516d8f50966c1a38d1de214f1584781780c1 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Mon, 1 Dec 2025 00:00:00 +0000 Subject: [PATCH] feat(app/inbound): http and grpc status code metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit in linkerd/linkerd2-proxy#4298, we introduced a new metrics telemetry layer that can measure and report status codes, in a protocol-agnostic fashion. this commit integrates this status code telemetry into the inbound proxy, so that HTTP and gRPC traffic can be observed. a new family of metrics is introduced to the `InboundMetrics` structure, and the inbound http\* router's metrics layer is accordingly updated to thread this metrics family into an extractor, which is in turn provided to the `NewRecordStatusCode` layer. \* as a note for reviewers, the inbound proxy does not model the http and grpc protocols as distinct concepts in the network stack's type system, unlike the outbound proxy. this means that while target types in the outbound proxy like `Http<()>` are specific to HTTP, the inbound proxy's distinction of HTTP/gRPC is determined by obtaining and inspecting the `PermitVariant`. 
#### 🔗 related some previous pull requests related to this change: * linkerd/linkerd2-proxy#4298 * linkerd/linkerd2-proxy#4180 * linkerd/linkerd2-proxy#4203 * linkerd/linkerd2-proxy#4127 * linkerd/linkerd2-proxy#4119 Signed-off-by: katelyn martin --- .../app/inbound/src/http/router/metrics.rs | 14 +- .../inbound/src/http/router/metrics/status.rs | 223 ++++++++++++++++++ linkerd/app/inbound/src/metrics.rs | 7 +- 3 files changed, 240 insertions(+), 4 deletions(-) create mode 100644 linkerd/app/inbound/src/http/router/metrics/status.rs diff --git a/linkerd/app/inbound/src/http/router/metrics.rs b/linkerd/app/inbound/src/http/router/metrics.rs index 0e62621ad9..11f53ec848 100644 --- a/linkerd/app/inbound/src/http/router/metrics.rs +++ b/linkerd/app/inbound/src/http/router/metrics.rs @@ -1,18 +1,20 @@ use crate::InboundMetrics; use linkerd_app_core::svc; -pub use self::{count_reqs::*, labels::RouteLabels, req_body::*, rsp_body::*}; +pub use self::{count_reqs::*, labels::RouteLabels, req_body::*, rsp_body::*, status::*}; mod count_reqs; mod labels; mod req_body; mod rsp_body; +mod status; pub(super) fn layer( InboundMetrics { request_count, request_body_data, response_body_data, + status_codes, .. }: &InboundMetrics, ) -> impl svc::Layer> { @@ -33,8 +35,14 @@ pub(super) fn layer( NewRecordRequestBodyData::layer_via(extract) }; - svc::layer::mk(move |inner| count.layer(body.layer(request.layer(inner)))) + let status = { + let extract = ExtractStatusCodeParams::new(status_codes.clone()); + NewRecordStatusCode::layer_via(extract) + }; + + svc::layer::mk(move |inner| count.layer(body.layer(request.layer(status.layer(inner))))) } /// An `N`-typed service instrumented with metrics middleware. 
-type Instrumented = NewCountRequests>>; +type Instrumented = + NewCountRequests>>>; diff --git a/linkerd/app/inbound/src/http/router/metrics/status.rs b/linkerd/app/inbound/src/http/router/metrics/status.rs new file mode 100644 index 0000000000..ef76b82f81 --- /dev/null +++ b/linkerd/app/inbound/src/http/router/metrics/status.rs @@ -0,0 +1,223 @@ +use super::RouteLabels; +use crate::policy::PermitVariant; +use http::StatusCode; +use linkerd_app_core::{ + metrics::prom::{ + self, + encoding::{EncodeLabel, EncodeLabelSet, LabelSetEncoder}, + EncodeLabelSetMut, + }, + svc, Error, +}; +use linkerd_http_prom::{ + status, + stream_label::{ + status::{LabelGrpcStatus, LabelHttpStatus, MkLabelGrpcStatus, MkLabelHttpStatus}, + MkStreamLabel, StreamLabel, + }, +}; + +pub type NewRecordStatusCode = + status::NewRecordStatusCode; + +type StatusMetrics = status::StatusMetrics; + +type Params = status::Params; + +#[derive(Clone, Debug)] +pub struct StatusCodeFamilies { + grpc: StatusMetrics, + http: StatusMetrics, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct StatusCodeLabels { + /// Labels from the inbound route authorizing traffic. + route: RouteLabels, + /// A status code. + status: Option, +} + +#[derive(Clone, Debug)] +pub struct ExtractStatusCodeParams(pub StatusCodeFamilies); + +pub enum MkLabelStatus { + Grpc { + mk_label_grpc: MkLabelGrpcStatus, + route: RouteLabels, + }, + Http { + mk_label_http: MkLabelHttpStatus, + route: RouteLabels, + }, +} + +pub enum LabelStatus { + Grpc { + label_grpc: LabelGrpcStatus, + route: RouteLabels, + }, + Http { + label_http: LabelHttpStatus, + route: RouteLabels, + }, +} + +// === impl StatusCodeFamilies === + +impl StatusCodeFamilies { + /// Registers a new [`StatusCodeFamilies`] with the given registry. 
+ pub fn register(reg: &mut prom::Registry) -> Self { + let grpc = { + let reg = reg.sub_registry_with_prefix("grpc"); + status::StatusMetrics::register(reg, "Completed gRPC responses") + }; + + let http = { + let reg = reg.sub_registry_with_prefix("http"); + status::StatusMetrics::register(reg, "Completed HTTP responses") + }; + + Self { grpc, http } + } + + /// Fetches the proper status code family, given a permitted target. + fn family(&self, variant: PermitVariant) -> &StatusMetrics { + let Self { grpc, http } = self; + match variant { + PermitVariant::Grpc => grpc, + PermitVariant::Http => http, + } + } +} + +// === impl ExtractStatusCodeParams === + +impl ExtractStatusCodeParams { + pub fn new(metrics: StatusCodeFamilies) -> Self { + Self(metrics) + } +} + +impl svc::ExtractParam for ExtractStatusCodeParams +where + T: svc::Param + svc::Param, +{ + fn extract_param(&self, target: &T) -> Params { + let Self(families) = self; + let route: RouteLabels = target.param(); + let variant: PermitVariant = target.param(); + + let metrics = families.family(variant).clone(); + let mk_stream_label = match variant { + PermitVariant::Grpc => { + let mk_label_grpc = MkLabelGrpcStatus; + MkLabelStatus::Grpc { + mk_label_grpc, + route, + } + } + PermitVariant::Http => { + let mk_label_http = MkLabelHttpStatus; + MkLabelStatus::Http { + mk_label_http, + route, + } + } + }; + + Params { + mk_stream_label, + metrics, + } + } +} + +// === impl StatusCodeLabels === + +impl EncodeLabelSetMut for StatusCodeLabels { + fn encode_label_set(&self, enc: &mut LabelSetEncoder<'_>) -> std::fmt::Result { + let Self { route, status } = self; + + route.encode_label_set(enc)?; + ("status", *status).encode(enc.encode_label())?; + + Ok(()) + } +} + +impl EncodeLabelSet for StatusCodeLabels { + fn encode(&self, mut enc: LabelSetEncoder<'_>) -> std::fmt::Result { + self.encode_label_set(&mut enc) + } +} + +// === impl MkLabelStatus === + +impl MkStreamLabel for MkLabelStatus { + type StreamLabel = 
LabelStatus; + + type StatusLabels = StatusCodeLabels; + type DurationLabels = (); + + fn mk_stream_labeler(&self, req: &http::Request) -> Option { + match self { + Self::Grpc { + mk_label_grpc, + route, + } => mk_label_grpc + .mk_stream_labeler(req) + .map(|label_grpc| LabelStatus::Grpc { + label_grpc, + route: route.clone(), + }), + Self::Http { + mk_label_http, + route, + } => mk_label_http + .mk_stream_labeler(req) + .map(|label_http| LabelStatus::Http { + label_http, + route: route.clone(), + }), + } + } +} + +// === impl LabelStatus === + +impl StreamLabel for LabelStatus { + type StatusLabels = StatusCodeLabels; + type DurationLabels = (); + + fn init_response(&mut self, rsp: &http::Response) { + match self { + Self::Grpc { label_grpc, .. } => label_grpc.init_response(rsp), + Self::Http { label_http, .. } => label_http.init_response(rsp), + } + } + + fn end_response(&mut self, rsp: Result, &Error>) { + match self { + Self::Grpc { label_grpc, .. } => label_grpc.end_response(rsp), + Self::Http { label_http, .. 
} => label_http.end_response(rsp), + } + } + + fn status_labels(&self) -> Self::StatusLabels { + match self { + Self::Grpc { label_grpc, route } => { + let route = route.clone(); + let status = label_grpc.status_labels().map(|code| code as u16); + StatusCodeLabels { route, status } + } + Self::Http { label_http, route } => { + let route = route.clone(); + let status = label_http.status_labels().as_ref().map(StatusCode::as_u16); + StatusCodeLabels { route, status } + } + } + } + + fn duration_labels(&self) -> Self::DurationLabels {} +} diff --git a/linkerd/app/inbound/src/metrics.rs b/linkerd/app/inbound/src/metrics.rs index 1d55d8427e..111e57f4bb 100644 --- a/linkerd/app/inbound/src/metrics.rs +++ b/linkerd/app/inbound/src/metrics.rs @@ -11,7 +11,9 @@ pub(crate) mod authz; pub(crate) mod error; -use crate::http::router::{RequestBodyFamilies, RequestCountFamilies, ResponseBodyFamilies}; +use crate::http::router::{ + RequestBodyFamilies, RequestCountFamilies, ResponseBodyFamilies, StatusCodeFamilies, +}; pub use linkerd_app_core::metrics::*; /// Holds LEGACY inbound proxy metrics. @@ -32,6 +34,7 @@ pub struct InboundMetrics { pub request_count: RequestCountFamilies, pub request_body_data: RequestBodyFamilies, pub response_body_data: ResponseBodyFamilies, + pub status_codes: StatusCodeFamilies, } impl InboundMetrics { @@ -44,6 +47,7 @@ impl InboundMetrics { let request_count = RequestCountFamilies::register(reg); let request_body_data = RequestBodyFamilies::register(reg); let response_body_data = ResponseBodyFamilies::register(reg); + let status_codes = StatusCodeFamilies::register(reg); Self { http_authz: authz::HttpAuthzMetrics::default(), @@ -56,6 +60,7 @@ impl InboundMetrics { request_count, request_body_data, response_body_data, + status_codes, } } }