diff --git a/README.md b/README.md index 1ab1fb21812a2..ec41abb1df67b 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,9 @@ +For startree build + +Switch to the docker-build branch and find dockerfile under the root path, the instruction for building the img +is in the comments of the docker file + +

Quickstart  •   diff --git a/src/sinks/vector/config.rs b/src/sinks/vector/config.rs index b0e2b6c5a8e6f..2c2f18e16b691 100644 --- a/src/sinks/vector/config.rs +++ b/src/sinks/vector/config.rs @@ -2,6 +2,7 @@ use http::Uri; use hyper::client::HttpConnector; use hyper_openssl::HttpsConnector; use hyper_proxy::ProxyConnector; +use indexmap::IndexMap; use tonic::body::BoxBody; use tower::ServiceBuilder; use vector_config::configurable_component; @@ -58,6 +59,12 @@ pub struct VectorConfig { #[serde(default)] compression: bool, + /// A list of custom headers to add to each request. + #[configurable(metadata( + docs::additional_props_description = "An HTTP request header and it's value." + ))] + headers: Option>, + #[configurable(derived)] #[serde(default)] pub batch: BatchConfig, @@ -98,6 +105,7 @@ fn default_config(address: &str) -> VectorConfig { version: None, address: address.to_owned(), compression: false, + headers: None, batch: BatchConfig::default(), request: TowerRequestConfig::default(), tls: None, @@ -112,6 +120,8 @@ impl SinkConfig for VectorConfig { let tls = MaybeTlsSettings::from_config(&self.tls, false)?; let uri = with_default_scheme(&self.address, tls.is_tls())?; + let headers = self.headers.clone(); + let client = new_client(&tls, cx.proxy())?; let healthcheck_uri = cx @@ -120,9 +130,9 @@ impl SinkConfig for VectorConfig { .clone() .map(|uri| uri.uri) .unwrap_or_else(|| uri.clone()); - let healthcheck_client = VectorService::new(client.clone(), healthcheck_uri, false); + let healthcheck_client = VectorService::new(client.clone(), healthcheck_uri, false, headers.clone()); let healthcheck = healthcheck(healthcheck_client, cx.healthcheck); - let service = VectorService::new(client, uri, self.compression); + let service = VectorService::new(client, uri, self.compression, headers); let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); let batch_settings = self.batch.into_batcher_settings()?; diff --git a/src/sinks/vector/service.rs b/src/sinks/vector/service.rs index a89271d4f928d..c9b1fc4a0f000 100644 --- a/src/sinks/vector/service.rs +++ b/src/sinks/vector/service.rs @@ -5,8 +5,9 @@ use http::Uri; use hyper::client::HttpConnector; use hyper_openssl::HttpsConnector; use hyper_proxy::ProxyConnector; +use indexmap::IndexMap; use prost::Message; -use tonic::{body::BoxBody, IntoRequest}; +use tonic::body::BoxBody; use tower::Service; use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; use vector_core::stream::DriverResponse; @@ -25,6 +26,7 @@ pub struct VectorService { pub client: proto_vector::Client, pub protocol: String, pub endpoint: String, + pub headers: Option>, } pub struct VectorResponse { @@ -69,6 +71,7 @@ impl VectorService { hyper_client: hyper::Client>, BoxBody>, uri: Uri, compression: bool, + headers: Option>, ) -> Self { let (protocol, endpoint) = uri::protocol_endpoint(uri.clone()); let mut proto_client = proto_vector::Client::new(HyperSvc { @@ -79,10 +82,12 @@ impl VectorService { if compression { proto_client = proto_client.send_compressed(tonic::codec::CompressionEncoding::Gzip); } + Self { client: proto_client, protocol, endpoint, + headers } } } @@ -105,14 +110,31 @@ impl Service for VectorService { // Emission of internal events for errors and dropped events is handled upstream by the caller. fn call(&mut self, mut list: VectorRequest) -> Self::Future { let mut service = self.clone(); + let service_header = service.headers.unwrap_or_default(); + let mut header_entries: Vec<(String, String)> = service_header + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); let byte_size = list.request.encoded_len(); let metadata = std::mem::take(list.metadata_mut()); let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size(); + let grpc_request = list.request; // Get the actual gRPC request object from your custom request + + let mut tonic_request = tonic::Request::new(grpc_request); // Convert it to a tonic::Request + + { + let metadata = tonic_request.metadata_mut(); + for (k, v) in header_entries.drain(..) { + let static_key: &'static str = Box::leak(k.into_boxed_str()); + metadata.insert(static_key, v.parse().unwrap()); + } + } + let future = async move { service .client - .push_events(list.request.into_request()) + .push_events(tonic_request) .map_ok(|_response| { emit!(EndpointBytesSent { byte_size,