From 72a1b5eb29d02a40cd118c27a528d341d3262a5f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Michal=20Tich=C3=A1k?=
Date: Fri, 31 Oct 2025 09:53:48 +0100
Subject: [PATCH] [monitoring] Refactor monitoring so Send doesn't have to wait until library is running

---
 common/monitoring/monitoring.go      | 77 +++++++++++++++-------------
 common/monitoring/monitoring_test.go | 18 ++-----
 core/core.go                         |  5 ++
 3 files changed, 51 insertions(+), 49 deletions(-)

diff --git a/common/monitoring/monitoring.go b/common/monitoring/monitoring.go
index 91dbd417..f64339bd 100644
--- a/common/monitoring/monitoring.go
+++ b/common/monitoring/monitoring.go
@@ -42,50 +42,38 @@ var (
 	// atomic holder for the HTTP server instance
 	server atomic.Pointer[http.Server]
 	// objects to store incoming metrics
-	metricsInternal          *MetricsAggregate
-	metricsHistogramInternal *MetricsReservoirSampling
+	metricsInternal          *MetricsAggregate         = NewMetricsAggregate()
+	metricsHistogramInternal *MetricsReservoirSampling = NewMetricsReservoirSampling()
 	// channel that is used to request end of metrics server, it sends notification when server ended.
 	// It needs to be read!!!
-	endChannel chan struct{}
+	endChannel chan struct{} = make(chan struct{})
 	// channel used to send metrics into the event loop
-	metricsChannel chan Metric
+	// 100000 was chosen arbitrarily as a buffer size that seemed high enough to let
+	// multiple goroutines send metrics without blocking each other
+	metricsChannel chan Metric = make(chan Metric, 100000)
 	// channel used to send metrics meant to be proceesed as histogram into the event loop
-	metricsHistosChannel chan Metric
+	// 100000 was chosen arbitrarily as a buffer size that seemed high enough to let
+	// multiple goroutines send metrics without blocking each other
+	metricsHistosChannel chan Metric = make(chan Metric, 100000)
 	// channel for sending requests to reset actual metrics slice and send it back to caller via metricsExportedToRequest
-	metricsRequestedChannel chan struct{}
+	metricsRequestedChannel chan struct{} = make(chan struct{})
 	// channel used to send metrics to be reported by http request from event loop
-	metricsExportedToRequest chan []Metric
+	metricsExportedToRequest chan []Metric = make(chan []Metric)
 
-	log = logger.New(logrus.StandardLogger(), "metrics").WithField("level", infologger.IL_Devel)
-)
-
-func initChannels() {
-	endChannel = make(chan struct{})
-	metricsRequestedChannel = make(chan struct{})
-	// 100 was chosen arbitrarily as a number that seemed sensible to be high enough to provide nice buffer if
-	// multiple goroutines want to send metrics without blocking each other
-	metricsChannel = make(chan Metric, 100000)
-	metricsHistosChannel = make(chan Metric, 100000)
-	metricsExportedToRequest = make(chan []Metric)
-	metricsInternal = NewMetricsAggregate()
-	metricsHistogramInternal = NewMetricsReservoirSampling()
-}
+	// WaitUntilRunning waits until this channel is closed
+	waitUntilRunningChannel chan struct{} = make(chan struct{})
 
-func closeChannels() {
-	close(endChannel)
-	close(metricsRequestedChannel)
-	close(metricsChannel)
-	close(metricsExportedToRequest)
-}
+	log = logger.New(logrus.StandardLogger(), "metrics").WithField(infologger.Level, infologger.IL_Devel)
+)
 
 // this eventLoop is the main part that processes all metrics send to the package
 // 4 events can happen:
-// 1. metricsChannel receives message from Send() method. We just add the new metric to metrics slice
-// 2. metricsHistosChannel receives message from Send() method. We just add the new metric to metrics slice
+// 1. metricsChannel receives message from Send() method. We add the new metric to metrics slice
+// 2. metricsHistosChannel receives message from Send() method. We add the new metric to metrics slice
 // 3. metricsRequestChannel receives request to dump and request existing metrics. We send shallow copy of existing
 // metrics to requestor (via metricsExportedToRequest channel) while resetting current metrics slice
 // 4. receive request to stop monitoring via endChannel. We send confirmation through endChannel to notify caller
@@ -130,14 +118,18 @@ func exportMetricsAndReset(w http.ResponseWriter, r *http.Request) {
 }
 
 func Send(metric *Metric) {
-	if IsRunning() {
-		metricsChannel <- *metric
+	// drop overflowing messages so we don't slow down callers; we don't log the drop so we don't flood IL
+	select {
+	case metricsChannel <- *metric:
+	default:
 	}
 }
 
 func SendHistogrammable(metric *Metric) {
-	if IsRunning() {
-		metricsHistosChannel <- *metric
+	// drop overflowing messages so we don't slow down callers; we don't log the drop so we don't flood IL
+	select {
+	case metricsHistosChannel <- *metric:
+	default:
 	}
 }
 
@@ -160,10 +152,10 @@ func Run(port uint16, endpointName string) error {
 	if !server.CompareAndSwap(nil, localServer) {
 		return nil
 	}
-	initChannels()
 	go eventLoop()
 	handleFunc(endpointName)
 	// block until Shutdown is called
+	close(waitUntilRunningChannel)
 	return localServer.ListenAndServe()
 }
 
@@ -176,9 +168,22 @@ func Stop() {
 	defer cancel()
 	localServer.Shutdown(ctx)
 	endChannel <- struct{}{}
+	_, ok := <-waitUntilRunningChannel
+	if !ok {
+		waitUntilRunningChannel = make(chan struct{})
+	}
 	<-endChannel
 }
 
-func IsRunning() bool {
-	return server.Load() != nil
+// WaitUntilRunning waits until monitoring is running or the given timeout is
+// triggered.
+// \return true if monitoring is running, false if the timeout occurred
+func WaitUntilRunning(timeout time.Duration) bool {
+	timeoutChan := time.After(timeout)
+	select {
+	case <-waitUntilRunningChannel:
+		return true
+	case <-timeoutChan:
+		return false
+	}
 }
diff --git a/common/monitoring/monitoring_test.go b/common/monitoring/monitoring_test.go
index b0da3843..dd98cf6b 100644
--- a/common/monitoring/monitoring_test.go
+++ b/common/monitoring/monitoring_test.go
@@ -40,16 +40,8 @@ import (
 
 // blocks until either IsRunning() returns true or timeout is triggered
 func isRunningWithTimeout(t *testing.T, timeout time.Duration) {
-	timeoutChan := time.After(timeout)
-	for !IsRunning() {
-		select {
-		case <-timeoutChan:
-			t.Errorf("Monitoring is not running even after %v", timeout)
-			return
-
-		default:
-			time.Sleep(10 * time.Millisecond)
-		}
+	if !WaitUntilRunning(timeout) {
+		t.Errorf("Failed to init monitoring library in %v", timeout)
 	}
 }
 
@@ -126,7 +118,7 @@ func TestHttpRun(t *testing.T) {
 	go Run(9876, "/metrics")
 	defer Stop()
 
-	isRunningWithTimeout(t, time.Second)
+	isRunningWithTimeout(t, 5*time.Second)
 
 	metric := Metric{name: "test"}
 	metric.timestamp = time.Unix(10, 0)
@@ -140,12 +132,12 @@ func TestHttpRun(t *testing.T) {
 	}
 	message, err := io.ReadAll(response.Body)
 	if err != nil {
-		t.Errorf("Failed to read response Body: %v", err)
+		t.Fatalf("Failed to read response Body: %v", err)
 	}
 
 	receivedMetrics, err := parseMultipleLineProtocol(string(message))
 	if err != nil {
-		t.Errorf("Failed to parse message: %v", string(message))
+		t.Fatalf("Failed to parse message: %v with err: %v", string(message), err)
 	}
 
 	receivedMetric := receivedMetrics[0]
diff --git a/core/core.go b/core/core.go
index 6919cb45..f2370f11 100644
--- a/core/core.go
+++ b/core/core.go
@@ -90,6 +90,11 @@ func runMetrics() {
 		}
 	}()
 
+	monitoringTimeout := 30 * time.Second
+	if !monitoring.WaitUntilRunning(monitoringTimeout) {
+		log.WithField(infologger.Level, infologger.IL_Devel).Warnf("Failed to initialize monitoring framework in %v, it might catch up later. For now, we are starting without metrics", monitoringTimeout)
+	}
+
 	golangmetrics.Start(10 * time.Second)
 }
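
Note on the mechanism (not part of the patch): the key change in Send() and SendHistogrammable() is the non-blocking send on a buffered channel via select/default, where a full buffer means the metric is dropped instead of the caller blocking. A minimal, self-contained sketch of that pattern; the buffer size of 3 and the printed counters are illustrative only, not taken from the patch:

package main

import "fmt"

func main() {
	ch := make(chan int, 3) // buffered channel, like metricsChannel (which uses 100000)
	sent, dropped := 0, 0
	for i := 0; i < 5; i++ {
		select {
		case ch <- i: // buffer has room: enqueue without blocking
			sent++
		default: // buffer full: drop silently, as Send() now does
			dropped++
		}
	}
	fmt.Println("sent:", sent, "dropped:", dropped) // prints: sent: 3 dropped: 2
}

This is the standard Go idiom for lossy, non-blocking producers; the trade-off is silent metric loss under sustained overload, which the patch accepts in exchange for never stalling senders.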
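
With the channels initialised at package level, callers may now invoke Send() before Run() without blocking, and can use WaitUntilRunning() to learn whether the HTTP endpoint is actually serving. A test-style sketch of that call order, assuming the package's existing Metric fields as used in monitoring_test.go; the port 9877, the metric name and the 5-second timeout are made up for illustration:

package monitoring

import (
	"testing"
	"time"
)

func TestSendBeforeRunDoesNotBlock(t *testing.T) {
	// Send before Run: the metric is buffered (or dropped if the buffer is full)
	// instead of blocking until the library is running.
	metric := Metric{name: "example"}
	Send(&metric)

	go Run(9877, "/metrics")
	defer Stop()

	// Block until the endpoint is up, or give up after the timeout.
	if !WaitUntilRunning(5 * time.Second) {
		t.Fatalf("monitoring did not start within 5s")
	}
}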