From fc5f018ba73834347282464c2cddf3db6087dfe2 Mon Sep 17 00:00:00 2001
From: Mike Heffner
Date: Tue, 13 Jan 2026 09:38:55 -0500
Subject: [PATCH] Bump to latest Rotel, add logs sub

---
 Cargo.lock                  | 25 +++++++++++++++++++++++--
 Cargo.toml                  |  2 +-
 src/lambda/telemetry_api.rs | 13 +++++++------
 src/main.rs                 | 35 ++++++++++++++++++++++++++++++-----
 4 files changed, 61 insertions(+), 14 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f6c1018..661e30d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -921,6 +921,16 @@ version = "2.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f"

+[[package]]
+name = "humantime-serde"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c"
+dependencies = [
+ "humantime",
+ "serde",
+]
+
 [[package]]
 name = "hyper"
 version = "1.6.0"
@@ -1877,8 +1887,8 @@ dependencies = [

 [[package]]
 name = "rotel"
-version = "0.1.0"
-source = "git+https://github.com/streamfold/rotel?rev=ec30d9f4a4cf479316247231efb543a11bbfa70d#ec30d9f4a4cf479316247231efb543a11bbfa70d"
+version = "0.1.1"
+source = "git+https://github.com/streamfold/rotel?rev=866f198870d7efd2f888b606c1569b987bafd207#866f198870d7efd2f888b606c1569b987bafd207"
 dependencies = [
  "bstr",
  "bytes",
@@ -1897,6 +1907,7 @@ dependencies = [
  "http",
  "http-body-util",
  "humantime",
+ "humantime-serde",
  "hyper",
  "hyper-rustls",
  "hyper-util",
@@ -1932,6 +1943,7 @@ dependencies = [
  "tracing-log 0.2.0",
  "tracing-subscriber",
  "url",
+ "utilities",
  "wildcard",
 ]

@@ -2714,6 +2726,15 @@ version = "0.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"

+[[package]]
+name = "utilities"
+version = "0.0.1"
+source = "git+https://github.com/streamfold/rotel?rev=866f198870d7efd2f888b606c1569b987bafd207#866f198870d7efd2f888b606c1569b987bafd207"
+dependencies = [
+ "chrono",
+ "opentelemetry-proto",
+]
+
 [[package]]
 name = "valuable"
 version = "0.1.1"
diff --git a/Cargo.toml b/Cargo.toml
index fb086e4..e955dac 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,7 +26,7 @@ rustls = "0.23.20"
 tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
 tracing-appender = "0.2.3"
 tower = { version = "0.5.2", features = ["retry", "timeout"] }
-rotel = { git = "https://github.com/streamfold/rotel", rev = "ec30d9f4a4cf479316247231efb543a11bbfa70d", default-features = false}
+rotel = { git = "https://github.com/streamfold/rotel", rev = "866f198870d7efd2f888b606c1569b987bafd207", default-features = false}
 opentelemetry-proto = "0.30.0"
 chrono = "0.4.40"
 opentelemetry-semantic-conventions = { version = "0.30.0", features = ["semconv_experimental"] }
diff --git a/src/lambda/telemetry_api.rs b/src/lambda/telemetry_api.rs
index f9f1185..2772c69 100644
--- a/src/lambda/telemetry_api.rs
+++ b/src/lambda/telemetry_api.rs
@@ -18,6 +18,7 @@ use opentelemetry_semantic_conventions::resource::{
 use opentelemetry_semantic_conventions::trace::FAAS_INVOKED_REGION;
 use rotel::bounded_channel::BoundedSender;
 use rotel::listener::Listener;
+use rotel::topology::payload::Message;
 use std::fmt::{Debug, Display};
 use std::future::Future;
 use std::net::SocketAddr;
@@ -37,11 +38,11 @@ static LOG_LIMIT_LAST_LOG: LazyLock>> = LazyLock::new(|| M

 pub struct TelemetryAPI {
     pub listener: Listener,
-    pub logs_tx: BoundedSender<ResourceLogs>,
+    pub logs_tx: BoundedSender<Message<ResourceLogs>>,
 }

 impl TelemetryAPI {
-    pub fn new(listener: Listener, logs_tx: BoundedSender<ResourceLogs>) -> Self {
+    pub fn new(listener: Listener, logs_tx: BoundedSender<Message<ResourceLogs>>) -> Self {
         Self { listener, logs_tx }
     }

@@ -119,14 +120,14 @@ impl TelemetryAPI {
 pub struct TelemetryService {
     resource: Resource,
     bus_tx: BoundedSender,
-    logs_tx: BoundedSender<ResourceLogs>,
+    logs_tx: BoundedSender<Message<ResourceLogs>>,
 }

 impl TelemetryService {
     fn new(
         resource: Resource,
         bus_tx: BoundedSender,
-        logs_tx: BoundedSender<ResourceLogs>,
+        logs_tx: BoundedSender<Message<ResourceLogs>>,
     ) -> Self {
         Self {
             resource,
@@ -182,7 +183,7 @@ where

 async fn handle_request(
     bus_tx: BoundedSender,
-    logs_tx: BoundedSender<ResourceLogs>,
+    logs_tx: BoundedSender<Message<ResourceLogs>>,
     resource: Resource,
     body: H,
 ) -> Result>, BoxError>
@@ -230,7 +231,7 @@ where
             let logs = parse_logs(resource, log_events);
             match logs {
                 Ok(rl) => {
-                    if let Err(e) = logs_tx.send(rl).await {
+                    if let Err(e) = logs_tx.send(Message::new(None, vec![rl], None)).await {
                         log_with_limit(move || warn!("Failed to send logs: {}", e));
                     }
                 }
diff --git a/src/main.rs b/src/main.rs
index bd40c4a..19b6857 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -47,6 +47,7 @@ pub const SENDING_QUEUE_SIZE: usize = 10;
 pub const LOGS_QUEUE_SIZE: usize = 50;
+pub const FLUSH_LOGS_TIMEOUT_MILLIS: u64 = 100; // can be short, simply forces biased select ordering
 pub const FLUSH_PIPELINE_TIMEOUT_MILLIS: u64 = 500;
 pub const FLUSH_EXPORTERS_TIMEOUT_MILLIS: u64 = 3_000;
@@ -231,6 +232,7 @@ async fn run_extension(
         Err(e) => return Err(format!("Failed to register extension: {}", e).into()),
     };

+    let (mut flush_logs_tx, flush_logs_sub) = FlushBroadcast::new().into_parts();
     let (mut flush_pipeline_tx, flush_pipeline_sub) = FlushBroadcast::new().into_parts();
     let (mut flush_exporters_tx, flush_exporters_sub) = FlushBroadcast::new().into_parts();
@@ -255,7 +257,7 @@ async fn run_extension(
     }

     let agent = Agent::new(agent_args, port_map, SENDING_QUEUE_SIZE, env.clone())
-        .with_logs_rx(logs_rx)
+        .with_logs_rx(logs_rx, flush_logs_sub)
         .with_pipeline_flush(flush_pipeline_sub)
         .with_exporters_flush(flush_exporters_sub);
     let token = agent_cancel.clone();
@@ -332,7 +334,7 @@ async fn run_extension(
                     }
                 },
                 _ = default_flush_interval.tick() => {
-                    force_flush(&mut flush_pipeline_tx, &mut flush_exporters_tx, &mut default_flush_interval).await;
+                    force_flush(&mut flush_logs_tx, &mut flush_pipeline_tx, &mut flush_exporters_tx, &mut default_flush_interval).await;
                 }
             }
         }
@@ -341,6 +343,7 @@ async fn run_extension(
     // Force a flush
     //
     force_flush(
+        &mut flush_logs_tx,
         &mut flush_pipeline_tx,
         &mut flush_exporters_tx,
         &mut default_flush_interval,
@@ -361,6 +364,7 @@ async fn run_extension(
             // function invocation.
             if control.should_flush() {
                 force_flush(
+                    &mut flush_logs_tx,
                     &mut flush_pipeline_tx,
                     &mut flush_exporters_tx,
                     &mut default_flush_interval,
@@ -410,7 +414,7 @@ async fn run_extension(
                 },

                 _ = default_flush_interval.tick() => {
-                    force_flush(&mut flush_pipeline_tx, &mut flush_exporters_tx, &mut default_flush_interval).await;
+                    force_flush(&mut flush_logs_tx, &mut flush_pipeline_tx, &mut flush_exporters_tx, &mut default_flush_interval).await;
                 }
             }
         }
@@ -439,14 +443,35 @@ async fn run_extension(
 }

 async fn force_flush(
+    logs_tx: &mut FlushSender,
     pipeline_tx: &mut FlushSender,
     exporters_tx: &mut FlushSender,
     default_flush: &mut Interval,
 ) {
+    let start = Instant::now();
+    match timeout(
+        Duration::from_millis(FLUSH_LOGS_TIMEOUT_MILLIS),
+        logs_tx.broadcast(None),
+    )
+    .await
+    {
+        Err(_) => {
+            warn!("timeout waiting to flush logs");
+            return;
+        }
+        Ok(Err(e)) => {
+            warn!("failed to flush logs: {}", e);
+            return;
+        }
+        _ => {}
+    }
+    let duration = Instant::now().duration_since(start);
+    debug!(?duration, "finished flushing logs");
+
     let start = Instant::now();
     match timeout(
         Duration::from_millis(FLUSH_PIPELINE_TIMEOUT_MILLIS),
-        pipeline_tx.broadcast(),
+        pipeline_tx.broadcast(None),
     )
     .await
     {
@@ -466,7 +491,7 @@ async fn force_flush(
     let start = Instant::now();
     match timeout(
         Duration::from_millis(FLUSH_EXPORTERS_TIMEOUT_MILLIS),
-        exporters_tx.broadcast(),
+        exporters_tx.broadcast(None),
     )
     .await
     {