Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ rustls = "0.23.20"
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
tracing-appender = "0.2.3"
tower = { version = "0.5.2", features = ["retry", "timeout"] }
rotel = { git = "https://github.com/streamfold/rotel", rev = "ec30d9f4a4cf479316247231efb543a11bbfa70d", default-features = false}
rotel = { git = "https://github.com/streamfold/rotel", rev = "866f198870d7efd2f888b606c1569b987bafd207", default-features = false}
opentelemetry-proto = "0.30.0"
chrono = "0.4.40"
opentelemetry-semantic-conventions = { version = "0.30.0", features = ["semconv_experimental"] }
Expand Down
13 changes: 7 additions & 6 deletions src/lambda/telemetry_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use opentelemetry_semantic_conventions::resource::{
use opentelemetry_semantic_conventions::trace::FAAS_INVOKED_REGION;
use rotel::bounded_channel::BoundedSender;
use rotel::listener::Listener;
use rotel::topology::payload::Message;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::net::SocketAddr;
Expand All @@ -37,11 +38,11 @@ static LOG_LIMIT_LAST_LOG: LazyLock<Mutex<Option<Instant>>> = LazyLock::new(|| M

pub struct TelemetryAPI {
pub listener: Listener,
pub logs_tx: BoundedSender<ResourceLogs>,
pub logs_tx: BoundedSender<Message<ResourceLogs>>,
}

impl TelemetryAPI {
pub fn new(listener: Listener, logs_tx: BoundedSender<ResourceLogs>) -> Self {
pub fn new(listener: Listener, logs_tx: BoundedSender<Message<ResourceLogs>>) -> Self {
Self { listener, logs_tx }
}

Expand Down Expand Up @@ -119,14 +120,14 @@ impl TelemetryAPI {
pub struct TelemetryService {
resource: Resource,
bus_tx: BoundedSender<LambdaTelemetry>,
logs_tx: BoundedSender<ResourceLogs>,
logs_tx: BoundedSender<Message<ResourceLogs>>,
}

impl TelemetryService {
fn new(
resource: Resource,
bus_tx: BoundedSender<LambdaTelemetry>,
logs_tx: BoundedSender<ResourceLogs>,
logs_tx: BoundedSender<Message<ResourceLogs>>,
) -> Self {
Self {
resource,
Expand Down Expand Up @@ -182,7 +183,7 @@ where

async fn handle_request<H>(
bus_tx: BoundedSender<LambdaTelemetry>,
logs_tx: BoundedSender<ResourceLogs>,
logs_tx: BoundedSender<Message<ResourceLogs>>,
resource: Resource,
body: H,
) -> Result<Response<Full<Bytes>>, BoxError>
Expand Down Expand Up @@ -230,7 +231,7 @@ where
let logs = parse_logs(resource, log_events);
match logs {
Ok(rl) => {
if let Err(e) = logs_tx.send(rl).await {
if let Err(e) = logs_tx.send(Message::new(None, vec![rl], None)).await {
log_with_limit(move || warn!("Failed to send logs: {}", e));
}
}
Expand Down
35 changes: 30 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub const SENDING_QUEUE_SIZE: usize = 10;

pub const LOGS_QUEUE_SIZE: usize = 50;

pub const FLUSH_LOGS_TIMEOUT_MILLIS: u64 = 100; // can be short, simply forces biased select ordering
pub const FLUSH_PIPELINE_TIMEOUT_MILLIS: u64 = 500;
pub const FLUSH_EXPORTERS_TIMEOUT_MILLIS: u64 = 3_000;

Expand Down Expand Up @@ -231,6 +232,7 @@ async fn run_extension(
Err(e) => return Err(format!("Failed to register extension: {}", e).into()),
};

let (mut flush_logs_tx, flush_logs_sub) = FlushBroadcast::new().into_parts();
let (mut flush_pipeline_tx, flush_pipeline_sub) = FlushBroadcast::new().into_parts();
let (mut flush_exporters_tx, flush_exporters_sub) = FlushBroadcast::new().into_parts();

Expand All @@ -255,7 +257,7 @@ async fn run_extension(
}

let agent = Agent::new(agent_args, port_map, SENDING_QUEUE_SIZE, env.clone())
.with_logs_rx(logs_rx)
.with_logs_rx(logs_rx, flush_logs_sub)
.with_pipeline_flush(flush_pipeline_sub)
.with_exporters_flush(flush_exporters_sub);
let token = agent_cancel.clone();
Expand Down Expand Up @@ -332,7 +334,7 @@ async fn run_extension(
}
},
_ = default_flush_interval.tick() => {
force_flush(&mut flush_pipeline_tx, &mut flush_exporters_tx, &mut default_flush_interval).await;
force_flush(&mut flush_logs_tx, &mut flush_pipeline_tx, &mut flush_exporters_tx, &mut default_flush_interval).await;
}
}
}
Expand All @@ -341,6 +343,7 @@ async fn run_extension(
// Force a flush
//
force_flush(
&mut flush_logs_tx,
&mut flush_pipeline_tx,
&mut flush_exporters_tx,
&mut default_flush_interval,
Expand All @@ -361,6 +364,7 @@ async fn run_extension(
// function invocation.
if control.should_flush() {
force_flush(
&mut flush_logs_tx,
&mut flush_pipeline_tx,
&mut flush_exporters_tx,
&mut default_flush_interval,
Expand Down Expand Up @@ -410,7 +414,7 @@ async fn run_extension(
},

_ = default_flush_interval.tick() => {
force_flush(&mut flush_pipeline_tx, &mut flush_exporters_tx, &mut default_flush_interval).await;
force_flush(&mut flush_logs_tx, &mut flush_pipeline_tx, &mut flush_exporters_tx, &mut default_flush_interval).await;
}
}
}
Expand Down Expand Up @@ -439,14 +443,35 @@ async fn run_extension(
}

async fn force_flush(
logs_tx: &mut FlushSender,
pipeline_tx: &mut FlushSender,
exporters_tx: &mut FlushSender,
default_flush: &mut Interval,
) {
let start = Instant::now();
match timeout(
Duration::from_millis(FLUSH_LOGS_TIMEOUT_MILLIS),
logs_tx.broadcast(None),
)
.await
{
Err(_) => {
warn!("timeout waiting to flush logs");
return;
}
Ok(Err(e)) => {
warn!("failed to flush logs: {}", e);
return;
}
_ => {}
}
let duration = Instant::now().duration_since(start);
debug!(?duration, "finished flushing logs");

let start = Instant::now();
match timeout(
Duration::from_millis(FLUSH_PIPELINE_TIMEOUT_MILLIS),
pipeline_tx.broadcast(),
pipeline_tx.broadcast(None),
)
.await
{
Expand All @@ -466,7 +491,7 @@ async fn force_flush(
let start = Instant::now();
match timeout(
Duration::from_millis(FLUSH_EXPORTERS_TIMEOUT_MILLIS),
exporters_tx.broadcast(),
exporters_tx.broadcast(None),
)
.await
{
Expand Down