diff --git a/Cargo.toml b/Cargo.toml index 880dc4a..d2b4b3d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,9 +18,10 @@ path="src/bin/reporter.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = { version = "~1.0" } axum = { version="~0.6" } axum-macros = { version="~0.3" } -chrono = "~0.4" +chrono = { version = "~0.4", features = ["serde"] } config = "~0.13" evalexpr = "~9.0" glob = "~0.3" diff --git a/src/bin/mock_convertor.rs b/src/bin/mock_convertor.rs new file mode 100644 index 0000000..c32e195 --- /dev/null +++ b/src/bin/mock_convertor.rs @@ -0,0 +1,110 @@ +use axum::{ + extract::{Query, State}, + http::StatusCode, + response::Json, + routing::get, + Router, +}; +use chrono::Utc; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; +use tokio::signal; + +#[derive(Deserialize, Debug)] +struct HealthQuery { + environment: String, + service: String, + #[serde(default)] + _from: String, + #[serde(default)] + _to: String, +} + +/// Structure of the response which waits reporter.rs +#[derive(Serialize, Debug)] +struct ServiceHealthPoint { + ts: u32, + value: u8, + #[serde(default)] + triggered: Vec, +} + +#[derive(Serialize, Debug)] +struct ServiceHealthResponse { + name: String, + service_category: String, + environment: String, + metrics: Vec, +} + +/// State of the mock server. Mutex provides live changes. +type AppState = Arc>>; + +#[tokio::main] +async fn main() { + let health_statuses: AppState = Arc::new(Mutex::new(HashMap::new())); + + // 0 = OK, >0 = Problem + health_statuses + .lock() + .unwrap() + .insert("production_eu-de:test".to_string(), 2); // Imitate a problem (impact = 2) + + let app = Router::new() + .route("/api/v1/health", get(health_handler)) + .with_state(health_statuses); + + let addr = SocketAddr::from(([127, 0, 0, 1], 3005)); + println!("Mock convertor listening on {}", addr); + + axum::Server::bind(&addr) + .serve(app.into_make_service()) + .with_graceful_shutdown(shutdown_signal()) + .await + .unwrap(); +} + +async fn health_handler( + State(state): State, + Query(params): Query, +) -> (StatusCode, Json) { + let key = format!("{}:{}", params.environment, params.service); + println!("Received request for: {}", key); + + let statuses = state.lock().unwrap(); + let status_value = statuses.get(&key).cloned().unwrap_or(0); + + let metric_time = Utc::now().timestamp() as u32; + + let triggered = if status_value > 0 { + vec![format!("{}.{}", params.service, "api_down")] + } else { + Vec::new() + }; + + let response = ServiceHealthResponse { + name: params.service.clone(), + service_category: "mock_category".to_string(), + environment: params.environment.clone(), + metrics: vec![ServiceHealthPoint { + ts: metric_time, + value: status_value, + triggered, + }], + }; + + println!( + "Responding with status: {}, time: {}", + status_value, metric_time + ); + (StatusCode::OK, Json(response)) +} + +async fn shutdown_signal() { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + println!("Signal received, shutting down mock server."); +} diff --git a/src/bin/reporter.rs b/src/bin/reporter.rs index c549d85..9f9e686 100644 --- a/src/bin/reporter.rs +++ b/src/bin/reporter.rs @@ -13,7 +13,10 @@ use reqwest::{ use tokio::signal; use tokio::time::{sleep, Duration}; +use anyhow::Result; +use chrono::{DateTime, TimeZone, Utc}; use serde::{Deserialize, Serialize}; +use serde_json::json; use std::collections::HashMap; @@ -24,7 +27,7 @@ use jwt::SignWithKey; use sha2::Sha256; use std::collections::BTreeMap; -#[derive(Clone, Deserialize, Serialize, Debug)] +#[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)] pub struct ComponentAttribute { pub name: String, pub value: String, @@ -36,16 +39,33 @@ pub struct Component { pub attributes: Vec, } -#[derive(Deserialize, Serialize, Debug)] -pub struct ComponentStatus { +/// Structure for deserializing components from Status Dashboard API v2 (/v2/components). +#[derive(Clone, Deserialize, Serialize, Debug)] +pub struct StatusDashboardComponent { + pub id: u32, pub name: String, - pub impact: u8, + #[serde(default)] pub attributes: Vec, } +/// Structure for serializing incident data for Status Dashboard API v2 (/v2/incidents). +#[derive(Clone, Deserialize, Serialize, Debug)] +pub struct IncidentData { + pub title: String, + #[serde(default)] + pub description: String, + pub impact: u8, + pub components: Vec, + pub start_date: DateTime, + #[serde(default)] + pub system: bool, + #[serde(rename = "type")] + pub incident_type: String, +} + #[tokio::main] async fn main() { - //Enable logging + //Enable logging. tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::new( std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()), @@ -55,10 +75,10 @@ async fn main() { tracing::info!("Starting cloudmon-metrics-reporter"); - // Parse config + // Parse config. let config = Config::new("config.yaml").unwrap(); - // Set up CTRL+C handlers + // Set up CTRL+C handlers. let ctrl_c = async { signal::ctrl_c() .await @@ -76,7 +96,7 @@ async fn main() { #[cfg(not(unix))] let terminate = std::future::pending::<()>(); - // Execute metric_watcher unless need to stop + // Execute metric_watcher unless need to stop. tokio::select! { _ = metric_watcher(&config) => {}, _ = ctrl_c => {}, @@ -86,17 +106,106 @@ async fn main() { tracing::info!("Stopped cloudmon-metrics-reporting"); } +/// Fetches components from the Status Dashboard API. +async fn fetch_components( + req_client: &reqwest::Client, + components_url: &str, +) -> Result> { + let response = req_client.get(components_url).send().await?; + response.error_for_status_ref()?; + let components = response.json::>().await?; + Ok(components) +} + +/// Fetches components, builds, and returns a new component ID cache. +/// +/// # Arguments: +/// * `req_client` - The reqwest client; +/// * `components_url` - The URL to fetch components from; +/// * `with_retry` - If true, it will retry fetching up to 3 times on failure. +async fn update_component_cache( + req_client: &reqwest::Client, + components_url: &str, + with_retry: bool, +) -> Result), u32>> { + tracing::info!("Updating component cache..."); + + let fetch_future = if with_retry { + fetch_components_with_retry(req_client, components_url).await + } else { + fetch_components(req_client, components_url).await.ok() + }; + + match fetch_future { + Some(components) if !components.is_empty() => { + let cache = build_component_id_cache(components); + tracing::info!( + "Successfully updated component cache. New size: {}", + cache.len() + ); + Ok(cache) + } + Some(_) => { + // Components list is empty + anyhow::bail!("Component list from status-dashboard is empty.") + } + None => anyhow::bail!("Failed to fetch component list from status-dashboard."), + } +} +/// Fetches components with a retry mechanism. +async fn fetch_components_with_retry( + req_client: &reqwest::Client, + components_url: &str, +) -> Option> { + let mut attempts = 0; + loop { + match fetch_components(req_client, components_url).await { + Ok(components) => { + tracing::info!("Successfully fetched {} components.", components.len()); + return Some(components); + } + Err(e) => { + attempts += 1; + tracing::error!("Failed to fetch components (attempt {}/3): {}", attempts, e); + if attempts >= 3 { + tracing::error!("Could not fetch components after 3 attempts. Giving up."); + return None; + } + tracing::info!("Retrying in 60 seconds..."); + sleep(Duration::from_secs(60)).await; + } + } + } +} + +/// Builds a cache mapping (ComponentName, Attributes) -> ComponentID. +fn build_component_id_cache( + components: Vec, +) -> HashMap<(String, Vec), u32> { + components + .into_iter() + .map(|c| { + let mut attrs = c.attributes; + attrs.sort(); + ((c.name, attrs), c.id) + }) + .collect() +} + async fn metric_watcher(config: &Config) { tracing::info!("Starting metric reporter thread"); - // Init reqwest client + // Init reqwest client. let req_client: reqwest::Client = ClientBuilder::new() - .timeout(Duration::from_secs(2 as u64)) + .timeout(Duration::from_secs(10 as u64)) .build() .unwrap(); - // Endless loop - let mut components: HashMap> = HashMap::new(); + + // This is the logic to build a component lookup table from config. + let mut components_from_config: HashMap> = HashMap::new(); for env in config.environments.iter() { - let comp_env_entry = components.entry(env.name.clone()).or_insert(HashMap::new()); + let comp_env_entry = components_from_config + .entry(env.name.clone()) + .or_insert(HashMap::new()); let mut env_attrs: Vec = Vec::new(); if let Some(ref attrs) = env.attributes { for (key, val) in attrs.iter() { @@ -107,11 +216,11 @@ async fn metric_watcher(config: &Config) { } } - for component in config.health_metrics.iter() { - match component.1.component_name { + for component_def in config.health_metrics.iter() { + match component_def.1.component_name { Some(ref name) => { comp_env_entry.insert( - component.0.clone(), + component_def.0.clone(), Component { name: name.clone(), attributes: env_attrs.clone(), @@ -119,16 +228,33 @@ async fn metric_watcher(config: &Config) { ); } None => { - tracing::warn!("No component_name is given for {}", component.1.service); + tracing::warn!("No component_name is given for {}", component_def.1.service); } } } } + let sdb_config = config .status_dashboard .as_ref() .expect("Status dashboard section is missing"); - let status_report_url = format!("{}/v1/component_status", sdb_config.url.clone(),); + + // Fetch components from Status Dashboard and build a cache to resolve component name to ID. + let components_url = format!("{}/v2/components", sdb_config.url.clone()); + let mut component_id_cache = + match update_component_cache(&req_client, &components_url, true).await { + Ok(cache) => cache, + Err(e) => { + tracing::error!( + "Initial component cache load failed: {}. Reporter cannot proceed.", + e + ); + return; + } + }; + + // Prepare for incident reporting + let incidents_url = format!("{}/v2/incidents", sdb_config.url.clone()); let mut headers = HeaderMap::new(); if let Some(ref secret) = sdb_config.secret { let key: Hmac = Hmac::new_from_slice(secret.as_bytes()).unwrap(); @@ -139,12 +265,12 @@ async fn metric_watcher(config: &Config) { headers.insert(AUTHORIZATION, bearer.parse().unwrap()); } loop { - // For every env from config + // For every env from config. for env in config.environments.iter() { tracing::trace!("env {:?}", env); - // For every component (health_metric service) - for component in config.health_metrics.iter() { - tracing::trace!("Component {:?}", component.0); + // For every component (health_metric service). + for component_def in config.health_metrics.iter() { + tracing::trace!("Component {:?}", component_def.0); // Query metric-convertor for the status match req_client .get(format!( @@ -154,7 +280,7 @@ async fn metric_watcher(config: &Config) { // Query env/service for time [-2min..-1min] .query(&[ ("environment", env.name.clone()), - ("service", component.0.clone()), + ("service", component_def.0.clone()), ("from", "-5min".to_string()), ("to", "-2min".to_string()), ]) @@ -165,49 +291,156 @@ async fn metric_watcher(config: &Config) { if rsp.status().is_client_error() { tracing::error!("Got API error {:?}", rsp.text().await); } else { - // Try to parse response + // Try to parse response. match rsp.json::().await { Ok(mut data) => { tracing::debug!("response {:?}", data); - // Peek at last metric in the vector + // Peek at last metric in the vector. if let Some(last) = data.metrics.pop() { // Is metric showing issues? - if last.1 > 0 { - tracing::info!("Bad status found: {}", last.1); - let component = components + if last.value > 0 { + // 0 means OK + let shifted_date = Utc + .timestamp_opt(last.ts as i64, 0) + .single() + .map(|ts| ts - chrono::Duration::seconds(1)) + .unwrap_or_else(|| { + Utc::now() - chrono::Duration::seconds(1) + }); + + let component = components_from_config .get(&env.name) .unwrap() - .get(component.0) + .get(component_def.0) .unwrap(); - tracing::info!("Component to report: {:?}", component); - let body = ComponentStatus { - name: component.name.clone(), - impact: last.1, - attributes: component.attributes.clone(), - }; - let res = req_client - .post(&status_report_url) - .headers(headers.clone()) - .json(&body) - .send() - .await; - match res { - Ok(rsp) => { - if rsp.status().is_client_error() { - tracing::error!( - "Error: [{}] {:?}", - rsp.status(), - rsp.text().await - ); + + // Try to find metric list for this component. + let metric_names = + match config.health_metrics.get(component_def.0) { + Some(h) => h.metrics.clone(), + None => Vec::new(), + }; + + // Сombined JSON log. + let log_obj = json!({ + "timestamp": shifted_date.to_rfc3339(), + "status": last.value, + "service": component_def.0, + "environment": env.name, + "configured_metrics": metric_names, + "triggered_metrics": last.triggered, + "component": { + "name": component.name, + "attributes": component.attributes, + } + }); + + tracing::info!("{}", log_obj.to_string()); + + // Search for component ID in the cache using name and attributes. + let mut search_attrs = component.attributes.clone(); + search_attrs.sort(); + let mut required_attrs = component.attributes.clone(); + required_attrs.sort(); + + let find_id = + |cache: &HashMap< + (String, Vec), + u32, + >| { + cache + .iter() + .find(|((name, attrs), _id)| { + if name != &component.name { + return false; + } + required_attrs + .iter() + .all(|r| attrs.contains(r)) + }) + .map(|(_, id)| *id) + }; + + // First attemption to find Component + let mut component_id = find_id(&component_id_cache); + + // If component not found, refresh cache and try again. + if component_id.is_none() { + tracing::info!( + "Component '{}' with attributes {:?} not found in cache. Attempting to refresh.", + component.name, component.attributes + ); + + match update_component_cache( + &req_client, + &components_url, + false, + ) + .await + { + Ok(new_cache) => { + component_id_cache = new_cache; + component_id = find_id(&component_id_cache); + } + Err(e) => { + tracing::warn!("Failed to refresh component cache, using old one. Error: {}", e); } } + } + + if let Some(id) = component_id { + tracing::info!( + "Found component ID {} in cache.", + id + ); - Err(e) => { - tracing::error!( - "Error during posting component status: {}", - e - ); + // Build IncidentData body for API v2 + let body = IncidentData { + title: "System incident from monitoring system" + .to_string(), + description: "System-wide incident affecting multiple components. Created automatically." + .to_string(), + impact: last.value, + components: vec![id], + start_date: shifted_date, + system: true, + incident_type: "incident".to_string(), + }; + tracing::debug!("IncidentData body: {:?}", body); + let res = req_client + .post(&incidents_url) + .headers(headers.clone()) + .json(&body) + .send() + .await; + match res { + Ok(rsp) => { + if !rsp.status().is_success() { + tracing::error!( + "Error reporting incident: [{}] {:?}", + rsp.status(), + rsp.text().await + ); + } else { + tracing::info!( + "Successfully reported incident for component '{}' with attributes {:?}.", + component.name, + component.attributes + ); + } + } + Err(e) => { + tracing::error!( + "Error during sending post request for incident: {}", + e + ); + } } + } else { + tracing::error!( + "Component with name '{}' and attributes {:?} still not found in status-dashboard cache after refresh.", + component.name, component.attributes + ); } } } diff --git a/src/common.rs b/src/common.rs index f3d7d1f..6da794f 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,6 +1,8 @@ //! Common methods //! -use crate::types::{AppState, CloudMonError, CmpType, FlagMetric, ServiceHealthData}; +use crate::types::{ + AppState, CloudMonError, CmpType, FlagMetric, ServiceHealthData, ServiceHealthPoint, +}; use chrono::DateTime; use evalexpr::*; use std::collections::{BTreeMap, HashMap}; @@ -146,7 +148,19 @@ pub async fn get_service_health( } } } - result.push((*ts, expression_res)); + // Determine which metrics were true at this timestamp. + let mut triggered: Vec = Vec::new(); + for (mname, present) in ts_val.iter() { + if *present { + triggered.push(mname.clone()); + } + } + + result.push(ServiceHealthPoint { + ts: *ts, + value: expression_res, + triggered, + }); } tracing::debug!("Summary data: {:?}, length={}", result, result.len()); diff --git a/src/graphite.rs b/src/graphite.rs index da16ab6..f165e88 100644 --- a/src/graphite.rs +++ b/src/graphite.rs @@ -353,7 +353,7 @@ pub async fn handler_render( if target_parts.len() == 3 { let from = from.unwrap(); let to = to.unwrap(); - if let Ok(service_health_data) = get_service_health( + if let Ok(service_health_data) = get_service_health( &state, target_parts[2], target_parts[1], @@ -366,7 +366,7 @@ pub async fn handler_render( return ( StatusCode::OK, Json( - json!([{"target": target_parts[2], "datapoints": service_health_data.iter().map(|x| (Some(x.1 as f32), x.0)).collect::, u32)>>()}]), + json!([{"target": target_parts[2], "datapoints": service_health_data.iter().map(|x| (Some(x.value as f32), x.ts)).collect::, u32)>>()}]), ), ); } diff --git a/src/types.rs b/src/types.rs index f4894cc..04f71ce 100644 --- a/src/types.rs +++ b/src/types.rs @@ -110,8 +110,16 @@ pub struct MetricData { #[serde(rename(serialize = "datapoints"))] pub points: MetricPoints, } -/// List of the service health values (ts, data) -pub type ServiceHealthData = Vec<(u32, u8)>; +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ServiceHealthPoint { + pub ts: u32, + pub value: u8, + #[serde(default)] + pub triggered: Vec, +} + +/// List of the service health datapoints. +pub type ServiceHealthData = Vec; pub enum CloudMonError { ServiceNotSupported,