diff --git a/src/sinks/vector/config.rs b/src/sinks/vector/config.rs index 9f2664c3a11b3..1f5128cbd061f 100644 --- a/src/sinks/vector/config.rs +++ b/src/sinks/vector/config.rs @@ -2,6 +2,7 @@ use http::Uri; use hyper::client::HttpConnector; use hyper_openssl::HttpsConnector; use hyper_proxy::ProxyConnector; +use indexmap::IndexMap; use tonic::body::BoxBody; use tower::ServiceBuilder; use vector_lib::configurable::configurable_component; @@ -58,6 +59,12 @@ pub struct VectorConfig { #[serde(default)] compression: bool, + /// A list of custom headers to add to each request. + #[configurable(metadata( + docs::additional_props_description = "An HTTP request header and it's value." + ))] + headers: Option>, + #[configurable(derived)] #[serde(default)] pub batch: BatchConfig, @@ -98,6 +105,7 @@ fn default_config(address: &str) -> VectorConfig { version: None, address: address.to_owned(), compression: false, + headers: None, batch: BatchConfig::default(), request: TowerRequestConfig::default(), tls: None, @@ -112,6 +120,8 @@ impl SinkConfig for VectorConfig { let tls = MaybeTlsSettings::from_config(&self.tls, false)?; let uri = with_default_scheme(&self.address, tls.is_tls())?; + let headers = self.headers.clone(); + let client = new_client(&tls, cx.proxy())?; let healthcheck_uri = cx @@ -120,9 +130,9 @@ impl SinkConfig for VectorConfig { .clone() .map(|uri| uri.uri) .unwrap_or_else(|| uri.clone()); - let healthcheck_client = VectorService::new(client.clone(), healthcheck_uri, false); + let healthcheck_client = VectorService::new(client.clone(), healthcheck_uri, false, headers.clone()); let healthcheck = healthcheck(healthcheck_client, cx.healthcheck); - let service = VectorService::new(client, uri, self.compression); + let service = VectorService::new(client, uri, self.compression, headers); let request_settings = self.request.into_settings(); let batch_settings = self.batch.into_batcher_settings()?; diff --git a/src/sinks/vector/service.rs b/src/sinks/vector/service.rs index 1a0b240e4338d..1b07c01013d5a 100644 --- a/src/sinks/vector/service.rs +++ b/src/sinks/vector/service.rs @@ -5,8 +5,10 @@ use http::Uri; use hyper::client::HttpConnector; use hyper_openssl::HttpsConnector; use hyper_proxy::ProxyConnector; +use indexmap::IndexMap; use prost::Message; -use tonic::{body::BoxBody, IntoRequest}; +use tonic::body::BoxBody; +use tonic::metadata::MetadataKey; use tower::Service; use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; use vector_lib::stream::DriverResponse; @@ -25,6 +27,7 @@ pub struct VectorService { pub client: proto_vector::Client, pub protocol: String, pub endpoint: String, + pub headers: Option>, } pub struct VectorResponse { @@ -69,6 +72,7 @@ impl VectorService { hyper_client: hyper::Client>, BoxBody>, uri: Uri, compression: bool, + headers: Option>, ) -> Self { let (protocol, endpoint) = uri::protocol_endpoint(uri.clone()); let mut proto_client = proto_vector::Client::new(HyperSvc { @@ -79,10 +83,12 @@ impl VectorService { if compression { proto_client = proto_client.send_compressed(tonic::codec::CompressionEncoding::Gzip); } + Self { client: proto_client, protocol, endpoint, + headers } } } @@ -105,14 +111,30 @@ impl Service for VectorService { // Emission of internal events for errors and dropped events is handled upstream by the caller. fn call(&mut self, mut list: VectorRequest) -> Self::Future { let mut service = self.clone(); + let service_header = service.headers.unwrap_or_default(); + let mut header_entries: Vec<(String, String)> = service_header + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); let byte_size = list.request.encoded_len(); let metadata = std::mem::take(list.metadata_mut()); let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size(); + let grpc_request = list.request; // Get the actual gRPC request object from your custom request + + let mut tonic_request = tonic::Request::new(grpc_request); // Convert it to a tonic::Request + + { + let metadata = tonic_request.metadata_mut(); + for (k, v) in header_entries.drain(..) { + metadata.insert(MetadataKey::from_bytes(k.as_bytes()).unwrap(), v.parse().unwrap()); + } + } + let future = async move { service .client - .push_events(list.request.into_request()) + .push_events(tonic_request) .map_ok(|_response| { emit!(EndpointBytesSent { byte_size,