From efafa74566c0fcb2caf72969c8fc9b9bbee92c03 Mon Sep 17 00:00:00 2001 From: Parker Timmerman Date: Tue, 31 Jan 2023 14:33:09 -0500 Subject: [PATCH 1/4] branch start, add e2e test for opentelemetry --- Cargo.lock | 2 ++ src/environmentd/Cargo.toml | 2 ++ src/environmentd/tests/sql.rs | 34 ++++++++++++++++++++++++++++ src/environmentd/tests/util.rs | 41 ++++++++++++++++++++++++++++++++-- 4 files changed, 77 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a73503d5999ae..339e1d3e64793 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3741,6 +3741,7 @@ dependencies = [ "reqwest", "rlimit", "sentry", + "sentry-tracing", "serde", "serde_json", "shell-words", @@ -3755,6 +3756,7 @@ dependencies = [ "tokio-stream", "tower-http", "tracing", + "tracing-core", "tracing-opentelemetry", "tracing-subscriber", "tungstenite", diff --git a/src/environmentd/Cargo.toml b/src/environmentd/Cargo.toml index da2a2920ee804..b8211da6ed88a 100644 --- a/src/environmentd/Cargo.toml +++ b/src/environmentd/Cargo.toml @@ -66,6 +66,7 @@ rand = "0.8.5" reqwest = { version = "0.11.13", features = ["json"] } rlimit = "0.8.3" sentry = { version = "0.29.1", optional = true } +sentry-tracing = "0.29.1" serde = { version = "1.0.152", features = ["derive"] } serde_json = "1.0.89" shell-words = "1.1.0" @@ -79,6 +80,7 @@ tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres" } tokio-stream = { version = "0.1.11", features = ["net"] } tower-http = { version = "0.3.5", features = ["cors"] } tracing = "0.1.37" +tracing-core = "0.1.30" tracing-opentelemetry = { git = "https://github.com/MaterializeInc/tracing.git", branch = "v0.1.x" } tracing-subscriber = "0.3.16" tungstenite = { version = "0.18.0", features = ["native-tls"] } diff --git a/src/environmentd/tests/sql.rs b/src/environmentd/tests/sql.rs index 983aff7c91164..a0aa1febb7d91 100644 --- a/src/environmentd/tests/sql.rs +++ b/src/environmentd/tests/sql.rs @@ -2274,3 +2274,37 @@ fn test_isolation_level_notice() { }) .unwrap(); } + +#[test] +fn test_emit_tracing_notice() { + let config = util::Config::default().with_enable_tracing(true); + let server = util::start_server(config).unwrap(); + + let (tx, mut rx) = futures::channel::mpsc::unbounded(); + + let mut client = server + .pg_config() + .notice_callback(move |notice| { + tx.unbounded_send(notice).unwrap(); + }) + .connect(postgres::NoTls) + .unwrap(); + + client + .execute("SET emit_trace_id_notice = true", &[]) + .unwrap(); + let _row = client.query_one("SELECT 1;", &[]).unwrap(); + + let tracing_re = Regex::new("trace id: (.*)").unwrap(); + match rx.try_next() { + Ok(Some(msg)) => { + // assert the NOTICE we recieved contained a trace_id + let captures = tracing_re.captures(msg.message()).expect("no matches?"); + let trace_id = captures.get(1).expect("trace_id not captured?").as_str(); + + assert!(trace_id.is_ascii()); + assert_eq!(trace_id.len(), 32); + } + x => panic!("failed to read message from channel, {:?}", x), + } +} diff --git a/src/environmentd/tests/util.rs b/src/environmentd/tests/util.rs index ba47e1f3a8207..ba92b7b8a3ba6 100644 --- a/src/environmentd/tests/util.rs +++ b/src/environmentd/tests/util.rs @@ -103,7 +103,7 @@ use mz_ore::metrics::MetricsRegistry; use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME}; use mz_ore::retry::Retry; use mz_ore::task; -use mz_ore::tracing::TracingHandle; +use mz_ore::tracing::{TracingHandle, TracingConfig, StderrLogConfig, StderrLogFormat, OpenTelemetryConfig, TracingGuard}; use mz_persist_client::cache::PersistClientCache; use mz_persist_client::cfg::PersistConfig; use mz_persist_client::PersistLocation; @@ -111,6 +111,7 @@ use mz_secrets::SecretsController; use mz_sql::catalog::EnvironmentId; use mz_stash::StashFactory; use mz_storage_client::types::connections::ConnectionContext; +use tracing_subscriber::filter::Targets; pub static KAFKA_ADDRS: Lazy = Lazy::new(|| env::var("KAFKA_ADDRS").unwrap_or_else(|_| "localhost:9092".into())); @@ -128,6 +129,7 @@ pub struct Config { default_cluster_replica_size: String, builtin_cluster_replica_size: String, propagate_crashes: bool, + enable_tracing: bool, } impl Default for Config { @@ -144,6 +146,7 @@ impl Default for Config { default_cluster_replica_size: "1".to_string(), builtin_cluster_replica_size: "1".to_string(), propagate_crashes: false, + enable_tracing: false, } } } @@ -216,6 +219,11 @@ impl Config { self.propagate_crashes = propagate_crashes; self } + + pub fn with_enable_tracing(mut self, enable_tracing: bool) -> Self { + self.enable_tracing = enable_tracing; + self + } } pub fn start_server(config: Config) -> Result { @@ -277,6 +285,33 @@ pub fn start_server(config: Config) -> Result { let postgres_factory = StashFactory::new(&metrics_registry); let secrets_controller = Arc::clone(&orchestrator); let connection_context = ConnectionContext::for_tests(orchestrator.reader()); + let (tracing_handle, tracing_guard) = if config.enable_tracing { + let config = TracingConfig:: sentry_tracing::EventFilter> { + service_name: "environmentd", + stderr_log: StderrLogConfig { + format: StderrLogFormat::Json, + filter: Targets::default(), + }, + opentelemetry: Some(OpenTelemetryConfig { + endpoint: "http://fake_address_for_testing:8080".to_string(), + headers: http::HeaderMap::new(), + filter: Targets::default().with_default(tracing_core::Level::DEBUG), + resource: opentelemetry::sdk::resource::Resource::default(), + start_enabled: true, + }), + #[cfg(feature = "tokio-console")] + tokio_console: None, + sentry: None, + build_version: &mz_environmentd::BUILD_INFO.version, + build_sha: &mz_environmentd::BUILD_INFO.sha, + build_time: &mz_environmentd::BUILD_INFO.time, + }; + let (tracing_handle, tracing_guard) = runtime.block_on(mz_ore::tracing::configure(config))?; + (tracing_handle, Some(tracing_guard)) + } else { + (TracingHandle::disabled(), None) + }; + let inner = runtime.block_on(mz_environmentd::serve(mz_environmentd::Config { adapter_stash_url, controller: ControllerConfig { @@ -315,7 +350,7 @@ pub fn start_server(config: Config) -> Result { bootstrap_system_parameters: Default::default(), availability_zones: Default::default(), connection_context, - tracing_handle: TracingHandle::disabled(), + tracing_handle, storage_usage_collection_interval: config.storage_usage_collection_interval, segment_api_key: None, egress_ips: vec![], @@ -330,6 +365,7 @@ pub fn start_server(config: Config) -> Result { runtime, metrics_registry, _temp_dir: temp_dir, + _tracing_guard: tracing_guard, }; Ok(server) } @@ -339,6 +375,7 @@ pub struct Server { pub runtime: Arc, _temp_dir: Option, pub metrics_registry: MetricsRegistry, + pub _tracing_guard: Option, } impl Server { From 5f3e5c33fdefa91082912ef387ac739fd780902a Mon Sep 17 00:00:00 2001 From: Parker Timmerman Date: Tue, 31 Jan 2023 14:48:52 -0500 Subject: [PATCH 2/4] mark the TracingGuard as must_use --- src/ore/src/tracing.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ore/src/tracing.rs b/src/ore/src/tracing.rs index d78afaa5d5db1..55e392eb119c3 100644 --- a/src/ore/src/tracing.rs +++ b/src/ore/src/tracing.rs @@ -197,6 +197,7 @@ impl std::fmt::Debug for TracingHandle { /// A guard for the tracing infrastructure configured with [`configure`]. /// /// This guard should be kept alive for the lifetime of the program. +#[must_use = "Must hold for the lifetime of the program, otherwise tracing will be shutdown"] pub struct TracingGuard { _sentry_guard: Option, } From 621bea85eeb16d44621159467743659a79b60c40 Mon Sep 17 00:00:00 2001 From: Parker Timmerman Date: Wed, 1 Feb 2023 10:33:09 -0500 Subject: [PATCH 3/4] fix clippy --- src/environmentd/tests/util.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/environmentd/tests/util.rs b/src/environmentd/tests/util.rs index ba92b7b8a3ba6..53fb86d0c5752 100644 --- a/src/environmentd/tests/util.rs +++ b/src/environmentd/tests/util.rs @@ -302,9 +302,9 @@ pub fn start_server(config: Config) -> Result { #[cfg(feature = "tokio-console")] tokio_console: None, sentry: None, - build_version: &mz_environmentd::BUILD_INFO.version, - build_sha: &mz_environmentd::BUILD_INFO.sha, - build_time: &mz_environmentd::BUILD_INFO.time, + build_version: mz_environmentd::BUILD_INFO.version, + build_sha: mz_environmentd::BUILD_INFO.sha, + build_time: mz_environmentd::BUILD_INFO.time, }; let (tracing_handle, tracing_guard) = runtime.block_on(mz_ore::tracing::configure(config))?; (tracing_handle, Some(tracing_guard)) From 5f8d691216f5987a794b521a929a59f7bf8d606b Mon Sep 17 00:00:00 2001 From: Parker Timmerman Date: Wed, 1 Feb 2023 12:12:46 -0500 Subject: [PATCH 4/4] cargo fmt --- src/environmentd/tests/util.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/environmentd/tests/util.rs b/src/environmentd/tests/util.rs index 53fb86d0c5752..93de3c9b8a94d 100644 --- a/src/environmentd/tests/util.rs +++ b/src/environmentd/tests/util.rs @@ -103,7 +103,10 @@ use mz_ore::metrics::MetricsRegistry; use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME}; use mz_ore::retry::Retry; use mz_ore::task; -use mz_ore::tracing::{TracingHandle, TracingConfig, StderrLogConfig, StderrLogFormat, OpenTelemetryConfig, TracingGuard}; +use mz_ore::tracing::{ + OpenTelemetryConfig, StderrLogConfig, StderrLogFormat, TracingConfig, TracingGuard, + TracingHandle, +}; use mz_persist_client::cache::PersistClientCache; use mz_persist_client::cfg::PersistConfig; use mz_persist_client::PersistLocation; @@ -306,7 +309,8 @@ pub fn start_server(config: Config) -> Result { build_sha: mz_environmentd::BUILD_INFO.sha, build_time: mz_environmentd::BUILD_INFO.time, }; - let (tracing_handle, tracing_guard) = runtime.block_on(mz_ore::tracing::configure(config))?; + let (tracing_handle, tracing_guard) = + runtime.block_on(mz_ore::tracing::configure(config))?; (tracing_handle, Some(tracing_guard)) } else { (TracingHandle::disabled(), None)