From e25de22ee887629b8e55bbafba404a05882074b9 Mon Sep 17 00:00:00 2001 From: wlggraham Date: Wed, 17 Dec 2025 12:34:08 -0700 Subject: [PATCH 1/4] include database metrics --- internal/pkg/database/database.go | 27 ++++++++++++++++- internal/pkg/database/slice.go | 18 ++++++++++++ internal/pkg/heimdall/cluster_dal.go | 41 ++++++++++++++++++++++++-- internal/pkg/heimdall/command_dal.go | 43 +++++++++++++++++++++++++++- internal/pkg/heimdall/job_dal.go | 35 ++++++++++++++++++++++ internal/pkg/heimdall/jobs_async.go | 23 ++++++++++++++- internal/pkg/heimdall/keepalive.go | 12 ++++++++ internal/pkg/janitor/job.go | 18 ++++++++++++ 8 files changed, 212 insertions(+), 5 deletions(-) diff --git a/internal/pkg/database/database.go b/internal/pkg/database/database.go index f378db3..800b9d7 100644 --- a/internal/pkg/database/database.go +++ b/internal/pkg/database/database.go @@ -3,8 +3,10 @@ package database import ( "context" "database/sql" + "time" "github.com/babourine/x/pkg/set" + "github.com/hladush/go-telemetry/pkg/telemetry" _ "github.com/lib/pq" ) @@ -13,7 +15,9 @@ const ( ) var ( - ctx = context.Background() + ctx = context.Background() + dbConnectionMethod = telemetry.NewMethod("db_connection", "database") + dbSessionMethod = telemetry.NewMethod("db_session", "database") ) type Database struct { @@ -28,22 +32,43 @@ type Session struct { func (d *Database) NewSession(withTransaction bool) (*Session, error) { + // Track session creation metrics + transactionLabel := "false" + if withTransaction { + transactionLabel = "true" + } + + defer dbSessionMethod.RecordLatency(time.Now(), "with_transaction", transactionLabel) + dbSessionMethod.CountRequest("with_transaction", transactionLabel) + var err error s := &Session{} + // Track database connection creation + connectionStart := time.Now() + dbConnectionMethod.CountRequest("driver", dbDriverName) + // open connection if s.db, err = sql.Open(dbDriverName, d.ConnectionString); err != nil { + dbConnectionMethod.LogAndCountError(err, "driver", dbDriverName) + dbSessionMethod.LogAndCountError(err, "with_transaction", transactionLabel) return nil, err } + dbConnectionMethod.RecordLatency(connectionStart, "driver", dbDriverName) + dbConnectionMethod.CountSuccess("driver", dbDriverName) + // start transaction if withTransaction { if s.trx, err = s.db.BeginTx(ctx, nil); err != nil { + s.db.Close() // Close the connection before returning error + dbSessionMethod.LogAndCountError(err, "with_transaction", transactionLabel) return nil, err } } + dbSessionMethod.CountSuccess("with_transaction", transactionLabel) return s, nil } diff --git a/internal/pkg/database/slice.go b/internal/pkg/database/slice.go index d5b387c..8cb7b2b 100644 --- a/internal/pkg/database/slice.go +++ b/internal/pkg/database/slice.go @@ -1,16 +1,32 @@ package database +import ( + "time" + + "github.com/hladush/go-telemetry/pkg/telemetry" +) + +var ( + sliceMetrics = telemetry.NewMethod("db_connection", "database_slice") +) + func GetSlice(db *Database, query string) (any, error) { + // Track DB connection for slice query operation + defer sliceMetrics.RecordLatency(time.Now(), "operation", "get_slice") + sliceMetrics.CountRequest("operation", "get_slice") + // open connection sess, err := db.NewSession(false) if err != nil { + sliceMetrics.LogAndCountError(err, "operation", "get_slice") return nil, err } defer sess.Close() rows, err := sess.Query(query) if err != nil { + sliceMetrics.LogAndCountError(err, "operation", "get_slice") return nil, err } defer rows.Close() @@ -21,6 +37,7 @@ func GetSlice(db *Database, query string) (any, error) { var item any if err := rows.Scan(&item); err != nil { + sliceMetrics.LogAndCountError(err, "operation", "get_slice") return nil, err } @@ -28,6 +45,7 @@ func GetSlice(db *Database, query string) (any, error) { } + sliceMetrics.CountSuccess("operation", "get_slice") return result, nil } diff --git a/internal/pkg/heimdall/cluster_dal.go b/internal/pkg/heimdall/cluster_dal.go index 99a31c6..d73c62a 100644 --- a/internal/pkg/heimdall/cluster_dal.go +++ b/internal/pkg/heimdall/cluster_dal.go @@ -5,7 +5,9 @@ import ( _ "embed" "encoding/json" "fmt" + "time" + "github.com/hladush/go-telemetry/pkg/telemetry" _ "github.com/lib/pq" "github.com/patterninc/heimdall/internal/pkg/database" @@ -68,6 +70,7 @@ var ( var ( ErrUnknownClusterID = fmt.Errorf(`unknown cluster_id`) + clusterDALMetrics = telemetry.NewMethod("db_connection", "cluster_dal") ) func (h *Heimdall) submitCluster(c *cluster.Cluster) (any, error) { @@ -82,9 +85,14 @@ func (h *Heimdall) submitCluster(c *cluster.Cluster) (any, error) { func (h *Heimdall) clusterUpsert(c *cluster.Cluster) error { + // Track DB connection for cluster upsert operation + defer clusterDALMetrics.RecordLatency(time.Now(), "operation", "upsert_cluster") + clusterDALMetrics.CountRequest("operation", "upsert_cluster") + // open connection sess, err := h.Database.NewSession(true) if err != nil { + clusterDALMetrics.LogAndCountError(err, "operation", "upsert_cluster") return err } defer sess.Close() @@ -112,15 +120,26 @@ func (h *Heimdall) clusterUpsert(c *cluster.Cluster) error { } } - return sess.Commit() + if err := sess.Commit(); err != nil { + clusterDALMetrics.LogAndCountError(err, "operation", "upsert_cluster") + return err + } + + clusterDALMetrics.CountSuccess("operation", "upsert_cluster") + return nil } func (h *Heimdall) getCluster(c *cluster.Cluster) (any, error) { + // Track DB connection for get cluster operation + defer clusterDALMetrics.RecordLatency(time.Now(), "operation", "get_cluster") + clusterDALMetrics.CountRequest("operation", "get_cluster") + // open connection sess, err := h.Database.NewSession(false) if err != nil { + clusterDALMetrics.LogAndCountError(err, "operation", "get_cluster") return nil, err } defer sess.Close() @@ -142,25 +161,34 @@ func (h *Heimdall) getCluster(c *cluster.Cluster) (any, error) { if err := row.Scan(&r.SystemID, &r.Status, &r.Name, &r.Version, &r.Description, &clusterContext, &r.User, &r.CreatedAt, &r.UpdatedAt); err != nil { if err == sql.ErrNoRows { - return nil, ErrUnknownCommandID + clusterDALMetrics.LogAndCountError(ErrUnknownClusterID, "operation", "get_cluster") + return nil, ErrUnknownClusterID } else { + clusterDALMetrics.LogAndCountError(err, "operation", "get_cluster") return nil, err } } if err := clusterParseContextAndTags(r, clusterContext, sess); err != nil { + clusterDALMetrics.LogAndCountError(err, "operation", "get_cluster") return nil, err } + clusterDALMetrics.CountSuccess("operation", "get_cluster") return r, nil } func (h *Heimdall) getClusterStatus(c *cluster.Cluster) (any, error) { + // Track DB connection for cluster status operation + defer clusterDALMetrics.RecordLatency(time.Now(), "operation", "get_cluster_status") + clusterDALMetrics.CountRequest("operation", "get_cluster_status") + // open connection sess, err := h.Database.NewSession(false) if err != nil { + clusterDALMetrics.LogAndCountError(err, "operation", "get_cluster_status") return nil, err } defer sess.Close() @@ -175,12 +203,15 @@ func (h *Heimdall) getClusterStatus(c *cluster.Cluster) (any, error) { if err := row.Scan(&r.Status, &r.UpdatedAt); err != nil { if err == sql.ErrNoRows { + clusterDALMetrics.LogAndCountError(ErrUnknownClusterID, "operation", "get_cluster_status") return nil, ErrUnknownClusterID } else { + clusterDALMetrics.LogAndCountError(err, "operation", "get_cluster_status") return nil, err } } + clusterDALMetrics.CountSuccess("operation", "get_cluster_status") return r, nil } @@ -209,9 +240,14 @@ func (h *Heimdall) updateClusterStatus(c *cluster.Cluster) (any, error) { func (h *Heimdall) getClusters(f *database.Filter) (any, error) { + // Track DB connection for clusters list operation + defer clusterDALMetrics.RecordLatency(time.Now(), "operation", "get_clusters") + clusterDALMetrics.CountRequest("operation", "get_clusters") + // open connection sess, err := h.Database.NewSession(false) if err != nil { + clusterDALMetrics.LogAndCountError(err, "operation", "get_clusters") return nil, err } defer sess.Close() @@ -247,6 +283,7 @@ func (h *Heimdall) getClusters(f *database.Filter) (any, error) { } + clusterDALMetrics.CountSuccess("operation", "get_clusters") return &resultset{ Data: result, }, nil diff --git a/internal/pkg/heimdall/command_dal.go b/internal/pkg/heimdall/command_dal.go index c47262b..e2bf85d 100644 --- a/internal/pkg/heimdall/command_dal.go +++ b/internal/pkg/heimdall/command_dal.go @@ -5,7 +5,9 @@ import ( _ "embed" "encoding/json" "fmt" + "time" + "github.com/hladush/go-telemetry/pkg/telemetry" _ "github.com/lib/pq" "github.com/patterninc/heimdall/internal/pkg/database" @@ -80,6 +82,7 @@ var ( var ( ErrUnknownCommandID = fmt.Errorf(`unknown command_id`) + commandDALMetrics = telemetry.NewMethod("db_connection", "command_dal") ) func (h *Heimdall) submitCommand(c *command.Command) (any, error) { @@ -94,9 +97,14 @@ func (h *Heimdall) submitCommand(c *command.Command) (any, error) { func (h *Heimdall) commandUpsert(c *command.Command) error { + // Track DB connection for command upsert operation + defer commandDALMetrics.RecordLatency(time.Now(), "operation", "upsert_command") + commandDALMetrics.CountRequest("operation", "upsert_command") + // open connection sess, err := h.Database.NewSession(true) if err != nil { + commandDALMetrics.LogAndCountError(err, "operation", "upsert_command") return err } defer sess.Close() @@ -141,15 +149,26 @@ func (h *Heimdall) commandUpsert(c *command.Command) error { } } - return sess.Commit() + if err := sess.Commit(); err != nil { + commandDALMetrics.LogAndCountError(err, "operation", "upsert_command") + return err + } + + commandDALMetrics.CountSuccess("operation", "upsert_command") + return nil } func (h *Heimdall) getCommand(c *command.Command) (any, error) { + // Track DB connection for get command operation + defer commandDALMetrics.RecordLatency(time.Now(), "operation", "get_command") + commandDALMetrics.CountRequest("operation", "get_command") + // open connection sess, err := h.Database.NewSession(false) if err != nil { + commandDALMetrics.LogAndCountError(err, "operation", "get_command") return nil, err } defer sess.Close() @@ -171,25 +190,34 @@ func (h *Heimdall) getCommand(c *command.Command) (any, error) { if err := row.Scan(&r.SystemID, &r.Status, &r.Name, &r.Version, &r.Plugin, &r.Description, &commandContext, &r.User, &r.IsSync, &r.CreatedAt, &r.UpdatedAt); err != nil { if err == sql.ErrNoRows { + commandDALMetrics.LogAndCountError(ErrUnknownCommandID, "operation", "get_command") return nil, ErrUnknownCommandID } else { + commandDALMetrics.LogAndCountError(err, "operation", "get_command") return nil, err } } if err := commandParseContextAndTags(r, commandContext, sess); err != nil { + commandDALMetrics.LogAndCountError(err, "operation", "get_command") return nil, err } + commandDALMetrics.CountSuccess("operation", "get_command") return r, nil } func (h *Heimdall) getCommandStatus(c *command.Command) (any, error) { + // Track DB connection for command status operation + defer commandDALMetrics.RecordLatency(time.Now(), "operation", "get_command_status") + commandDALMetrics.CountRequest("operation", "get_command_status") + // open connection sess, err := h.Database.NewSession(false) if err != nil { + commandDALMetrics.LogAndCountError(err, "operation", "get_command_status") return nil, err } defer sess.Close() @@ -204,21 +232,28 @@ func (h *Heimdall) getCommandStatus(c *command.Command) (any, error) { if err := row.Scan(&r.Status, &r.UpdatedAt); err != nil { if err == sql.ErrNoRows { + commandDALMetrics.LogAndCountError(ErrUnknownCommandID, "operation", "get_command_status") return nil, ErrUnknownCommandID } else { + commandDALMetrics.LogAndCountError(err, "operation", "get_command_status") return nil, err } } + commandDALMetrics.CountSuccess("operation", "get_command_status") return r, nil } func (h *Heimdall) updateCommandStatus(c *command.Command) (any, error) { + // Track DB connection for command status update operation + commandDALMetrics.CountRequest("operation", "update_command_status") + // open connection sess, err := h.Database.NewSession(false) if err != nil { + commandDALMetrics.LogAndCountError(err, "operation", "update_command_status") return nil, err } defer sess.Close() @@ -238,9 +273,14 @@ func (h *Heimdall) updateCommandStatus(c *command.Command) (any, error) { func (h *Heimdall) getCommands(f *database.Filter) (any, error) { + // Track DB connection for commands list operation + defer commandDALMetrics.RecordLatency(time.Now(), "operation", "get_commands") + commandDALMetrics.CountRequest("operation", "get_commands") + // open connection sess, err := h.Database.NewSession(false) if err != nil { + commandDALMetrics.LogAndCountError(err, "operation", "get_commands") return nil, err } defer sess.Close() @@ -276,6 +316,7 @@ func (h *Heimdall) getCommands(f *database.Filter) (any, error) { } + commandDALMetrics.CountSuccess("operation", "get_commands") return &resultset{ Data: result, }, nil diff --git a/internal/pkg/heimdall/job_dal.go b/internal/pkg/heimdall/job_dal.go index aaa34cd..69c8c93 100644 --- a/internal/pkg/heimdall/job_dal.go +++ b/internal/pkg/heimdall/job_dal.go @@ -5,7 +5,9 @@ import ( _ "embed" "encoding/json" "fmt" + "time" + "github.com/hladush/go-telemetry/pkg/telemetry" _ "github.com/lib/pq" "github.com/patterninc/heimdall/internal/pkg/database" @@ -51,6 +53,7 @@ var queryJobStatusesSelect string var ( ErrUnknownJobID = fmt.Errorf(`unknown job_id`) + jobDALMetrics = telemetry.NewMethod("db_connection", "job_dal") ) var ( @@ -92,9 +95,14 @@ type jobRequest struct { func (h *Heimdall) insertJob(j *job.Job, clusterID, commandID string) (int64, error) { + // Track DB connection for job insert operation + defer jobDALMetrics.RecordLatency(time.Now(), "operation", "insert_job") + jobDALMetrics.CountRequest("operation", "insert_job") + // open connection sess, err := h.Database.NewSession(true) if err != nil { + jobDALMetrics.LogAndCountError(err, "operation", "insert_job") return 0, err } defer sess.Close() @@ -149,18 +157,25 @@ func (h *Heimdall) insertJob(j *job.Job, clusterID, commandID string) (int64, er } if err := sess.Commit(); err != nil { + jobDALMetrics.LogAndCountError(err, "operation", "insert_job") return 0, err } + jobDALMetrics.CountSuccess("operation", "insert_job") return jobID, nil } func (h *Heimdall) getJob(j *jobRequest) (any, error) { + // Track DB connection for job get operation + defer jobDALMetrics.RecordLatency(time.Now(), "operation", "get_job") + jobDALMetrics.CountRequest("operation", "get_job") + // open connection sess, err := h.Database.NewSession(false) if err != nil { + jobDALMetrics.LogAndCountError(err, "operation", "get_job") return nil, err } defer sess.Close() @@ -189,29 +204,38 @@ func (h *Heimdall) getJob(j *jobRequest) (any, error) { } if err := jobParseContextAndTags(r, jobContext, sess); err != nil { + jobDALMetrics.LogAndCountError(err, "operation", "get_job") return nil, err } + jobDALMetrics.CountSuccess("operation", "get_job") return r, nil } func (h *Heimdall) getJobs(f *database.Filter) (any, error) { + // Track DB connection for jobs list operation + defer jobDALMetrics.RecordLatency(time.Now(), "operation", "get_jobs") + jobDALMetrics.CountRequest("operation", "get_jobs") + // open connection sess, err := h.Database.NewSession(false) if err != nil { + jobDALMetrics.LogAndCountError(err, "operation", "get_jobs") return nil, err } defer sess.Close() query, args, err := f.Render(queryJobsSelect, jobsFilterConfig) if err != nil { + jobDALMetrics.LogAndCountError(err, "operation", "get_jobs") return nil, err } rows, err := sess.Query(query, args...) if err != nil { + jobDALMetrics.LogAndCountError(err, "operation", "get_jobs") return nil, err } defer rows.Close() @@ -225,10 +249,12 @@ func (h *Heimdall) getJobs(f *database.Filter) (any, error) { if err := rows.Scan(&r.SystemID, &r.ID, &r.Status, &r.Name, &r.Version, &r.Description, &jobContext, &r.Error, &r.User, &r.IsSync, &r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.CluserID, &r.ClusterName, &r.StoreResultSync); err != nil { + jobDALMetrics.LogAndCountError(err, "operation", "get_jobs") return nil, err } if err := jobParseContextAndTags(r, jobContext, sess); err != nil { + jobDALMetrics.LogAndCountError(err, "operation", "get_jobs") return nil, err } @@ -236,6 +262,7 @@ func (h *Heimdall) getJobs(f *database.Filter) (any, error) { } + jobDALMetrics.CountSuccess("operation", "get_jobs") return &resultset{ Data: result, }, nil @@ -244,9 +271,14 @@ func (h *Heimdall) getJobs(f *database.Filter) (any, error) { func (h *Heimdall) getJobStatus(j *jobRequest) (any, error) { + // Track DB connection for job status operation + defer jobDALMetrics.RecordLatency(time.Now(), "operation", "get_job_status") + jobDALMetrics.CountRequest("operation", "get_job_status") + // open connection sess, err := h.Database.NewSession(false) if err != nil { + jobDALMetrics.LogAndCountError(err, "operation", "get_job_status") return nil, err } defer sess.Close() @@ -261,12 +293,15 @@ func (h *Heimdall) getJobStatus(j *jobRequest) (any, error) { if err := row.Scan(&r.Status, &r.Error, &r.UpdatedAt); err != nil { if err == sql.ErrNoRows { + jobDALMetrics.LogAndCountError(ErrUnknownJobID, "operation", "get_job_status") return nil, ErrUnknownJobID } else { + jobDALMetrics.LogAndCountError(err, "operation", "get_job_status") return nil, err } } + jobDALMetrics.CountSuccess("operation", "get_job_status") return r, nil } diff --git a/internal/pkg/heimdall/jobs_async.go b/internal/pkg/heimdall/jobs_async.go index bdb0c71..409036e 100644 --- a/internal/pkg/heimdall/jobs_async.go +++ b/internal/pkg/heimdall/jobs_async.go @@ -3,7 +3,9 @@ package heimdall import ( _ "embed" "fmt" + "time" + "github.com/hladush/go-telemetry/pkg/telemetry" "github.com/patterninc/heimdall/internal/pkg/database" "github.com/patterninc/heimdall/pkg/object/job" "github.com/patterninc/heimdall/pkg/object/job/status" @@ -14,6 +16,10 @@ const ( formatErrUnknownCluster = "unknown cluster: %s" ) +var ( + asyncJobsMetrics = telemetry.NewMethod("db_connection", "async_jobs") +) + //go:embed queries/job/active_select.sql var queryActiveJobSelect string @@ -28,9 +34,14 @@ var queryActiveJobDelete string func (h *Heimdall) getAsyncJobs(limit int) ([]*job.Job, error) { + // Track DB connection for async jobs retrieval + defer asyncJobsMetrics.RecordLatency(time.Now(), "operation", "get_async_jobs") + asyncJobsMetrics.CountRequest("operation", "get_async_jobs") + // open connection sess, err := h.Database.NewSession(true) if err != nil { + asyncJobsMetrics.LogAndCountError(err, "operation", "get_async_jobs") return nil, err } defer sess.Close() @@ -81,18 +92,24 @@ func (h *Heimdall) getAsyncJobs(limit int) ([]*job.Job, error) { // commit transaction if err := sess.Commit(); err != nil { + asyncJobsMetrics.LogAndCountError(err, "operation", "get_async_jobs") return nil, err } + asyncJobsMetrics.CountSuccess("operation", "get_async_jobs") return result, nil } func (h *Heimdall) runAsyncJob(j *job.Job) error { + // Track DB connection for async job execution + asyncJobsMetrics.CountRequest("operation", "run_async_job") + // let's updte job status that we're running it... sess, err := h.Database.NewSession(false) if err != nil { + asyncJobsMetrics.LogAndCountError(err, "operation", "run_async_job") return h.updateAsyncJobStatus(j, err) } defer sess.Close() @@ -119,6 +136,9 @@ func (h *Heimdall) runAsyncJob(j *job.Job) error { func (h *Heimdall) updateAsyncJobStatus(j *job.Job, jobError error) error { + // Track DB connection for async job status update + asyncJobsMetrics.CountRequest("operation", "update_async_job_status") + // we updte the final job status based on presence of the error if jobError == nil { j.Status = status.Succeeded @@ -130,8 +150,9 @@ func (h *Heimdall) updateAsyncJobStatus(j *job.Job, jobError error) error { // now we update that status in the database sess, err := h.Database.NewSession(true) if err != nil { - // TODO: implement proper logging + asyncJobsMetrics.LogAndCountError(err, "operation", "update_async_job_status") fmt.Println(`session error:`, err) + return jobError // Return early if session creation fails } defer sess.Close() diff --git a/internal/pkg/heimdall/keepalive.go b/internal/pkg/heimdall/keepalive.go index 60da93c..a5caf32 100644 --- a/internal/pkg/heimdall/keepalive.go +++ b/internal/pkg/heimdall/keepalive.go @@ -3,12 +3,18 @@ package heimdall import ( _ "embed" "time" + + "github.com/hladush/go-telemetry/pkg/telemetry" ) const ( defaultJanitorKeepalive = 5 // seconds ) +var ( + keepaliveMetrics = telemetry.NewMethod("db_connection", "keepalive") +) + //go:embed queries/job/active_keepalive.sql var queryActiveJobKeepalive string @@ -22,9 +28,14 @@ func (h *Heimdall) jobKeepalive(done <-chan struct{}, jobID int64, agentName str ticker := time.NewTicker(time.Duration(keepaliveSeconds) * time.Second) defer ticker.Stop() + // Track DB connection for job keepalive + defer keepaliveMetrics.RecordLatency(time.Now(), "operation", "job_keepalive") + keepaliveMetrics.CountRequest("operation", "job_keepalive") + // set the db session sess, err := h.Database.NewSession(false) if err != nil { + keepaliveMetrics.LogAndCountError(err, "operation", "job_keepalive") sess = nil } defer sess.Close() @@ -39,6 +50,7 @@ func (h *Heimdall) jobKeepalive(done <-chan struct{}, jobID int64, agentName str } case _, stillOpen := <-done: if !stillOpen { + keepaliveMetrics.CountSuccess("operation", "job_keepalive") return } } diff --git a/internal/pkg/janitor/job.go b/internal/pkg/janitor/job.go index 2e4ea80..a8c3763 100644 --- a/internal/pkg/janitor/job.go +++ b/internal/pkg/janitor/job.go @@ -2,10 +2,16 @@ package janitor import ( _ "embed" + "time" + "github.com/hladush/go-telemetry/pkg/telemetry" "github.com/patterninc/heimdall/internal/pkg/database" ) +var ( + janitorMetrics = telemetry.NewMethod("db_connection", "janitor") +) + //go:embed queries/stale_jobs_select.sql var queryStaleJobsSelect string @@ -17,15 +23,21 @@ var queryStaleJobsDelete string func (j *Janitor) cleanupStaleJobs() error { + // Track DB connection for janitor cleanup operations + defer janitorMetrics.RecordLatency(time.Now(), "operation", "cleanup_stale_jobs") + janitorMetrics.CountRequest("operation", "cleanup_stale_jobs") + // let's find the jobs we'll be cleaning up... sess, err := j.db.NewSession(false) if err != nil { + janitorMetrics.LogAndCountError(err, "operation", "cleanup_session_create") return err } defer sess.Close() rows, err := sess.Query(queryStaleJobsSelect, j.StaleJob) if err != nil { + janitorMetrics.LogAndCountError(err, "operation", "cleanup_select_stale_jobs") return err } defer rows.Close() @@ -37,6 +49,7 @@ func (j *Janitor) cleanupStaleJobs() error { var jobID int if err := rows.Scan(&jobID); err != nil { + janitorMetrics.LogAndCountError(err, "operation", "cleanup_scan_stale_jobs") return err } @@ -54,23 +67,28 @@ func (j *Janitor) cleanupStaleJobs() error { // prepare query to update job statuses updateStaleJobs, jobSystemIDs, err := database.PrepareSliceQuery(queryFailStaleJobs, `$%d`, staleJobIDs) if err != nil { + janitorMetrics.LogAndCountError(err, "operation", "cleanup_prepare_update_query") return err } if _, err := sess.Exec(updateStaleJobs, jobSystemIDs...); err != nil { + janitorMetrics.LogAndCountError(err, "operation", "cleanup_update_stale_jobs") return err } // delete stale jobs from active jobs deleteStaleJobs, jobSystemIDs, err := database.PrepareSliceQuery(queryStaleJobsDelete, `$%d`, staleJobIDs) if err != nil { + janitorMetrics.LogAndCountError(err, "operation", "cleanup_prepare_delete_query") return err } if _, err := sess.Exec(deleteStaleJobs, jobSystemIDs...); err != nil { + janitorMetrics.LogAndCountError(err, "operation", "cleanup_delete_stale_jobs") return err } + janitorMetrics.CountSuccess("operation", "cleanup_stale_jobs") return nil } From 9a9be0a7d29427a4e42766a90e34eb1048f155e9 Mon Sep 17 00:00:00 2001 From: wlggraham Date: Wed, 17 Dec 2025 14:28:23 -0700 Subject: [PATCH 2/4] standardize metric format --- docker-compose.yaml | 2 +- internal/pkg/database/database.go | 22 +++++------ internal/pkg/database/slice.go | 14 +++---- internal/pkg/heimdall/cluster_dal.go | 53 +++++++++++++++------------ internal/pkg/heimdall/command_dal.go | 54 ++++++++++++++------------- internal/pkg/heimdall/job_dal.go | 55 +++++++++++++++------------- internal/pkg/heimdall/jobs_async.go | 26 ++++++++----- internal/pkg/heimdall/keepalive.go | 10 ++--- 8 files changed, 125 insertions(+), 111 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 15d3151..dc3642f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -29,7 +29,7 @@ services: - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} - AWS_SESSION_TOKEN=${AWS_SESSION_TOKEN} - AWS_REGION=${AWS_REGION} - - METRICS_EMITTER=prometheus + - METRICS_EMITTER=console - PROMETHEUS_ADDRESS=:9091 depends_on: postgres: diff --git a/internal/pkg/database/database.go b/internal/pkg/database/database.go index 800b9d7..a932fa7 100644 --- a/internal/pkg/database/database.go +++ b/internal/pkg/database/database.go @@ -15,9 +15,8 @@ const ( ) var ( - ctx = context.Background() - dbConnectionMethod = telemetry.NewMethod("db_connection", "database") - dbSessionMethod = telemetry.NewMethod("db_session", "database") + ctx = context.Background() + newSessionMethod = telemetry.NewMethod("db_connection", "new_session") ) type Database struct { @@ -38,8 +37,8 @@ func (d *Database) NewSession(withTransaction bool) (*Session, error) { transactionLabel = "true" } - defer dbSessionMethod.RecordLatency(time.Now(), "with_transaction", transactionLabel) - dbSessionMethod.CountRequest("with_transaction", transactionLabel) + defer newSessionMethod.RecordLatency(time.Now(), "with_transaction", transactionLabel) + newSessionMethod.CountRequest("with_transaction", transactionLabel) var err error @@ -47,28 +46,27 @@ func (d *Database) NewSession(withTransaction bool) (*Session, error) { // Track database connection creation connectionStart := time.Now() - dbConnectionMethod.CountRequest("driver", dbDriverName) + newSessionMethod.CountRequest("with_transaction", transactionLabel) // open connection if s.db, err = sql.Open(dbDriverName, d.ConnectionString); err != nil { - dbConnectionMethod.LogAndCountError(err, "driver", dbDriverName) - dbSessionMethod.LogAndCountError(err, "with_transaction", transactionLabel) + newSessionMethod.LogAndCountError(err, "with_transaction", transactionLabel) return nil, err } - dbConnectionMethod.RecordLatency(connectionStart, "driver", dbDriverName) - dbConnectionMethod.CountSuccess("driver", dbDriverName) + newSessionMethod.RecordLatency(connectionStart, "with_transaction", transactionLabel) + newSessionMethod.CountSuccess("with_transaction", transactionLabel) // start transaction if withTransaction { if s.trx, err = s.db.BeginTx(ctx, nil); err != nil { s.db.Close() // Close the connection before returning error - dbSessionMethod.LogAndCountError(err, "with_transaction", transactionLabel) + newSessionMethod.LogAndCountError(err, "with_transaction", transactionLabel) return nil, err } } - dbSessionMethod.CountSuccess("with_transaction", transactionLabel) + newSessionMethod.CountSuccess("with_transaction", transactionLabel) return s, nil } diff --git a/internal/pkg/database/slice.go b/internal/pkg/database/slice.go index 8cb7b2b..8b59bfa 100644 --- a/internal/pkg/database/slice.go +++ b/internal/pkg/database/slice.go @@ -7,26 +7,26 @@ import ( ) var ( - sliceMetrics = telemetry.NewMethod("db_connection", "database_slice") + getSliceMethod = telemetry.NewMethod("db_connection", "get_slice") ) func GetSlice(db *Database, query string) (any, error) { // Track DB connection for slice query operation - defer sliceMetrics.RecordLatency(time.Now(), "operation", "get_slice") - sliceMetrics.CountRequest("operation", "get_slice") + defer getSliceMethod.RecordLatency(time.Now()) + getSliceMethod.CountRequest() // open connection sess, err := db.NewSession(false) if err != nil { - sliceMetrics.LogAndCountError(err, "operation", "get_slice") + getSliceMethod.LogAndCountError(err, "new_session") return nil, err } defer sess.Close() rows, err := sess.Query(query) if err != nil { - sliceMetrics.LogAndCountError(err, "operation", "get_slice") + getSliceMethod.LogAndCountError(err, "query") return nil, err } defer rows.Close() @@ -37,7 +37,7 @@ func GetSlice(db *Database, query string) (any, error) { var item any if err := rows.Scan(&item); err != nil { - sliceMetrics.LogAndCountError(err, "operation", "get_slice") + getSliceMethod.LogAndCountError(err, "scan") return nil, err } @@ -45,7 +45,7 @@ func GetSlice(db *Database, query string) (any, error) { } - sliceMetrics.CountSuccess("operation", "get_slice") + getSliceMethod.CountSuccess() return result, nil } diff --git a/internal/pkg/heimdall/cluster_dal.go b/internal/pkg/heimdall/cluster_dal.go index d73c62a..188aabf 100644 --- a/internal/pkg/heimdall/cluster_dal.go +++ b/internal/pkg/heimdall/cluster_dal.go @@ -69,8 +69,12 @@ var ( ) var ( - ErrUnknownClusterID = fmt.Errorf(`unknown cluster_id`) - clusterDALMetrics = telemetry.NewMethod("db_connection", "cluster_dal") + ErrUnknownClusterID = fmt.Errorf(`unknown cluster_id`) + upsertClusterMethod = telemetry.NewMethod("db_connection", "upsert_cluster") + getClusterMethod = telemetry.NewMethod("db_connection", "get_cluster") + getClusterStatusMethod = telemetry.NewMethod("db_connection", "get_cluster_status") + updateClusterStatusMethod = telemetry.NewMethod("db_connection", "update_cluster_status") + getClustersMethod = telemetry.NewMethod("db_connection", "get_clusters") ) func (h *Heimdall) submitCluster(c *cluster.Cluster) (any, error) { @@ -86,13 +90,13 @@ func (h *Heimdall) submitCluster(c *cluster.Cluster) (any, error) { func (h *Heimdall) clusterUpsert(c *cluster.Cluster) error { // Track DB connection for cluster upsert operation - defer clusterDALMetrics.RecordLatency(time.Now(), "operation", "upsert_cluster") - clusterDALMetrics.CountRequest("operation", "upsert_cluster") + defer upsertClusterMethod.RecordLatency(time.Now()) + upsertClusterMethod.CountRequest() // open connection sess, err := h.Database.NewSession(true) if err != nil { - clusterDALMetrics.LogAndCountError(err, "operation", "upsert_cluster") + upsertClusterMethod.LogAndCountError(err, "new_session") return err } defer sess.Close() @@ -121,11 +125,11 @@ func (h *Heimdall) clusterUpsert(c *cluster.Cluster) error { } if err := sess.Commit(); err != nil { - clusterDALMetrics.LogAndCountError(err, "operation", "upsert_cluster") + upsertClusterMethod.LogAndCountError(err, "commit") return err } - clusterDALMetrics.CountSuccess("operation", "upsert_cluster") + upsertClusterMethod.CountSuccess() return nil } @@ -133,13 +137,13 @@ func (h *Heimdall) clusterUpsert(c *cluster.Cluster) error { func (h *Heimdall) getCluster(c *cluster.Cluster) (any, error) { // Track DB connection for get cluster operation - defer clusterDALMetrics.RecordLatency(time.Now(), "operation", "get_cluster") - clusterDALMetrics.CountRequest("operation", "get_cluster") + defer getClusterMethod.RecordLatency(time.Now()) + getClusterMethod.CountRequest() // open connection sess, err := h.Database.NewSession(false) if err != nil { - clusterDALMetrics.LogAndCountError(err, "operation", "get_cluster") + getClusterMethod.LogAndCountError(err, "new_session") return nil, err } defer sess.Close() @@ -161,20 +165,18 @@ func (h *Heimdall) getCluster(c *cluster.Cluster) (any, error) { if err := row.Scan(&r.SystemID, &r.Status, &r.Name, &r.Version, &r.Description, &clusterContext, &r.User, &r.CreatedAt, &r.UpdatedAt); err != nil { if err == sql.ErrNoRows { - clusterDALMetrics.LogAndCountError(ErrUnknownClusterID, "operation", "get_cluster") return nil, ErrUnknownClusterID } else { - clusterDALMetrics.LogAndCountError(err, "operation", "get_cluster") return nil, err } } if err := clusterParseContextAndTags(r, clusterContext, sess); err != nil { - clusterDALMetrics.LogAndCountError(err, "operation", "get_cluster") + getClusterMethod.LogAndCountError(err, "cluster_parse_context_and_tags") return nil, err } - clusterDALMetrics.CountSuccess("operation", "get_cluster") + getClusterMethod.CountSuccess() return r, nil } @@ -182,13 +184,13 @@ func (h *Heimdall) getCluster(c *cluster.Cluster) (any, error) { func (h *Heimdall) getClusterStatus(c *cluster.Cluster) (any, error) { // Track DB connection for cluster status operation - defer clusterDALMetrics.RecordLatency(time.Now(), "operation", "get_cluster_status") - clusterDALMetrics.CountRequest("operation", "get_cluster_status") + defer getClusterStatusMethod.RecordLatency(time.Now()) + getClusterStatusMethod.CountRequest() // open connection sess, err := h.Database.NewSession(false) if err != nil { - clusterDALMetrics.LogAndCountError(err, "operation", "get_cluster_status") + getClusterStatusMethod.LogAndCountError(err, "new_session") return nil, err } defer sess.Close() @@ -203,24 +205,26 @@ func (h *Heimdall) getClusterStatus(c *cluster.Cluster) (any, error) { if err := row.Scan(&r.Status, &r.UpdatedAt); err != nil { if err == sql.ErrNoRows { - clusterDALMetrics.LogAndCountError(ErrUnknownClusterID, "operation", "get_cluster_status") return nil, ErrUnknownClusterID } else { - clusterDALMetrics.LogAndCountError(err, "operation", "get_cluster_status") return nil, err } } - clusterDALMetrics.CountSuccess("operation", "get_cluster_status") + getClusterStatusMethod.CountSuccess() return r, nil } func (h *Heimdall) updateClusterStatus(c *cluster.Cluster) (any, error) { + defer updateClusterStatusMethod.RecordLatency(time.Now()) + updateClusterStatusMethod.CountRequest() + // open connection sess, err := h.Database.NewSession(false) if err != nil { + updateClusterStatusMethod.LogAndCountError(err, "new_session") return nil, err } defer sess.Close() @@ -234,6 +238,7 @@ func (h *Heimdall) updateClusterStatus(c *cluster.Cluster) (any, error) { return nil, ErrUnknownClusterID } + updateClusterStatusMethod.CountSuccess() return h.getClusterStatus(c) } @@ -241,13 +246,13 @@ func (h *Heimdall) updateClusterStatus(c *cluster.Cluster) (any, error) { func (h *Heimdall) getClusters(f *database.Filter) (any, error) { // Track DB connection for clusters list operation - defer clusterDALMetrics.RecordLatency(time.Now(), "operation", "get_clusters") - clusterDALMetrics.CountRequest("operation", "get_clusters") + defer getClustersMethod.RecordLatency(time.Now()) + getClustersMethod.CountRequest() // open connection sess, err := h.Database.NewSession(false) if err != nil { - clusterDALMetrics.LogAndCountError(err, "operation", "get_clusters") + getClustersMethod.LogAndCountError(err, "new_session") return nil, err } defer sess.Close() @@ -283,7 +288,7 @@ func (h *Heimdall) getClusters(f *database.Filter) (any, error) { } - clusterDALMetrics.CountSuccess("operation", "get_clusters") + getClustersMethod.CountSuccess() return &resultset{ Data: result, }, nil diff --git a/internal/pkg/heimdall/command_dal.go b/internal/pkg/heimdall/command_dal.go index e2bf85d..ad9cd99 100644 --- a/internal/pkg/heimdall/command_dal.go +++ b/internal/pkg/heimdall/command_dal.go @@ -81,8 +81,12 @@ var ( ) var ( - ErrUnknownCommandID = fmt.Errorf(`unknown command_id`) - commandDALMetrics = telemetry.NewMethod("db_connection", "command_dal") + ErrUnknownCommandID = fmt.Errorf(`unknown command_id`) + upsertCommandMethod = telemetry.NewMethod("db_connection", "upsert_command") + getCommandMethod = telemetry.NewMethod("db_connection", "get_command") + getCommandStatusMethod = telemetry.NewMethod("db_connection", "get_command_status") + updateCommandStatusMethod = telemetry.NewMethod("db_connection", "update_command_status") + getCommandsMethod = telemetry.NewMethod("db_connection", "get_commands") ) func (h *Heimdall) submitCommand(c *command.Command) (any, error) { @@ -98,13 +102,13 @@ func (h *Heimdall) submitCommand(c *command.Command) (any, error) { func (h *Heimdall) commandUpsert(c *command.Command) error { // Track DB connection for command upsert operation - defer commandDALMetrics.RecordLatency(time.Now(), "operation", "upsert_command") - commandDALMetrics.CountRequest("operation", "upsert_command") + defer upsertCommandMethod.RecordLatency(time.Now()) + upsertCommandMethod.CountRequest() // open connection sess, err := h.Database.NewSession(true) if err != nil { - commandDALMetrics.LogAndCountError(err, "operation", "upsert_command") + upsertCommandMethod.LogAndCountError(err, "new_session") return err } defer sess.Close() @@ -150,11 +154,11 @@ func (h *Heimdall) commandUpsert(c *command.Command) error { } if err := sess.Commit(); err != nil { - commandDALMetrics.LogAndCountError(err, "operation", "upsert_command") + upsertCommandMethod.LogAndCountError(err, "commit") return err } - commandDALMetrics.CountSuccess("operation", "upsert_command") + upsertCommandMethod.CountSuccess() return nil } @@ -162,13 +166,13 @@ func (h *Heimdall) commandUpsert(c *command.Command) error { func (h *Heimdall) getCommand(c *command.Command) (any, error) { // Track DB connection for get command operation - defer commandDALMetrics.RecordLatency(time.Now(), "operation", "get_command") - commandDALMetrics.CountRequest("operation", "get_command") + defer getCommandMethod.RecordLatency(time.Now()) + getCommandMethod.CountRequest() // open connection sess, err := h.Database.NewSession(false) if err != nil { - commandDALMetrics.LogAndCountError(err, "operation", "get_command") + getCommandMethod.LogAndCountError(err, "new_session") return nil, err } defer sess.Close() @@ -190,20 +194,18 @@ func (h *Heimdall) getCommand(c *command.Command) (any, error) { if err := row.Scan(&r.SystemID, &r.Status, &r.Name, &r.Version, &r.Plugin, &r.Description, &commandContext, &r.User, &r.IsSync, &r.CreatedAt, &r.UpdatedAt); err != nil { if err == sql.ErrNoRows { - commandDALMetrics.LogAndCountError(ErrUnknownCommandID, "operation", "get_command") return nil, ErrUnknownCommandID } else { - commandDALMetrics.LogAndCountError(err, "operation", "get_command") return nil, err } } if err := commandParseContextAndTags(r, commandContext, sess); err != nil { - commandDALMetrics.LogAndCountError(err, "operation", "get_command") + getCommandMethod.LogAndCountError(err, "command_parse_context_and_tags") return nil, err } - commandDALMetrics.CountSuccess("operation", "get_command") + getCommandMethod.CountSuccess() return r, nil } @@ -211,13 +213,13 @@ func (h *Heimdall) getCommand(c *command.Command) (any, error) { func (h *Heimdall) getCommandStatus(c *command.Command) (any, error) { // Track DB connection for command status operation - defer commandDALMetrics.RecordLatency(time.Now(), "operation", "get_command_status") - commandDALMetrics.CountRequest("operation", "get_command_status") + defer getCommandStatusMethod.RecordLatency(time.Now()) + getCommandStatusMethod.CountRequest() // open connection sess, err := h.Database.NewSession(false) if err != nil { - commandDALMetrics.LogAndCountError(err, "operation", "get_command_status") + getCommandStatusMethod.LogAndCountError(err, "new_session") return nil, err } defer sess.Close() @@ -232,15 +234,13 @@ func (h *Heimdall) getCommandStatus(c *command.Command) (any, error) { if err := row.Scan(&r.Status, &r.UpdatedAt); err != nil { if err == sql.ErrNoRows { - commandDALMetrics.LogAndCountError(ErrUnknownCommandID, "operation", "get_command_status") return nil, ErrUnknownCommandID } else { - commandDALMetrics.LogAndCountError(err, "operation", "get_command_status") return nil, err } } - commandDALMetrics.CountSuccess("operation", "get_command_status") + getCommandStatusMethod.CountSuccess() return r, nil } @@ -248,12 +248,13 @@ func (h *Heimdall) getCommandStatus(c *command.Command) (any, error) { func (h *Heimdall) updateCommandStatus(c *command.Command) (any, error) { // Track DB connection for command status update operation - commandDALMetrics.CountRequest("operation", "update_command_status") + defer updateCommandStatusMethod.RecordLatency(time.Now()) + updateCommandStatusMethod.CountRequest() // open connection sess, err := h.Database.NewSession(false) if err != nil { - commandDALMetrics.LogAndCountError(err, "operation", "update_command_status") + updateCommandStatusMethod.LogAndCountError(err, "new_session") return nil, err } defer sess.Close() @@ -267,6 +268,7 @@ func (h *Heimdall) updateCommandStatus(c *command.Command) (any, error) { return nil, ErrUnknownCommandID } + updateCommandStatusMethod.CountSuccess() return h.getCommandStatus(c) } @@ -274,13 +276,13 @@ func (h *Heimdall) updateCommandStatus(c *command.Command) (any, error) { func (h *Heimdall) getCommands(f *database.Filter) (any, error) { // Track DB connection for commands list operation - defer commandDALMetrics.RecordLatency(time.Now(), "operation", "get_commands") - commandDALMetrics.CountRequest("operation", "get_commands") + defer getCommandsMethod.RecordLatency(time.Now()) + getCommandsMethod.CountRequest() // open connection sess, err := h.Database.NewSession(false) if err != nil { - commandDALMetrics.LogAndCountError(err, "operation", "get_commands") + getCommandsMethod.LogAndCountError(err, "new_session") return nil, err } defer sess.Close() @@ -316,7 +318,7 @@ func (h *Heimdall) getCommands(f *database.Filter) (any, error) { } - commandDALMetrics.CountSuccess("operation", "get_commands") + getCommandsMethod.CountSuccess() return &resultset{ Data: result, }, nil diff --git a/internal/pkg/heimdall/job_dal.go b/internal/pkg/heimdall/job_dal.go index 69c8c93..31f070a 100644 --- a/internal/pkg/heimdall/job_dal.go +++ b/internal/pkg/heimdall/job_dal.go @@ -52,8 +52,11 @@ var queryJobStatusSelect string var queryJobStatusesSelect string var ( - ErrUnknownJobID = fmt.Errorf(`unknown job_id`) - jobDALMetrics = telemetry.NewMethod("db_connection", "job_dal") + ErrUnknownJobID = fmt.Errorf(`unknown job_id`) + insertJobMethod = telemetry.NewMethod("db_connection", "insert_job") + getJobMethod = telemetry.NewMethod("db_connection", "get_job") + getJobsMethod = telemetry.NewMethod("db_connection", "get_jobs") + getJobStatusMethod = telemetry.NewMethod("db_connection", "get_job_status") ) var ( @@ -96,13 +99,13 @@ type jobRequest struct { func (h *Heimdall) insertJob(j *job.Job, clusterID, commandID string) (int64, error) { // Track DB connection for job insert operation - defer jobDALMetrics.RecordLatency(time.Now(), "operation", "insert_job") - jobDALMetrics.CountRequest("operation", "insert_job") + defer insertJobMethod.RecordLatency(time.Now()) + insertJobMethod.CountRequest() // open connection sess, err := h.Database.NewSession(true) if err != nil { - jobDALMetrics.LogAndCountError(err, "operation", "insert_job") + insertJobMethod.LogAndCountError(err, "new_session") return 0, err } defer sess.Close() @@ -157,11 +160,11 @@ func (h *Heimdall) insertJob(j *job.Job, clusterID, commandID string) (int64, er } if err := sess.Commit(); err != nil { - jobDALMetrics.LogAndCountError(err, "operation", "insert_job") + insertJobMethod.LogAndCountError(err, "commit") return 0, err } - jobDALMetrics.CountSuccess("operation", "insert_job") + insertJobMethod.CountSuccess() return jobID, nil } @@ -169,13 +172,13 @@ func (h *Heimdall) insertJob(j *job.Job, clusterID, commandID string) (int64, er func (h *Heimdall) getJob(j *jobRequest) (any, error) { // Track DB connection for job get operation - defer jobDALMetrics.RecordLatency(time.Now(), "operation", "get_job") - jobDALMetrics.CountRequest("operation", "get_job") + defer getJobMethod.RecordLatency(time.Now()) + getJobMethod.CountRequest() // open connection sess, err := h.Database.NewSession(false) if err != nil { - jobDALMetrics.LogAndCountError(err, "operation", "get_job") + getJobMethod.LogAndCountError(err, "new_session") return nil, err } defer sess.Close() @@ -204,11 +207,11 @@ func (h *Heimdall) getJob(j *jobRequest) (any, error) { } if err := jobParseContextAndTags(r, jobContext, sess); err != nil { - jobDALMetrics.LogAndCountError(err, "operation", "get_job") + getJobMethod.LogAndCountError(err, "job_parse_context_and_tags") return nil, err } - jobDALMetrics.CountSuccess("operation", "get_job") + getJobMethod.CountSuccess() return r, nil } @@ -216,26 +219,26 @@ func (h *Heimdall) getJob(j *jobRequest) (any, error) { func (h *Heimdall) getJobs(f *database.Filter) (any, error) { // Track DB connection for jobs list operation - defer jobDALMetrics.RecordLatency(time.Now(), "operation", "get_jobs") - jobDALMetrics.CountRequest("operation", "get_jobs") + defer getJobsMethod.RecordLatency(time.Now()) + getJobsMethod.CountRequest() // open connection sess, err := h.Database.NewSession(false) if err != nil { - jobDALMetrics.LogAndCountError(err, "operation", "get_jobs") + getJobsMethod.LogAndCountError(err, "new_session") return nil, err } defer sess.Close() query, args, err := f.Render(queryJobsSelect, jobsFilterConfig) if err != nil { - jobDALMetrics.LogAndCountError(err, "operation", "get_jobs") + getJobsMethod.LogAndCountError(err, "query") return nil, err } rows, err := sess.Query(query, args...) if err != nil { - jobDALMetrics.LogAndCountError(err, "operation", "get_jobs") + getJobsMethod.LogAndCountError(err, "query") return nil, err } defer rows.Close() @@ -249,12 +252,12 @@ func (h *Heimdall) getJobs(f *database.Filter) (any, error) { if err := rows.Scan(&r.SystemID, &r.ID, &r.Status, &r.Name, &r.Version, &r.Description, &jobContext, &r.Error, &r.User, &r.IsSync, &r.CreatedAt, &r.UpdatedAt, &r.CommandID, &r.CommandName, &r.CluserID, &r.ClusterName, &r.StoreResultSync); err != nil { - jobDALMetrics.LogAndCountError(err, "operation", "get_jobs") + getJobsMethod.LogAndCountError(err, "scan") return nil, err } if err := jobParseContextAndTags(r, jobContext, sess); err != nil { - jobDALMetrics.LogAndCountError(err, "operation", "get_jobs") + getJobsMethod.LogAndCountError(err, "job_parse_context_and_tags") return nil, err } @@ -262,7 +265,7 @@ func (h *Heimdall) getJobs(f *database.Filter) (any, error) { } - jobDALMetrics.CountSuccess("operation", "get_jobs") + getJobsMethod.CountSuccess() return &resultset{ Data: result, }, nil @@ -272,13 +275,13 @@ func (h *Heimdall) getJobs(f *database.Filter) (any, error) { func (h *Heimdall) getJobStatus(j *jobRequest) (any, error) { // Track DB connection for job status operation - defer jobDALMetrics.RecordLatency(time.Now(), "operation", "get_job_status") - jobDALMetrics.CountRequest("operation", "get_job_status") + defer getJobStatusMethod.RecordLatency(time.Now()) + getJobStatusMethod.CountRequest() // open connection sess, err := h.Database.NewSession(false) if err != nil { - jobDALMetrics.LogAndCountError(err, "operation", "get_job_status") + getJobStatusMethod.LogAndCountError(err, "new_session") return nil, err } defer sess.Close() @@ -293,15 +296,15 @@ func (h *Heimdall) getJobStatus(j *jobRequest) (any, error) { if err := row.Scan(&r.Status, &r.Error, &r.UpdatedAt); err != nil { if err == sql.ErrNoRows { - jobDALMetrics.LogAndCountError(ErrUnknownJobID, "operation", "get_job_status") + getJobStatusMethod.LogAndCountError(ErrUnknownJobID, "query") return nil, ErrUnknownJobID } else { - jobDALMetrics.LogAndCountError(err, "operation", "get_job_status") + getJobStatusMethod.LogAndCountError(err, "query") return nil, err } } - jobDALMetrics.CountSuccess("operation", "get_job_status") + getJobStatusMethod.CountSuccess() return r, nil } diff --git a/internal/pkg/heimdall/jobs_async.go b/internal/pkg/heimdall/jobs_async.go index 409036e..49a23f4 100644 --- a/internal/pkg/heimdall/jobs_async.go +++ b/internal/pkg/heimdall/jobs_async.go @@ -17,7 +17,9 @@ const ( ) var ( - asyncJobsMetrics = telemetry.NewMethod("db_connection", "async_jobs") + getAsyncJobsMethod = telemetry.NewMethod("db_connection", "get_async_jobs") + runAsyncJobMethod = telemetry.NewMethod("db_connection", "run_async_job") + updateAsyncJobStatusMethod = telemetry.NewMethod("db_connection", "update_async_job_status") ) //go:embed queries/job/active_select.sql @@ -35,13 +37,13 @@ var queryActiveJobDelete string func (h *Heimdall) getAsyncJobs(limit int) ([]*job.Job, error) { // Track DB connection for async jobs retrieval - defer asyncJobsMetrics.RecordLatency(time.Now(), "operation", "get_async_jobs") - asyncJobsMetrics.CountRequest("operation", "get_async_jobs") + defer getAsyncJobsMethod.RecordLatency(time.Now()) + getAsyncJobsMethod.CountRequest() // open connection sess, err := h.Database.NewSession(true) if err != nil { - asyncJobsMetrics.LogAndCountError(err, "operation", "get_async_jobs") + getAsyncJobsMethod.LogAndCountError(err, "new_session") return nil, err } defer sess.Close() @@ -92,11 +94,11 @@ func (h *Heimdall) getAsyncJobs(limit int) ([]*job.Job, error) { // commit transaction if err := sess.Commit(); err != nil { - asyncJobsMetrics.LogAndCountError(err, "operation", "get_async_jobs") + getAsyncJobsMethod.LogAndCountError(err, "commit") return nil, err } - asyncJobsMetrics.CountSuccess("operation", "get_async_jobs") + getAsyncJobsMethod.CountSuccess() return result, nil } @@ -104,12 +106,13 @@ func (h *Heimdall) getAsyncJobs(limit int) ([]*job.Job, error) { func (h *Heimdall) runAsyncJob(j *job.Job) error { // Track DB connection for async job execution - asyncJobsMetrics.CountRequest("operation", "run_async_job") + defer runAsyncJobMethod.RecordLatency(time.Now()) + runAsyncJobMethod.CountRequest() // let's updte job status that we're running it... sess, err := h.Database.NewSession(false) if err != nil { - asyncJobsMetrics.LogAndCountError(err, "operation", "run_async_job") + runAsyncJobMethod.LogAndCountError(err, "new_session") return h.updateAsyncJobStatus(j, err) } defer sess.Close() @@ -130,6 +133,7 @@ func (h *Heimdall) runAsyncJob(j *job.Job) error { return h.updateAsyncJobStatus(j, fmt.Errorf(formatErrUnknownCluster, j.CluserID)) } + runAsyncJobMethod.CountSuccess() return h.updateAsyncJobStatus(j, h.runJob(j, command, cluster)) } @@ -137,7 +141,8 @@ func (h *Heimdall) runAsyncJob(j *job.Job) error { func (h *Heimdall) updateAsyncJobStatus(j *job.Job, jobError error) error { // Track DB connection for async job status update - asyncJobsMetrics.CountRequest("operation", "update_async_job_status") + defer updateAsyncJobStatusMethod.RecordLatency(time.Now()) + updateAsyncJobStatusMethod.CountRequest() // we updte the final job status based on presence of the error if jobError == nil { @@ -150,7 +155,7 @@ func (h *Heimdall) updateAsyncJobStatus(j *job.Job, jobError error) error { // now we update that status in the database sess, err := h.Database.NewSession(true) if err != nil { - asyncJobsMetrics.LogAndCountError(err, "operation", "update_async_job_status") + updateAsyncJobStatusMethod.LogAndCountError(err, "new_session") fmt.Println(`session error:`, err) return jobError // Return early if session creation fails } @@ -171,6 +176,7 @@ func (h *Heimdall) updateAsyncJobStatus(j *job.Job, jobError error) error { fmt.Println(`session commit error:`, err) } + updateAsyncJobStatusMethod.CountSuccess() return jobError } diff --git a/internal/pkg/heimdall/keepalive.go b/internal/pkg/heimdall/keepalive.go index a5caf32..4a4e0c1 100644 --- a/internal/pkg/heimdall/keepalive.go +++ b/internal/pkg/heimdall/keepalive.go @@ -12,7 +12,7 @@ const ( ) var ( - keepaliveMetrics = telemetry.NewMethod("db_connection", "keepalive") + jobKeepaliveMethod = telemetry.NewMethod("db_connection", "job_keepalive") ) //go:embed queries/job/active_keepalive.sql @@ -29,13 +29,13 @@ func (h *Heimdall) jobKeepalive(done <-chan struct{}, jobID int64, agentName str defer ticker.Stop() // Track DB connection for job keepalive - defer keepaliveMetrics.RecordLatency(time.Now(), "operation", "job_keepalive") - keepaliveMetrics.CountRequest("operation", "job_keepalive") + defer jobKeepaliveMethod.RecordLatency(time.Now()) + jobKeepaliveMethod.CountRequest() // set the db session sess, err := h.Database.NewSession(false) if err != nil { - keepaliveMetrics.LogAndCountError(err, "operation", "job_keepalive") + jobKeepaliveMethod.LogAndCountError(err, "new_session") sess = nil } defer sess.Close() @@ -50,7 +50,7 @@ func (h *Heimdall) jobKeepalive(done <-chan struct{}, jobID int64, agentName str } case _, stillOpen := <-done: if !stillOpen { - keepaliveMetrics.CountSuccess("operation", "job_keepalive") + jobKeepaliveMethod.CountSuccess() return } } From b8fd09c857426458dfc522b635931b58fdcdbd7f Mon Sep 17 00:00:00 2001 From: wlggraham Date: Wed, 17 Dec 2025 15:03:38 -0700 Subject: [PATCH 3/4] set metrics emitter back --- docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index dc3642f..15d3151 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -29,7 +29,7 @@ services: - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} - AWS_SESSION_TOKEN=${AWS_SESSION_TOKEN} - AWS_REGION=${AWS_REGION} - - METRICS_EMITTER=console + - METRICS_EMITTER=prometheus - PROMETHEUS_ADDRESS=:9091 depends_on: postgres: From 1b316d861d1558384376e19dab466d4394ee3866 Mon Sep 17 00:00:00 2001 From: wlggraham Date: Wed, 17 Dec 2025 15:32:37 -0700 Subject: [PATCH 4/4] janitor metrics --- internal/pkg/janitor/job.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/internal/pkg/janitor/job.go b/internal/pkg/janitor/job.go index a8c3763..9c40421 100644 --- a/internal/pkg/janitor/job.go +++ b/internal/pkg/janitor/job.go @@ -9,7 +9,7 @@ import ( ) var ( - janitorMetrics = telemetry.NewMethod("db_connection", "janitor") + cleanUpStaleJobsMethod = telemetry.NewMethod("db_connection", "cleanup_stale_jobs") ) //go:embed queries/stale_jobs_select.sql @@ -23,21 +23,21 @@ var queryStaleJobsDelete string func (j *Janitor) cleanupStaleJobs() error { - // Track DB connection for janitor cleanup operations - defer janitorMetrics.RecordLatency(time.Now(), "operation", "cleanup_stale_jobs") - janitorMetrics.CountRequest("operation", "cleanup_stale_jobs") + // Track DB connection for stale jobs cleanup operations + defer cleanUpStaleJobsMethod.RecordLatency(time.Now()) + cleanUpStaleJobsMethod.CountRequest() // let's find the jobs we'll be cleaning up... sess, err := j.db.NewSession(false) if err != nil { - janitorMetrics.LogAndCountError(err, "operation", "cleanup_session_create") + cleanUpStaleJobsMethod.LogAndCountError(err, "new_session") return err } defer sess.Close() rows, err := sess.Query(queryStaleJobsSelect, j.StaleJob) if err != nil { - janitorMetrics.LogAndCountError(err, "operation", "cleanup_select_stale_jobs") + cleanUpStaleJobsMethod.LogAndCountError(err, "query") return err } defer rows.Close() @@ -49,7 +49,7 @@ func (j *Janitor) cleanupStaleJobs() error { var jobID int if err := rows.Scan(&jobID); err != nil { - janitorMetrics.LogAndCountError(err, "operation", "cleanup_scan_stale_jobs") + cleanUpStaleJobsMethod.LogAndCountError(err, "scan") return err } @@ -67,28 +67,28 @@ func (j *Janitor) cleanupStaleJobs() error { // prepare query to update job statuses updateStaleJobs, jobSystemIDs, err := database.PrepareSliceQuery(queryFailStaleJobs, `$%d`, staleJobIDs) if err != nil { - janitorMetrics.LogAndCountError(err, "operation", "cleanup_prepare_update_query") + cleanUpStaleJobsMethod.LogAndCountError(err, "prepare_slice_query") return err } if _, err := sess.Exec(updateStaleJobs, jobSystemIDs...); err != nil { - janitorMetrics.LogAndCountError(err, "operation", "cleanup_update_stale_jobs") + cleanUpStaleJobsMethod.LogAndCountError(err, "exec") return err } // delete stale jobs from active jobs deleteStaleJobs, jobSystemIDs, err := database.PrepareSliceQuery(queryStaleJobsDelete, `$%d`, staleJobIDs) if err != nil { - janitorMetrics.LogAndCountError(err, "operation", "cleanup_prepare_delete_query") + cleanUpStaleJobsMethod.LogAndCountError(err, "prepare_slice_query") return err } if _, err := sess.Exec(deleteStaleJobs, jobSystemIDs...); err != nil { - janitorMetrics.LogAndCountError(err, "operation", "cleanup_delete_stale_jobs") + cleanUpStaleJobsMethod.LogAndCountError(err, "exec") return err } - janitorMetrics.CountSuccess("operation", "cleanup_stale_jobs") + cleanUpStaleJobsMethod.CountSuccess() return nil }