From c27e615b7513491d0f7daa62a6c1ff80930566de Mon Sep 17 00:00:00 2001
From: Ilia Bakhterev
Date: Thu, 30 Oct 2025 20:35:28 +0100
Subject: [PATCH 1/6] Added a new table of components with cache renewal

---
 Cargo.toml          |   3 +-
 src/bin/reporter.rs | 208 ++++++++++++++++++++++++++++++++++++--------
 2 files changed, 176 insertions(+), 35 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 880dc4a..d2b4b3d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,9 +18,10 @@ path="src/bin/reporter.rs"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 [dependencies]
+anyhow = { version = "~1.0" }
 axum = { version="~0.6" }
 axum-macros = { version="~0.3" }
-chrono = "~0.4"
+chrono = { version = "~0.4", features = ["serde"] }
 config = "~0.13"
 evalexpr = "~9.0"
 glob = "~0.3"
diff --git a/src/bin/reporter.rs b/src/bin/reporter.rs
index c549d85..001b61a 100644
--- a/src/bin/reporter.rs
+++ b/src/bin/reporter.rs
@@ -13,6 +13,8 @@ use reqwest::{
 use tokio::signal;
 use tokio::time::{sleep, Duration};
 
+use anyhow::Result;
+use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 
 use std::collections::HashMap;
@@ -24,7 +26,7 @@ use jwt::SignWithKey;
 use sha2::Sha256;
 use std::collections::BTreeMap;
 
-#[derive(Clone, Deserialize, Serialize, Debug)]
+#[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
 pub struct ComponentAttribute {
     pub name: String,
     pub value: String,
@@ -36,13 +38,28 @@ pub struct Component {
     pub attributes: Vec<ComponentAttribute>,
 }
 
-#[derive(Deserialize, Serialize, Debug)]
-pub struct ComponentStatus {
+/// Structure for deserializing components from Status Dashboard API v2 (/v2/components)
+#[derive(Clone, Deserialize, Serialize, Debug)]
+pub struct StatusDashboardComponent {
+    pub id: u32,
     pub name: String,
-    pub impact: u8,
+    #[serde(default)]
     pub attributes: Vec<ComponentAttribute>,
 }
 
+/// Structure for serializing incident data for Status Dashboard API v2 (/v2/incidents)
+#[derive(Clone, Deserialize, Serialize, Debug)]
+pub struct IncidentData {
+    pub title: String,
+    pub impact: u8,
+    pub components: Vec<u32>,
+    pub start_date: DateTime<Utc>,
+    #[serde(default)]
+    pub system: bool,
+    #[serde(rename = "type")]
+    pub incident_type: String,
+}
+
 #[tokio::main]
 async fn main() {
     //Enable logging
@@ -86,14 +103,66 @@ async fn main() {
     tracing::info!("Stopped cloudmon-metrics-reporting");
 }
 
+/// Fetches components from the Status Dashboard API.
+async fn fetch_components(
+    req_client: &reqwest::Client,
+    components_url: &str,
+) -> Result<Vec<StatusDashboardComponent>> {
+    let response = req_client.get(components_url).send().await?;
+    response.error_for_status_ref()?;
+    let components = response.json::<Vec<StatusDashboardComponent>>().await?;
+    Ok(components)
+}
+
+/// Fetches components with a retry mechanism.
+async fn fetch_components_with_retry(
+    req_client: &reqwest::Client,
+    components_url: &str,
+) -> Option<Vec<StatusDashboardComponent>> {
+    let mut attempts = 0;
+    loop {
+        match fetch_components(req_client, components_url).await {
+            Ok(components) => {
+                tracing::info!("Successfully fetched {} components.", components.len());
+                return Some(components);
+            }
+            Err(e) => {
+                attempts += 1;
+                tracing::error!("Failed to fetch components (attempt {}/3): {}", attempts, e);
+                if attempts >= 3 {
+                    tracing::error!("Could not fetch components after 3 attempts. Giving up.");
+                    return None;
+                }
+                tracing::info!("Retrying in 60 seconds...");
+                sleep(Duration::from_secs(60)).await;
+            }
+        }
+    }
+}
+
+/// Builds a cache mapping (ComponentName, Attributes) -> ComponentID.
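+///
+/// For example (illustrative values only), a dashboard component
+/// `{"id": 42, "name": "Object Storage", "attributes": [{"name": "region", "value": "EU-DE"}]}`
+/// is cached under the key `("Object Storage", [region=EU-DE])` with value `42`.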
+fn build_component_id_cache(
+    components: Vec<StatusDashboardComponent>,
+) -> HashMap<(String, Vec<ComponentAttribute>), u32> {
+    components
+        .into_iter()
+        .map(|c| {
+            let mut attrs = c.attributes;
+            attrs.sort();
+            ((c.name, attrs), c.id)
+        })
+        .collect()
+}
+
 async fn metric_watcher(config: &Config) {
     tracing::info!("Starting metric reporter thread");
-    // Init reqwest client
+    // Init reqwest client.
     let req_client: reqwest::Client = ClientBuilder::new()
-        .timeout(Duration::from_secs(2 as u64))
+        .timeout(Duration::from_secs(10 as u64))
         .build()
         .unwrap();
-    // Endless loop
+
+    // Build a component lookup table from the config.
     let mut components: HashMap<String, HashMap<String, Component>> = HashMap::new();
     for env in config.environments.iter() {
         let comp_env_entry = components.entry(env.name.clone()).or_insert(HashMap::new());
@@ -124,11 +193,37 @@ async fn metric_watcher(config: &Config) {
             }
         }
     }
+
     let sdb_config = config
         .status_dashboard
         .as_ref()
         .expect("Status dashboard section is missing");
-    let status_report_url = format!("{}/v1/component_status", sdb_config.url.clone(),);
+
+    // Fetch components from Status Dashboard and build a cache to resolve component name to ID.
+    let components_url = format!("{}/v2/components", sdb_config.url.clone());
+    let mut component_id_cache =
+        match fetch_components_with_retry(&req_client, &components_url).await {
+            Some(components) => {
+                if components.is_empty() {
+                    tracing::error!(
+                        "Component list from status-dashboard is empty. Reporter cannot proceed."
+                    );
+                    return;
+                }
+                build_component_id_cache(components)
+            }
+            None => {
+                tracing::error!("Failed to fetch initial component list. Reporter cannot proceed.");
+                return;
+            }
+        };
+    tracing::info!(
+        "Successfully cached {} components from status-dashboard.",
+        component_id_cache.len()
+    );
+
+    // Prepare for incident reporting
+    let incidents_url = format!("{}/v2/incidents", sdb_config.url.clone());
     let mut headers = HeaderMap::new();
     if let Some(ref secret) = sdb_config.secret {
         let key: Hmac<Sha256> = Hmac::new_from_slice(secret.as_bytes()).unwrap();
@@ -139,10 +234,29 @@ async fn metric_watcher(config: &Config) {
         headers.insert(AUTHORIZATION, bearer.parse().unwrap());
     }
     loop {
-        // For every env from config
+        // Refresh component cache before checking metrics.
+        tracing::info!("Refreshing component cache...");
+        match fetch_components(&req_client, &components_url).await {
+            Ok(components) if !components.is_empty() => {
+                component_id_cache = build_component_id_cache(components);
+                tracing::info!(
+                    "Successfully refreshed component cache. New size: {}",
+                    component_id_cache.len()
+                );
+            }
+            Ok(_) => {
+                tracing::warn!("Component list from status-dashboard is empty. Using old cache.")
+            }
+            Err(e) => tracing::warn!(
+                "Failed to refresh component cache, using old one. Error: {}",
+                e
+            ),
+        };
+
+        // For every env from config.
         for env in config.environments.iter() {
             tracing::trace!("env {:?}", env);
-            // For every component (health_metric service)
+            // For every component (health_metric service).
             for component in config.health_metrics.iter() {
                 tracing::trace!("Component {:?}", component.0);
                 // Query metric-convertor for the status
@@ -180,34 +294,60 @@ async fn metric_watcher(config: &Config) {
                                             .get(component.0)
                                             .unwrap();
                                         tracing::info!("Component to report: {:?}", component);
-                                        let body = ComponentStatus {
-                                            name: component.name.clone(),
-                                            impact: last.1,
-                                            attributes: component.attributes.clone(),
-                                        };
-                                        let res = req_client
-                                            .post(&status_report_url)
-                                            .headers(headers.clone())
-                                            .json(&body)
-                                            .send()
-                                            .await;
-                                        match res {
-                                            Ok(rsp) => {
-                                                if rsp.status().is_client_error() {
+
+                                        // Search for component ID in the cache using name and attributes.
+                                        let mut search_attrs = component.attributes.clone();
+                                        search_attrs.sort();
+                                        let cache_key = (component.name.clone(), search_attrs);
+
+                                        if let Some(component_id) =
+                                            component_id_cache.get(&cache_key)
+                                        {
+                                            tracing::info!(
+                                                "Found component ID {} in cache.",
+                                                component_id
+                                            );
+
+                                            // Build IncidentData body for API v2
+                                            let body = IncidentData {
+                                                title: component.name.clone(),
+                                                impact: last.1,
+                                                components: vec![*component_id],
+                                                start_date: Utc::now(),
+                                                system: true,
+                                                incident_type: "incident".to_string(),
+                                            };
+
+                                            let res = req_client
+                                                .post(&incidents_url)
+                                                .headers(headers.clone())
+                                                .json(&body)
+                                                .send()
+                                                .await;
+                                            match res {
+                                                Ok(rsp) => {
+                                                    if !rsp.status().is_success() {
+                                                        tracing::error!(
+                                                            "Error reporting incident: [{}] {:?}",
+                                                            rsp.status(),
+                                                            rsp.text().await
+                                                        );
+                                                    } else {
+                                                        tracing::info!("Successfully reported incident for component '{}'.", component.name);
+                                                    }
+                                                }
+                                                Err(e) => {
                                                     tracing::error!(
-                                                        "Error: [{}] {:?}",
-                                                        rsp.status(),
-                                                        rsp.text().await
+                                                        "Error sending POST request for incident: {}",
+                                                        e
                                                     );
                                                 }
                                             }
-
-                                            Err(e) => {
-                                                tracing::error!(
-                                                    "Error during posting component status: {}",
-                                                    e
-                                                );
-                                            }
+                                        } else {
+                                            tracing::error!(
+                                                "Component with name '{}' and attributes {:?} not found in status-dashboard cache.",
+                                                component.name, component.attributes
+                                            );
                                         }
                                     }
                                 }

From 51b738953842adb682a70337facf4cbdc492ce14 Mon Sep 17 00:00:00 2001
From: Ilia Bakhterev
Date: Thu, 6 Nov 2025 15:55:28 +0100
Subject: [PATCH 2/6] Added a mock convertor

---
 src/bin/mock_convertor.rs | 83 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 83 insertions(+)
 create mode 100644 src/bin/mock_convertor.rs

diff --git a/src/bin/mock_convertor.rs b/src/bin/mock_convertor.rs
new file mode 100644
index 0000000..4d29b36
--- /dev/null
+++ b/src/bin/mock_convertor.rs
@@ -0,0 +1,83 @@
+use axum::{
+    extract::{Query, State},
+    http::StatusCode,
+    response::Json,
+    routing::get,
+    Router,
+};
+use chrono::Utc;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use std::sync::{Arc, Mutex};
+use tokio::signal;
+
+#[derive(Deserialize, Debug)]
+struct HealthQuery {
+    environment: String,
+    service: String,
+    #[serde(default)]
+    _from: String,
+    #[serde(default)]
+    _to: String,
+}
+
+/// Structure of the response that reporter.rs expects.
+#[derive(Serialize, Debug)]
+struct ServiceHealthResponse {
+    metrics: Vec<(i64, u8)>,
+}
+
+/// State of the mock server. The Mutex allows changing statuses at runtime.
+type AppState = Arc<Mutex<HashMap<String, u8>>>;
+
+#[tokio::main]
+async fn main() {
+    // Key is "environment:service", value is the status.
+    let health_statuses: AppState = Arc::new(Mutex::new(HashMap::new()));
+
+    // Initial state
+    // 0 = OK, >0 = Problem
+    health_statuses
+        .lock()
+        .unwrap()
+        .insert("production_eu-de:test".to_string(), 2); // Simulate a problem (impact = 2)
+
+    let app = Router::new()
+        .route("/api/v1/health", get(health_handler))
+        .with_state(health_statuses);
+
+    let addr = SocketAddr::from(([127, 0, 0, 1], 3005));
+    println!("Mock convertor listening on {}", addr);
+
+    axum::Server::bind(&addr)
+        .serve(app.into_make_service())
+        .with_graceful_shutdown(shutdown_signal())
+        .await
+        .unwrap();
+}
+
+async fn health_handler(
+    State(state): State<AppState>,
+    Query(params): Query<HealthQuery>,
+) -> (StatusCode, Json<ServiceHealthResponse>) {
+    let key = format!("{}:{}", params.environment, params.service);
+    println!("Received request for: {}", key);
+
+    let statuses = state.lock().unwrap();
+    let status_value = statuses.get(&key).cloned().unwrap_or(0);
+
+    let response = ServiceHealthResponse {
+        metrics: vec![(Utc::now().timestamp(), status_value)],
+    };
+
+    println!("Responding with status: {}", status_value);
+    (StatusCode::OK, Json(response))
+}
+
+async fn shutdown_signal() {
+    signal::ctrl_c()
+        .await
+        .expect("failed to install Ctrl+C handler");
+    println!("Signal received, shutting down mock server.");
+}

From c65799c4d11f925d8c11772d63871d8f8c2f089d Mon Sep 17 00:00:00 2001
From: Ilia Bakhterev
Date: Fri, 7 Nov 2025 10:58:48 +0100
Subject: [PATCH 3/6] Redesigned the caching process

---
 src/bin/reporter.rs | 169 +++++++++++++++++++++++++++-----------------
 1 file changed, 106 insertions(+), 63 deletions(-)

diff --git a/src/bin/reporter.rs b/src/bin/reporter.rs
index 001b61a..7554bc5 100644
--- a/src/bin/reporter.rs
+++ b/src/bin/reporter.rs
@@ -38,7 +38,7 @@ pub struct Component {
     pub attributes: Vec<ComponentAttribute>,
 }
 
-/// Structure for deserializing components from Status Dashboard API v2 (/v2/components)
+/// Structure for deserializing components from Status Dashboard API v2 (/v2/components).
 #[derive(Clone, Deserialize, Serialize, Debug)]
 pub struct StatusDashboardComponent {
     pub id: u32,
@@ -47,10 +47,12 @@ pub struct StatusDashboardComponent {
     pub attributes: Vec<ComponentAttribute>,
 }
 
-/// Structure for serializing incident data for Status Dashboard API v2 (/v2/incidents)
+/// Structure for serializing incident data for Status Dashboard API v2 (/v2/incidents).
 #[derive(Clone, Deserialize, Serialize, Debug)]
 pub struct IncidentData {
     pub title: String,
+    #[serde(default)]
+    pub description: String,
     pub impact: u8,
     pub components: Vec<u32>,
     pub start_date: DateTime<Utc>,
@@ -62,7 +64,7 @@ pub struct IncidentData {
 
 #[tokio::main]
 async fn main() {
-    //Enable logging
+    //Enable logging.
     tracing_subscriber::registry()
         .with(tracing_subscriber::EnvFilter::new(
             std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()),
@@ -72,10 +74,10 @@ async fn main() {
 
     tracing::info!("Starting cloudmon-metrics-reporter");
 
-    // Parse config
+    // Parse config.
     let config = Config::new("config.yaml").unwrap();
 
-    // Set up CTRL+C handlers
+    // Set up CTRL+C handlers.
     let ctrl_c = async {
         signal::ctrl_c()
             .await
@@ -93,7 +95,7 @@ async fn main() {
     #[cfg(not(unix))]
     let terminate = std::future::pending::<()>();
 
-    // Execute metric_watcher unless need to stop
+    // Execute metric_watcher until asked to stop.
     tokio::select! {
         _ = metric_watcher(&config) => {},
         _ = ctrl_c => {},
@@ -114,6 +116,41 @@ async fn fetch_components(
     Ok(components)
 }
 
+/// Fetches components, builds, and returns a new component ID cache.
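+/// On success, the returned map resolves (component name, sorted attributes) to the
+/// component ID used by the dashboard.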
+///
+/// # Arguments:
+/// * `req_client` - The reqwest client;
+/// * `components_url` - The URL to fetch components from;
+/// * `with_retry` - If true, retries fetching up to 3 times on failure.
+async fn update_component_cache(
+    req_client: &reqwest::Client,
+    components_url: &str,
+    with_retry: bool,
+) -> Result<HashMap<(String, Vec<ComponentAttribute>), u32>> {
+    tracing::info!("Updating component cache...");
+
+    let fetch_future = if with_retry {
+        fetch_components_with_retry(req_client, components_url).await
+    } else {
+        fetch_components(req_client, components_url).await.ok()
+    };
+
+    match fetch_future {
+        Some(components) if !components.is_empty() => {
+            let cache = build_component_id_cache(components);
+            tracing::info!(
+                "Successfully updated component cache. New size: {}",
+                cache.len()
+            );
+            Ok(cache)
+        }
+        Some(_) => {
+            // The component list is empty.
+            anyhow::bail!("Component list from status-dashboard is empty.")
+        }
+        None => anyhow::bail!("Failed to fetch component list from status-dashboard."),
+    }
+}
+
 /// Fetches components with a retry mechanism.
 async fn fetch_components_with_retry(
     req_client: &reqwest::Client,
     components_url: &str,
@@ -163,9 +200,11 @@ async fn metric_watcher(config: &Config) {
         .unwrap();
 
     // Build a component lookup table from the config.
-    let mut components: HashMap<String, HashMap<String, Component>> = HashMap::new();
+    let mut components_from_config: HashMap<String, HashMap<String, Component>> = HashMap::new();
     for env in config.environments.iter() {
-        let comp_env_entry = components.entry(env.name.clone()).or_insert(HashMap::new());
+        let comp_env_entry = components_from_config
+            .entry(env.name.clone())
+            .or_insert(HashMap::new());
         let mut env_attrs: Vec<ComponentAttribute> = Vec::new();
         if let Some(ref attrs) = env.attributes {
             for (key, val) in attrs.iter() {
@@ -176,11 +215,11 @@ async fn metric_watcher(config: &Config) {
             }
         }
 
-        for component in config.health_metrics.iter() {
-            match component.1.component_name {
+        for component_def in config.health_metrics.iter() {
+            match component_def.1.component_name {
                 Some(ref name) => {
                     comp_env_entry.insert(
-                        component.0.clone(),
+                        component_def.0.clone(),
                         Component {
                             name: name.clone(),
                             attributes: env_attrs.clone(),
@@ -188,7 +227,7 @@ async fn metric_watcher(config: &Config) {
                     );
                 }
                 None => {
-                    tracing::warn!("No component_name is given for {}", component.1.service);
+                    tracing::warn!("No component_name is given for {}", component_def.1.service);
                 }
             }
         }
@@ -202,25 +241,16 @@ async fn metric_watcher(config: &Config) {
     // Fetch components from Status Dashboard and build a cache to resolve component name to ID.
     let components_url = format!("{}/v2/components", sdb_config.url.clone());
     let mut component_id_cache =
-        match fetch_components_with_retry(&req_client, &components_url).await {
-            Some(components) => {
-                if components.is_empty() {
-                    tracing::error!(
-                        "Component list from status-dashboard is empty. Reporter cannot proceed."
-                    );
-                    return;
-                }
-                build_component_id_cache(components)
-            }
-            None => {
-                tracing::error!("Failed to fetch initial component list. Reporter cannot proceed.");
+        match update_component_cache(&req_client, &components_url, true).await {
+            Ok(cache) => cache,
+            Err(e) => {
+                tracing::error!(
+                    "Initial component cache load failed: {}. Reporter cannot proceed.",
+                    e
+                );
                 return;
             }
         };
 
     // Prepare for incident reporting
     let incidents_url = format!("{}/v2/incidents", sdb_config.url.clone());
@@ -234,31 +264,12 @@ async fn metric_watcher(config: &Config) {
         headers.insert(AUTHORIZATION, bearer.parse().unwrap());
     }
     loop {
-        // Refresh component cache before checking metrics.
-        tracing::info!("Refreshing component cache...");
-        match fetch_components(&req_client, &components_url).await {
-            Ok(components) if !components.is_empty() => {
-                component_id_cache = build_component_id_cache(components);
-                tracing::info!(
-                    "Successfully refreshed component cache. New size: {}",
-                    component_id_cache.len()
-                );
-            }
-            Ok(_) => {
-                tracing::warn!("Component list from status-dashboard is empty. Using old cache.")
-            }
-            Err(e) => tracing::warn!(
-                "Failed to refresh component cache, using old one. Error: {}",
-                e
-            ),
-        };
-
         // For every env from config.
         for env in config.environments.iter() {
             tracing::trace!("env {:?}", env);
             // For every component (health_metric service).
-            for component in config.health_metrics.iter() {
-                tracing::trace!("Component {:?}", component.0);
+            for component_def in config.health_metrics.iter() {
+                tracing::trace!("Component {:?}", component_def.0);
                 // Query metric-convertor for the status
                 match req_client
                     .get(format!(
@@ -268,7 +279,7 @@ async fn metric_watcher(config: &Config) {
                     // Query env/service for time [-2min..-1min]
                     .query(&[
                         ("environment", env.name.clone()),
-                        ("service", component.0.clone()),
+                        ("service", component_def.0.clone()),
                        ("from", "-5min".to_string()),
                         ("to", "-2min".to_string()),
                     ])
@@ -279,19 +290,20 @@ async fn metric_watcher(config: &Config) {
                     if rsp.status().is_client_error() {
                         tracing::error!("Got API error {:?}", rsp.text().await);
                     } else {
-                        // Try to parse response
+                        // Try to parse response.
                         match rsp.json::().await {
                             Ok(mut data) => {
                                 tracing::debug!("response {:?}", data);
-                                // Peek at last metric in the vector
+                                // Peek at last metric in the vector.
                                 if let Some(last) = data.metrics.pop() {
                                     // Is metric showing issues?
                                     if last.1 > 0 {
+                                        // 0 means OK
                                         tracing::info!("Bad status found: {}", last.1);
-                                        let component = components
+                                        let component = components_from_config
                                             .get(&env.name)
                                             .unwrap()
-                                            .get(component.0)
+                                            .get(component_def.0)
                                             .unwrap();
                                         tracing::info!("Component to report: {:?}", component);
 
                                         // Search for component ID in the cache using name and attributes.
                                         let mut search_attrs = component.attributes.clone();
                                         search_attrs.sort();
                                         let cache_key = (component.name.clone(), search_attrs);
 
-                                        if let Some(component_id) =
-                                            component_id_cache.get(&cache_key)
-                                        {
+                                        let mut component_id =
+                                            component_id_cache.get(&cache_key);
+
+                                        // If component not found, refresh cache and try again.
+                                        if component_id.is_none() {
+                                            tracing::info!(
+                                                "Component '{}' with attributes {:?} not found in cache. Attempting to refresh.",
+                                                component.name, component.attributes
+                                            );
+                                            match update_component_cache(
+                                                &req_client,
+                                                &components_url,
+                                                false,
+                                            )
+                                            .await
+                                            {
+                                                Ok(new_cache) => {
+                                                    component_id_cache = new_cache;
+                                                    component_id =
+                                                        component_id_cache.get(&cache_key);
+                                                }
+                                                Err(e) => {
+                                                    tracing::warn!("Failed to refresh component cache, using old one. Error: {}", e);
+                                                }
+                                            }
+                                        }
+
+                                        if let Some(id) = component_id {
                                             tracing::info!(
                                                 "Found component ID {} in cache.",
-                                                component_id
+                                                id
                                             );
 
                                             // Build IncidentData body for API v2
                                             let body = IncidentData {
-                                                title: component.name.clone(),
+                                                title: "System incident from monitoring system"
+                                                    .to_string(),
+                                                description: "System-wide incident affecting multiple components. Created automatically."
+                                                    .to_string(),
                                                 impact: last.1,
-                                                components: vec![*component_id],
+                                                components: vec![*id],
                                                 start_date: Utc::now(),
                                                 system: true,
                                                 incident_type: "incident".to_string(),
                                             };
-
                                             let res = req_client
                                                 .post(&incidents_url)
                                                 .headers(headers.clone())
                                                 .json(&body)
                                                 .send()
                                                 .await;
                                             match res {
                                                 Ok(rsp) => {
                                                     if !rsp.status().is_success() {
                                                         tracing::error!(
                                                             "Error reporting incident: [{}] {:?}",
                                                             rsp.status(),
                                                             rsp.text().await
                                                         );
                                                     } else {
-                                                        tracing::info!("Successfully reported incident for component '{}'.", component.name);
+                                                        tracing::info!(
+                                                            "Successfully reported incident for component '{}' with attributes {:?}.",
+                                                            component.name,
+                                                            component.attributes
+                                                        );
                                                     }
                                                 }
                                                 Err(e) => {
                                                     tracing::error!(
                                                         "Error sending POST request for incident: {}",
                                                         e
                                                     );
                                                 }
                                             }
                                         } else {
                                             tracing::error!(
-                                                "Component with name '{}' and attributes {:?} not found in status-dashboard cache.",
+                                                "Component with name '{}' and attributes {:?} still not found in status-dashboard cache after refresh.",
                                                 component.name, component.attributes
                                             );
                                         }

From d1c751c0379e70cf6c5dfec302ff1d6bcfdb8856 Mon Sep 17 00:00:00 2001
From: Ilia Bakhterev
Date: Tue, 11 Nov 2025 18:39:24 +0100
Subject: [PATCH 4/6] Changed the component matching process

---
 src/bin/mock_convertor.rs |  6 ++++++
 src/bin/reporter.rs       | 31 +++++++++++++++++++++++++------
 2 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/src/bin/mock_convertor.rs b/src/bin/mock_convertor.rs
index 4d29b36..63466e9 100644
--- a/src/bin/mock_convertor.rs
+++ b/src/bin/mock_convertor.rs
@@ -25,6 +25,9 @@ struct HealthQuery {
 /// Structure of the response that reporter.rs expects.
 #[derive(Serialize, Debug)]
 struct ServiceHealthResponse {
+    name: String,
+    service_category: String,
+    environment: String,
     metrics: Vec<(i64, u8)>,
 }
 
diff --git a/src/bin/reporter.rs b/src/bin/reporter.rs
index 7554bc5..88d561b 100644
--- a/src/bin/reporter.rs
+++ b/src/bin/reporter.rs
@@ -310,10 +310,29 @@ async fn metric_watcher(config: &Config) {
                                         // Search for component ID in the cache using name and attributes.
                                         let mut search_attrs = component.attributes.clone();
                                         search_attrs.sort();
-                                        let cache_key = (component.name.clone(), search_attrs);
+                                        let mut required_attrs = component.attributes.clone();
+                                        required_attrs.sort();
+
+                                        let find_id =
+                                            |cache: &HashMap<
+                                                (String, Vec<ComponentAttribute>),
+                                                u32,
+                                            >| {
+                                                cache
+                                                    .iter()
+                                                    .find(|((name, attrs), _id)| {
+                                                        if name != &component.name {
+                                                            return false;
+                                                        }
+                                                        required_attrs
+                                                            .iter()
+                                                            .all(|r| attrs.contains(r))
+                                                    })
+                                                    .map(|(_, id)| *id)
+                                            };
 
-                                        let mut component_id =
-                                            component_id_cache.get(&cache_key);
+                                        // First attempt to find the component.
+                                        let mut component_id = find_id(&component_id_cache);
 
                                         // If component not found, refresh cache and try again.
                                         if component_id.is_none() {
                                             tracing::info!(
                                                 "Component '{}' with attributes {:?} not found in cache. Attempting to refresh.",
                                                 component.name, component.attributes
                                             );
+
                                             match update_component_cache(
                                                 &req_client,
                                                 &components_url,
                                                 false,
                                             )
                                             .await
                                             {
                                                 Ok(new_cache) => {
                                                     component_id_cache = new_cache;
-                                                    component_id =
-                                                        component_id_cache.get(&cache_key);
+                                                    component_id = find_id(&component_id_cache);
                                                 }
                                                 Err(e) => {
                                                     tracing::warn!("Failed to refresh component cache, using old one. Error: {}", e);
                                                 }
                                             }
                                         }
@@ -371,7 +390,7 @@ async fn metric_watcher(config: &Config) {
                                                 impact: last.1,
-                                                components: vec![*id],
+                                                components: vec![id],
                                                 start_date: Utc::now(),
                                                 system: true,
                                                 incident_type: "incident".to_string(),

From eec4a294ab8aeb604684e2c80d2abf74066223ab Mon Sep 17 00:00:00 2001
From: Ilia Bakhterev
Date: Tue, 11 Nov 2025 20:35:14 +0100
Subject: [PATCH 5/6] Added a body debugging trace; shifted start_date by -1s

---
 src/bin/mock_convertor.rs |  8 ++++++--
 src/bin/reporter.rs       | 13 +++++++++++--
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/src/bin/mock_convertor.rs b/src/bin/mock_convertor.rs
index 63466e9..da91790 100644
--- a/src/bin/mock_convertor.rs
+++ b/src/bin/mock_convertor.rs
@@ -70,14 +70,18 @@ async fn health_handler(
     let statuses = state.lock().unwrap();
     let status_value = statuses.get(&key).cloned().unwrap_or(0);
 
+    let metric_time = Utc::now().timestamp();
     let response = ServiceHealthResponse {
         name: params.service.clone(),
         service_category: "mock_category".to_string(),
         environment: params.environment.clone(),
-        metrics: vec![(Utc::now().timestamp(), status_value)],
+        metrics: vec![(metric_time, status_value)],
     };
 
-    println!("Responding with status: {}", status_value);
+    println!(
+        "Responding with status: {}, time: {}",
+        status_value, metric_time
+    );
     (StatusCode::OK, Json(response))
 }
 
diff --git a/src/bin/reporter.rs b/src/bin/reporter.rs
index 88d561b..cc6b1c0 100644
--- a/src/bin/reporter.rs
+++ b/src/bin/reporter.rs
@@ -14,7 +14,7 @@ use tokio::signal;
 use tokio::time::{sleep, Duration};
 
 use anyhow::Result;
-use chrono::{DateTime, Utc};
+use chrono::{DateTime, TimeZone, Utc};
 use serde::{Deserialize, Serialize};
 
 use std::collections::HashMap;
@@ -297,6 +297,14 @@ async fn metric_watcher(config: &Config) {
                                 // Peek at last metric in the vector.
                                 if let Some(last) = data.metrics.pop() {
                                     // Is metric showing issues?
                                     if last.1 > 0 {
                                         // 0 means OK
+                                        let shifted_date = Utc
+                                            .timestamp_opt(last.0 as i64, 0)
+                                            .single()
+                                            .map(|ts| ts - chrono::Duration::seconds(1))
+                                            .unwrap_or_else(|| {
+                                                Utc::now() - chrono::Duration::seconds(1)
+                                            });
+
                                         tracing::info!("Bad status found: {}", last.1);
@@ -380,10 +388,11 @@ async fn metric_watcher(config: &Config) {
                                                 impact: last.1,
                                                 components: vec![id],
-                                                start_date: Utc::now(),
+                                                start_date: shifted_date,
                                                 system: true,
                                                 incident_type: "incident".to_string(),
                                             };
+                                            tracing::debug!("IncidentData body: {:?}", body);
                                             let res = req_client
                                                 .post(&incidents_url)
                                                 .headers(headers.clone())

From 86757bf08ba7706bb3f6c09b700b9a6de444bcc9 Mon Sep 17 00:00:00 2001
From: Ilia Bakhterev
Date: Tue, 23 Dec 2025 14:34:11 +0100
Subject: [PATCH 6/6] Improved tracing::info output

---
 src/bin/mock_convertor.rs | 27 ++++++++++++++++++++++-----
 src/bin/reporter.rs       | 32 +++++++++++++++++++++++++-----
 src/common.rs             | 18 ++++++++++++++++--
 src/graphite.rs           |  4 ++--
 src/types.rs              | 12 ++++++++++--
 5 files changed, 77 insertions(+), 16 deletions(-)

diff --git a/src/bin/mock_convertor.rs b/src/bin/mock_convertor.rs
index da91790..c32e195 100644
--- a/src/bin/mock_convertor.rs
+++ b/src/bin/mock_convertor.rs
@@ -23,12 +23,20 @@ struct HealthQuery {
 }
 
 /// Structure of the response that reporter.rs expects.
+#[derive(Serialize, Debug)]
+struct ServiceHealthPoint {
+    ts: u32,
+    value: u8,
+    #[serde(default)]
+    triggered: Vec<String>,
+}
+
 #[derive(Serialize, Debug)]
 struct ServiceHealthResponse {
     name: String,
     service_category: String,
     environment: String,
-    metrics: Vec<(i64, u8)>,
+    metrics: Vec<ServiceHealthPoint>,
 }
 
 /// State of the mock server. The Mutex allows changing statuses at runtime.
 type AppState = Arc<Mutex<HashMap<String, u8>>>;
 
 #[tokio::main]
 async fn main() {
-    // Key is "environment:service", value is the status.
     let health_statuses: AppState = Arc::new(Mutex::new(HashMap::new()));
 
-    // Initial state
     // 0 = OK, >0 = Problem
     health_statuses
         .lock()
         .unwrap()
         .insert("production_eu-de:test".to_string(), 2); // Simulate a problem (impact = 2)
 
@@ -70,12 +76,23 @@ async fn health_handler(
     let statuses = state.lock().unwrap();
     let status_value = statuses.get(&key).cloned().unwrap_or(0);
 
-    let metric_time = Utc::now().timestamp();
+    let metric_time = Utc::now().timestamp() as u32;
+
+    let triggered = if status_value > 0 {
+        vec![format!("{}.{}", params.service, "api_down")]
+    } else {
+        Vec::new()
+    };
+
     let response = ServiceHealthResponse {
         name: params.service.clone(),
         service_category: "mock_category".to_string(),
         environment: params.environment.clone(),
-        metrics: vec![(metric_time, status_value)],
+        metrics: vec![ServiceHealthPoint {
+            ts: metric_time,
+            value: status_value,
+            triggered,
+        }],
     };
 
     println!(
         "Responding with status: {}, time: {}",
         status_value, metric_time
     );
     (StatusCode::OK, Json(response))
 }
 
diff --git a/src/bin/reporter.rs b/src/bin/reporter.rs
index cc6b1c0..9f9e686 100644
--- a/src/bin/reporter.rs
+++ b/src/bin/reporter.rs
@@ -16,6 +16,7 @@ use tokio::time::{sleep, Duration};
 use anyhow::Result;
 use chrono::{DateTime, TimeZone, Utc};
 use serde::{Deserialize, Serialize};
+use serde_json::json;
 
 use std::collections::HashMap;
 
@@ -297,23 +298,44 @@ async fn metric_watcher(config: &Config) {
                                 // Peek at last metric in the vector.
                                 if let Some(last) = data.metrics.pop() {
                                     // Is metric showing issues?
-                                    if last.1 > 0 {
+                                    if last.value > 0 {
                                         // 0 means OK
                                         let shifted_date = Utc
-                                            .timestamp_opt(last.0 as i64, 0)
+                                            .timestamp_opt(last.ts as i64, 0)
                                             .single()
                                             .map(|ts| ts - chrono::Duration::seconds(1))
                                             .unwrap_or_else(|| {
                                                 Utc::now() - chrono::Duration::seconds(1)
                                             });
 
-                                        tracing::info!("Bad status found: {}", last.1);
                                         let component = components_from_config
                                             .get(&env.name)
                                             .unwrap()
                                             .get(component_def.0)
                                             .unwrap();
-                                        tracing::info!("Component to report: {:?}", component);
+
+                                        // Try to find the metric list for this component.
+                                        let metric_names =
+                                            match config.health_metrics.get(component_def.0) {
+                                                Some(h) => h.metrics.clone(),
+                                                None => Vec::new(),
+                                            };
+
+                                        // Combined JSON log.
+                                        let log_obj = json!({
+                                            "timestamp": shifted_date.to_rfc3339(),
+                                            "status": last.value,
+                                            "service": component_def.0,
+                                            "environment": env.name,
+                                            "configured_metrics": metric_names,
+                                            "triggered_metrics": last.triggered,
+                                            "component": {
+                                                "name": component.name,
+                                                "attributes": component.attributes,
+                                            }
+                                        });
+
+                                        tracing::info!("{}", log_obj.to_string());
 
                                         // Search for component ID in the cache using name and attributes.
                                         let mut search_attrs = component.attributes.clone();
                                         search_attrs.sort();
@@ -378,7 +400,7 @@ async fn metric_watcher(config: &Config) {
                                                 .to_string(),
-                                                impact: last.1,
+                                                impact: last.value,
                                                 components: vec![id],
                                                 start_date: shifted_date,
                                                 system: true,
                                                 incident_type: "incident".to_string(),
                                             };
diff --git a/src/common.rs b/src/common.rs
index f3d7d1f..6da794f 100644
--- a/src/common.rs
+++ b/src/common.rs
@@ -1,6 +1,8 @@
 //! Common methods
 //!
-use crate::types::{AppState, CloudMonError, CmpType, FlagMetric, ServiceHealthData};
+use crate::types::{
+    AppState, CloudMonError, CmpType, FlagMetric, ServiceHealthData, ServiceHealthPoint,
+};
 use chrono::DateTime;
 use evalexpr::*;
 use std::collections::{BTreeMap, HashMap};
@@ -146,7 +148,19 @@ pub async fn get_service_health(
             }
         }
     }
-    result.push((*ts, expression_res));
+    // Determine which metrics were true at this timestamp.
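+    // For example (hypothetical metric names): if "api_down" was true and
+    // "latency_high" was false at this ts, triggered becomes ["api_down"].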
+    let mut triggered: Vec<String> = Vec::new();
+    for (mname, present) in ts_val.iter() {
+        if *present {
+            triggered.push(mname.clone());
+        }
+    }
+
+    result.push(ServiceHealthPoint {
+        ts: *ts,
+        value: expression_res,
+        triggered,
+    });
 
     tracing::debug!("Summary data: {:?}, length={}", result, result.len());
 
diff --git a/src/graphite.rs b/src/graphite.rs
index da16ab6..f165e88 100644
--- a/src/graphite.rs
+++ b/src/graphite.rs
@@ -353,7 +353,7 @@ pub async fn handler_render(
         if target_parts.len() == 3 {
             let from = from.unwrap();
             let to = to.unwrap();
-            if let Ok(service_health_data) =  get_service_health(
+            if let Ok(service_health_data) = get_service_health(
                 &state,
                 target_parts[2],
                 target_parts[1],
@@ -366,7 +366,7 @@ pub async fn handler_render(
                     return (
                         StatusCode::OK,
                         Json(
-                            json!([{"target": target_parts[2], "datapoints": service_health_data.iter().map(|x| (Some(x.1 as f32), x.0)).collect::<Vec<(Option<f32>, u32)>>()}]),
+                            json!([{"target": target_parts[2], "datapoints": service_health_data.iter().map(|x| (Some(x.value as f32), x.ts)).collect::<Vec<(Option<f32>, u32)>>()}]),
                         ),
                     );
diff --git a/src/types.rs b/src/types.rs
index f4894cc..04f71ce 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -110,8 +110,16 @@ pub struct MetricData {
     #[serde(rename(serialize = "datapoints"))]
     pub points: MetricPoints,
 }
-/// List of the service health values (ts, data)
-pub type ServiceHealthData = Vec<(u32, u8)>;
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct ServiceHealthPoint {
+    pub ts: u32,
+    pub value: u8,
+    #[serde(default)]
+    pub triggered: Vec<String>,
+}
+
+/// List of the service health datapoints.
+pub type ServiceHealthData = Vec<ServiceHealthPoint>;
 
 pub enum CloudMonError {
     ServiceNotSupported,