diff --git a/common/ecsmetrics/metrics.go b/common/ecsmetrics/metrics.go index f9d5f018..066fdf0d 100644 --- a/common/ecsmetrics/metrics.go +++ b/common/ecsmetrics/metrics.go @@ -5,6 +5,7 @@ import ( "time" "github.com/AliceO2Group/Control/common/logger" + "github.com/AliceO2Group/Control/common/logger/infologger" "github.com/AliceO2Group/Control/common/monitoring" "github.com/sirupsen/logrus" ) @@ -30,7 +31,6 @@ func gather() monitoring.Metric { {Name: "/memory/classes/heap/unused:bytes"}, } - // Collect metrics data internalmetrics.Read(samples) metric := NewMetric("golangruntimemetrics") @@ -42,10 +42,10 @@ func gather() monitoring.Metric { case internalmetrics.KindFloat64: metric.AddValue(sample.Name, sample.Value.Float64()) case internalmetrics.KindFloat64Histogram: - log.Warning("Error: Histogram is not supported yet for metric [%s]", sample.Name) + log.WithField("level", infologger.IL_Devel).Warningf("Error: Histogram is not supported yet for metric [%s]", sample.Name) continue default: - log.Warning("Unsupported kind %v for metric %s\n", sample.Value.Kind(), sample.Name) + log.WithField("level", infologger.IL_Devel).Warningf("Unsupported kind %v for metric %s\n", sample.Value.Kind(), sample.Name) continue } } @@ -53,13 +53,17 @@ func gather() monitoring.Metric { } func StartGolangMetrics(period time.Duration) { + log.WithField("level", infologger.IL_Devel).Info("Starting golang metrics reporting") go func() { + log.Debug("Starting golang metrics goroutine") for { select { case <-endRequestChannel: + log.Debug("ending golang metrics") endRequestChannel <- struct{}{} return default: + log.Debug("sending golang metrics") monitoring.Send(gather()) time.Sleep(period) } diff --git a/common/monitoring/monitoring.go b/common/monitoring/monitoring.go index a6eae11b..be788f7d 100644 --- a/common/monitoring/monitoring.go +++ b/common/monitoring/monitoring.go @@ -22,47 +22,58 @@ var ( // channel used to send metrics into the event loop metricsChannel chan Metric - // 
channel for sending notifications to event loop that new http Request to report metrics arrived - metricsRequestChannel chan struct{} + // channel for sending requests to reset actual metrics slice and send it back to caller via metricsExportedToRequest + metricsRequestedChannel chan struct{} // channel used to send metrics to be reported by http request from event loop - metricsToRequest chan []Metric + metricsExportedToRequest chan []Metric - Log = logger.New(logrus.StandardLogger(), "metrics") + log = logger.New(logrus.StandardLogger(), "metrics") ) func initChannels(messageBufferSize int) { endChannel = make(chan struct{}) - metricsRequestChannel = make(chan struct{}) + metricsRequestedChannel = make(chan struct{}) + // 100 was chosen arbitrarily as a number that seemed sensible to be high enough to provide nice buffer if + // multiple goroutines want to send metrics without blocking each other metricsChannel = make(chan Metric, 100) - metricsToRequest = make(chan []Metric) + metricsExportedToRequest = make(chan []Metric) metricsLimit = messageBufferSize } func closeChannels() { close(endChannel) - close(metricsRequestChannel) + close(metricsRequestedChannel) close(metricsChannel) - close(metricsToRequest) + close(metricsExportedToRequest) } +// this eventLoop is the main part that processes all metrics sent to the package +// 3 events can happen: +// 1. metricsChannel receives message from Send() method. We just add the new metric to metrics slice +// 2. metricsRequestedChannel receives request to dump and request existing metrics. We send shallow copy of existing +// metrics to requestor (via metricsExportedToRequest channel) while resetting current metrics slice +// 3. receive request to stop monitoring via endChannel. 
We send confirmation through endChannel to notify caller +// that eventLoop stopped func eventLoop() { for { select { - case <-metricsRequestChannel: + case <-metricsRequestedChannel: shallowCopyMetrics := metrics metrics = make([]Metric, 0) - metricsToRequest <- shallowCopyMetrics + metricsExportedToRequest <- shallowCopyMetrics case metric := <-metricsChannel: if len(metrics) < metricsLimit { metrics = append(metrics, metric) } else { - Log.Warn("too many metrics waiting to be scraped. Are you sure that metrics scraping is running?") + log.Warn("too many metrics waiting to be scraped. Are you sure that metrics scraping is running?") } case <-endChannel: - endChannel <- struct{}{} + defer func() { + endChannel <- struct{}{} + }() return } } @@ -70,8 +81,8 @@ func eventLoop() { func exportMetricsAndReset(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") - metricsRequestChannel <- struct{}{} - metricsToConvert := <-metricsToRequest + metricsRequestedChannel <- struct{}{} + metricsToConvert := <-metricsExportedToRequest if metricsToConvert == nil { metricsToConvert = make([]Metric, 0) } @@ -79,7 +90,9 @@ func exportMetricsAndReset(w http.ResponseWriter, r *http.Request) { } func Send(metric Metric) { - metricsChannel <- metric + if IsRunning() { + metricsChannel <- metric + } } func handleFunc(endpointName string) { @@ -96,8 +109,8 @@ func handleFunc(endpointName string) { // \param messageBufferSize size of buffer for messages where messages are kept between scraping request. // // If we attempt send more messages than the size of the buffer, these overflowing messages will be ignored and warning will be logged. 
-func Start(port uint16, endpointName string, messageBufferSize int) error { - if server != nil { +func Run(port uint16, endpointName string, messageBufferSize int) error { + if IsRunning() { return nil } @@ -105,13 +118,13 @@ func Start(port uint16, endpointName string, messageBufferSize int) error { go eventLoop() - server := &http.Server{Addr: fmt.Sprintf(":%d", port)} + server = &http.Server{Addr: fmt.Sprintf(":%d", port)} handleFunc(endpointName) return server.ListenAndServe() } func Stop() { - if server == nil { + if !IsRunning() { return } @@ -122,4 +135,9 @@ func Stop() { endChannel <- struct{}{} <-endChannel server = nil + metrics = nil +} + +func IsRunning() bool { + return server != nil } diff --git a/common/monitoring/monitoring_test.go b/common/monitoring/monitoring_test.go index 4d3a3d95..7b707c3e 100644 --- a/common/monitoring/monitoring_test.go +++ b/common/monitoring/monitoring_test.go @@ -8,36 +8,61 @@ import ( "time" ) +// blocks until either IsRunning() returns true or timeout is triggered +func isRunningWithTimeout(t *testing.T, timeout time.Duration) { + timeoutChan := time.After(timeout) + for !IsRunning() { + select { + case <-timeoutChan: + t.Errorf("Monitoring is not running even after %v", timeout) + return + + default: + time.Sleep(10 * time.Millisecond) + } + } +} + +// block until either length of metrics is the same as \requiredMessages or timeout is triggered +func hasNumberOfMetrics(t *testing.T, timeout time.Duration, requiredMessages int) { + timeoutChan := time.After(timeout) + for len(metrics) != requiredMessages { + select { + case <-timeoutChan: + t.Errorf("Timeout %v triggered when waiting for %v messages, got %v", timeout, requiredMessages, len(metrics)) + return + + default: + time.Sleep(10 * time.Millisecond) + } + } +} + func TestSimpleStartStop(t *testing.T) { - go Start(1234, "/random", 100) - time.Sleep(time.Millisecond * 100) + go Run(1234, "/random", 100) + isRunningWithTimeout(t, time.Second) Stop() } func 
TestStartMultipleStop(t *testing.T) { - go Start(1234, "/random", 100) - time.Sleep(time.Millisecond * 100) + go Run(1234, "/random", 100) + isRunningWithTimeout(t, time.Second) Stop() Stop() } func cleaningUpAfterTest() { - endChannel <- struct{}{} - <-endChannel - closeChannels() - metrics = make([]Metric, 0) + Stop() } func initTest() { - initChannels(100) - // we need metrics channel to block so we don't end to quickly - metricsChannel = make(chan Metric, 0) - go eventLoop() + go Run(12345, "notimportant", 100) } // decorator function that properly inits and cleans after higher level test of Monitoring package func testFunction(t *testing.T, testToRun func(*testing.T)) { initTest() + isRunningWithTimeout(t, time.Second) testToRun(t) cleaningUpAfterTest() } @@ -46,9 +71,7 @@ func TestSendingSingleMetric(t *testing.T) { testFunction(t, func(t *testing.T) { metric := Metric{Name: "test"} Send(metric) - if len(metrics) != 1 { - t.Error("wrong number of metrics, should be 1") - } + hasNumberOfMetrics(t, time.Second, 1) if metrics[0].Name != "test" { t.Errorf("Got wrong name %s in stored metric", metrics[0].Name) @@ -60,16 +83,17 @@ func TestExportingMetrics(t *testing.T) { testFunction(t, func(t *testing.T) { metric := Metric{Name: "test"} Send(metric) + hasNumberOfMetrics(t, time.Second, 1) - metricsRequestChannel <- struct{}{} - metrics := <-metricsToRequest + metricsRequestedChannel <- struct{}{} + metricsToExport := <-metricsExportedToRequest - if len(metrics) != 1 { - t.Errorf("Got wrong amount of metrics %d, expected 1", len(metrics)) + if len(metricsToExport) != 1 { + t.Errorf("Got wrong amount of metrics %d, expected 1", len(metricsToExport)) } - if metrics[0].Name != "test" { - t.Errorf("Got wrong name of metric %s, expected test", metrics[0].Name) + if metricsToExport[0].Name != "test" { + t.Errorf("Got wrong name of metric %s, expected test", metricsToExport[0].Name) } }) } @@ -81,11 +105,9 @@ func TestBufferLimit(t *testing.T) { metric.Timestamp = 10 
metric.AddTag("tag1", 42) metric.AddValue("value1", 11) - Send(metric) - if len(metrics) != 1 { - t.Errorf("Metrics length is %d, but should be 1 after sending first metric", len(metrics)) - } + Send(metric) + hasNumberOfMetrics(t, time.Second, 1) Send(metric) time.Sleep(100 * time.Millisecond) @@ -97,10 +119,10 @@ func TestBufferLimit(t *testing.T) { } func TestHttpRun(t *testing.T) { - go Start(12345, "/metrics", 10) + go Run(9876, "/metrics", 10) defer Stop() - time.Sleep(time.Second) + isRunningWithTimeout(t, time.Second) metric := Metric{Name: "test"} metric.Timestamp = 10 @@ -108,9 +130,9 @@ func TestHttpRun(t *testing.T) { metric.AddValue("value1", 11) Send(metric) - response, err := http.Get("http://localhost:12345/metrics") + response, err := http.Get("http://localhost:9876/metrics") if err != nil { - t.Fatalf("Failed to GET metrics at port 12345: %v", err) + t.Fatalf("Failed to GET metrics at port 9876: %v", err) } decoder := json.NewDecoder(response.Body) var receivedMetrics []Metric @@ -157,7 +179,7 @@ func TestHttpRun(t *testing.T) { // PASS // ok github.com/AliceO2Group/Control/common/monitoring 44.686s func BenchmarkSendingMetrics(b *testing.B) { - Start(12345, "/metrics", 100) + Run(12345, "/metrics", 100) // this goroutine keeps clearing results so RAM does not exhausted go func() { @@ -168,8 +190,8 @@ func BenchmarkSendingMetrics(b *testing.B) { break default: if len(metrics) >= 10000000 { - metricsRequestChannel <- struct{}{} - <-metricsToRequest + metricsRequestedChannel <- struct{}{} + <-metricsExportedToRequest } } time.Sleep(100 * time.Millisecond) diff --git a/core/config.go b/core/config.go index e772f694..9b48de8b 100644 --- a/core/config.go +++ b/core/config.go @@ -128,7 +128,7 @@ func setDefaults() error { viper.SetDefault("kafkaEndpoints", []string{"localhost:9092"}) viper.SetDefault("enableKafka", true) viper.SetDefault("logAllIL", false) - viper.SetDefault("metricsEndpoint", "8086/metrics") + viper.SetDefault("metricsEndpoint", 
"8088/ecsmetrics") viper.SetDefault("metricsBufferSize", 10000) return nil } diff --git a/core/core.go b/core/core.go index d1c708cc..ade41804 100644 --- a/core/core.go +++ b/core/core.go @@ -73,7 +73,6 @@ func parseMetricsEndpoint(metricsEndpoint string) (error, uint16, string) { } func runMetrics() { - log.Info("Starting run metrics") metricsEndpoint := viper.GetString("metricsEndpoint") err, port, endpoint := parseMetricsEndpoint(metricsEndpoint) if err != nil { @@ -82,7 +81,8 @@ func runMetrics() { } go func() { - if err := monitoring.Start(port, fmt.Sprintf("/%s", endpoint), viper.GetInt("metricsBufferSize")); err != nil && err != http.ErrServerClosed { + log.Infof("Starting to listen on endpoint %s:%d for metrics", endpoint, port) + if err := monitoring.Run(port, fmt.Sprintf("/%s", endpoint), viper.GetInt("metricsBufferSize")); err != nil && err != http.ErrServerClosed { ecsmetrics.StopGolangMetrics() log.Errorf("failed to run metrics on port %d and endpoint: %s") }