From 44d2b6d4513deb725c07d683b170f33171795fcf Mon Sep 17 00:00:00 2001 From: wlggraham Date: Fri, 19 Dec 2025 17:45:31 -0700 Subject: [PATCH] janitor resource cleanup draft --- assets/databases/heimdall/tables/jobs.sql | 3 +- internal/pkg/heimdall/heimdall.go | 11 +- internal/pkg/heimdall/job.go | 3 +- internal/pkg/heimdall/plugins.go | 14 +- internal/pkg/janitor/janitor.go | 34 +-- internal/pkg/janitor/job.go | 196 +++++++++++++++--- .../queries/cancelling_jobs_select.sql | 19 ++ .../pkg/janitor/queries/job_set_cancelled.sql | 8 + .../pkg/janitor/queries/stale_jobs_select.sql | 21 +- internal/pkg/object/command/ecs/ecs.go | 68 +++++- pkg/object/job/job.go | 25 +-- pkg/plugin/plugin.go | 7 + plugins/ecs/ecs.go | 11 +- 13 files changed, 338 insertions(+), 82 deletions(-) create mode 100644 internal/pkg/janitor/queries/cancelling_jobs_select.sql create mode 100644 internal/pkg/janitor/queries/job_set_cancelled.sql diff --git a/assets/databases/heimdall/tables/jobs.sql b/assets/databases/heimdall/tables/jobs.sql index e40a6fb..75c70b2 100644 --- a/assets/databases/heimdall/tables/jobs.sql +++ b/assets/databases/heimdall/tables/jobs.sql @@ -21,4 +21,5 @@ create table if not exists jobs alter table jobs add column if not exists store_result_sync boolean not null default false; alter table jobs add column if not exists cancelled_by varchar(64) null; -update jobs set cancelled_by = '' where cancelled_by is null; \ No newline at end of file +update jobs set cancelled_by = '' where cancelled_by is null; +alter table jobs add column if not exists cancellation_ctx jsonb null; \ No newline at end of file diff --git a/internal/pkg/heimdall/heimdall.go b/internal/pkg/heimdall/heimdall.go index 871546d..9c2cff9 100644 --- a/internal/pkg/heimdall/heimdall.go +++ b/internal/pkg/heimdall/heimdall.go @@ -55,7 +55,7 @@ type Heimdall struct { Janitor *janitor.Janitor `yaml:"janitor,omitempty" json:"janitor,omitempty"` Version string `yaml:"-" json:"-"` agentName string - commandHandlers map[string]plugin.Handler + commandHandlers map[string]*plugin.Handlers } func (h *Heimdall) Init() error { @@ -96,7 +96,7 @@ func (h *Heimdall) Init() error { rbacsByName[rbacName] = r } - h.commandHandlers = make(map[string]plugin.Handler) + h.commandHandlers = make(map[string]*plugin.Handlers) // process commands / add default values if missing, write commands to db for _, c := range h.Commands { @@ -112,11 +112,12 @@ func (h *Heimdall) Init() error { return fmt.Errorf(formatErrUnknownPlugin, c.Plugin) } - handler, err := pluginNew(c.Context) + handlers, err := pluginNew(c.Context) if err != nil { return err } - h.commandHandlers[c.ID] = handler + + h.commandHandlers[c.ID] = handlers // let's record command in the database if err := h.commandUpsert(c); err != nil { @@ -151,7 +152,7 @@ func (h *Heimdall) Init() error { } // start janitor - if err := h.Janitor.Start(h.Database); err != nil { + if err := h.Janitor.Start(h.Database, h.commandHandlers, h.Clusters); err != nil { return err } diff --git a/internal/pkg/heimdall/job.go b/internal/pkg/heimdall/job.go index 41151d1..3ae8e74 100644 --- a/internal/pkg/heimdall/job.go +++ b/internal/pkg/heimdall/job.go @@ -124,7 +124,8 @@ func (h *Heimdall) runJob(ctx context.Context, j *job.Job, command *command.Comm // Start plugin execution in goroutine go func() { defer close(cancelMonitorDone) // signal monitoring to stop - err := h.commandHandlers[command.ID](pluginCtx, runtime, j, cluster) + handlers := h.commandHandlers[command.ID] + err := handlers.Handler(pluginCtx, runtime, j, cluster) jobDone <- err }() diff --git a/internal/pkg/heimdall/plugins.go b/internal/pkg/heimdall/plugins.go index 89213df..b55a717 100644 --- a/internal/pkg/heimdall/plugins.go +++ b/internal/pkg/heimdall/plugins.go @@ -1,6 +1,7 @@ package heimdall import ( + "fmt" "os" "path" "plugin" @@ -16,9 +17,9 @@ const ( pluginExtensionLength = len(pluginExtension) ) -func (h *Heimdall) loadPlugins() (map[string]func(*context.Context) (hp.Handler, error), error) { +func (h *Heimdall) loadPlugins() (map[string]func(*context.Context) (*hp.Handlers, error), error) { - plugins := make(map[string]func(*context.Context) (hp.Handler, error)) + plugins := make(map[string]func(*context.Context) (*hp.Handlers, error)) files, err := os.ReadDir(h.PluginsDirectory) if err != nil { @@ -35,11 +36,12 @@ func (h *Heimdall) loadPlugins() (map[string]func(*context.Context) (hp.Handler, if err != nil { return nil, err } - // is it our plugin? - newPluginFunc, ok := newFunc.(func(*context.Context) (hp.Handler, error)) - if ok { - plugins[stripExtension(file.Name())] = newPluginFunc + // plugins must return *Handlers + newPluginFunc, ok := newFunc.(func(*context.Context) (*hp.Handlers, error)) + if !ok { + return nil, fmt.Errorf("plugin %s must return *plugin.Handlers", stripExtension(file.Name())) } + plugins[stripExtension(file.Name())] = newPluginFunc } } diff --git a/internal/pkg/janitor/janitor.go b/internal/pkg/janitor/janitor.go index 165d5ef..07f3ec8 100644 --- a/internal/pkg/janitor/janitor.go +++ b/internal/pkg/janitor/janitor.go @@ -5,38 +5,46 @@ import ( "time" "github.com/patterninc/heimdall/internal/pkg/database" + "github.com/patterninc/heimdall/pkg/object/cluster" + "github.com/patterninc/heimdall/pkg/plugin" +) + +const ( + defaultJobLimit = 25 ) type Janitor struct { - Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"` - StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"` - db *database.Database + Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"` + StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"` + db *database.Database + commandHandlers map[string]*plugin.Handlers + clusters cluster.Clusters } -func (j *Janitor) Start(d *database.Database) error { +func (j *Janitor) Start(d *database.Database, commandHandlers map[string]*plugin.Handlers, clusters cluster.Clusters) error { // record database context j.db = d + j.commandHandlers = commandHandlers + j.clusters = clusters // let's run jobs cleanup once before we start it as a go routine if err := j.cleanupStaleJobs(); err != nil { return err } - // start cleanup loop - go func() { - + // start cleanup loops + runCleanupLoop := func(cleanupFn func() error) { for { - - if err := j.cleanupStaleJobs(); err != nil { - fmt.Println(`Janitor error:`, err) + if err := cleanupFn(); err != nil { + fmt.Printf("Janitor error: %v\n", err) } - time.Sleep(60 * time.Second) - } + } - }() + go runCleanupLoop(j.cleanupStaleJobs) + go runCleanupLoop(j.cleanupCancellingJobs) return nil diff --git a/internal/pkg/janitor/job.go b/internal/pkg/janitor/job.go index 9c40421..c5e9a3b 100644 --- a/internal/pkg/janitor/job.go +++ b/internal/pkg/janitor/job.go @@ -1,94 +1,226 @@ package janitor import ( + "context" _ "embed" - "time" + "encoding/json" + "fmt" + "sync" "github.com/hladush/go-telemetry/pkg/telemetry" "github.com/patterninc/heimdall/internal/pkg/database" + "github.com/patterninc/heimdall/pkg/object/job" ) var ( - cleanUpStaleJobsMethod = telemetry.NewMethod("db_connection", "cleanup_stale_jobs") + cleanUpStaleJobsMethod = telemetry.NewMethod("db_connection", "cleanup_stale_jobs") + cleanUpCancellingJobsMethod = telemetry.NewMethod("db_connection", "cleanup_cancelling_jobs") + cleanupMethod = telemetry.NewMethod("db_connection", "cleanup_jobs") + ctx = context.Background() ) //go:embed queries/stale_jobs_select.sql var queryStaleJobsSelect string -//go:embed queries/jobs_set_failed.sql -var queryFailStaleJobs string - //go:embed queries/stale_jobs_delete.sql var queryStaleJobsDelete string -func (j *Janitor) cleanupStaleJobs() error { +//go:embed queries/cancelling_jobs_select.sql +var queryCancellingJobsSelect string + +//go:embed queries/job_set_cancelled.sql +var queryJobSetCancelled string + +// clean up jobs in parallel and return their system IDs for updating +func (j *Janitor) cleanupJobs(jobs []*job.Job) []any { + + var wg sync.WaitGroup + jobIDsChan := make(chan int64, len(jobs)) + + for _, jb := range jobs { + wg.Add(1) + go func(job *job.Job) { + defer wg.Done() + + // look up handlers directly by command ID + handlers := j.commandHandlers[job.CommandID] + if handlers != nil && handlers.CleanupHandler != nil { + // look up cluster from map + cl, found := j.clusters[job.ClusterID] + if found { + // attempt to clean up job resources (log errors but continue) + if err := handlers.CleanupHandler(ctx, job, cl); err != nil { + cleanupMethod.LogAndCountError(fmt.Errorf("cleanup failed for job %s: %w", job.ID, err), "cleanup") + } + } else { + cleanupMethod.LogAndCountError(fmt.Errorf("unknown cluster_id: %s for job %s", job.ClusterID, job.ID), "cluster_lookup") + } + } + + // collect job ID (regardless of cleanup success) + jobIDsChan <- job.SystemID + }(jb) + } + + // wait for all cleanup operations to complete + wg.Wait() + close(jobIDsChan) + + // collect all job IDs from channel + jobIDs := make([]any, 0, len(jobs)) + for id := range jobIDsChan { + jobIDs = append(jobIDs, id) + } + + return jobIDs +} - // Track DB connection for stale jobs cleanup operations - defer cleanUpStaleJobsMethod.RecordLatency(time.Now()) - cleanUpStaleJobsMethod.CountRequest() +func (j *Janitor) cleanupStaleJobs() error { - // let's find the jobs we'll be cleaning up... - sess, err := j.db.NewSession(false) + // use transaction for FOR UPDATE SKIP LOCKED row locking + sess, err := j.db.NewSession(true) if err != nil { cleanUpStaleJobsMethod.LogAndCountError(err, "new_session") return err } defer sess.Close() - rows, err := sess.Query(queryStaleJobsSelect, j.StaleJob) + // query stale jobs + rows, err := sess.Query(queryStaleJobsSelect, j.StaleJob, defaultJobLimit) if err != nil { cleanUpStaleJobsMethod.LogAndCountError(err, "query") return err } defer rows.Close() - staleJobIDs := make([]any, 0, 100) - + // collect all jobs + jobs := make([]*job.Job, 0, defaultJobLimit) for rows.Next() { + var cancellationCtxJSON []byte + jb := &job.Job{} - var jobID int - - if err := rows.Scan(&jobID); err != nil { + if err := rows.Scan(&jb.SystemID, &jb.ID, &cancellationCtxJSON, &jb.CommandID, &jb.ClusterID); err != nil { cleanUpStaleJobsMethod.LogAndCountError(err, "scan") - return err + continue } - if jobID != 0 { - staleJobIDs = append(staleJobIDs, jobID) + // parse cancellation context (JSONB from PostgreSQL) + if len(cancellationCtxJSON) > 0 { + if err := json.Unmarshal(cancellationCtxJSON, &jb.CancellationCtx); err != nil { + cleanUpStaleJobsMethod.LogAndCountError(err, "parse_cancellation_ctx") + // continue anyway + } } + jobs = append(jobs, jb) } - // do we have any stale jobs? - if len(staleJobIDs) == 0 { + // no jobs found, commit and return early + if len(jobs) == 0 { + if err := sess.Commit(); err != nil { + cleanUpStaleJobsMethod.LogAndCountError(err, "commit") + return err + } + cleanUpStaleJobsMethod.CountSuccess() return nil } - // prepare query to update job statuses - updateStaleJobs, jobSystemIDs, err := database.PrepareSliceQuery(queryFailStaleJobs, `$%d`, staleJobIDs) + // clean up jobs and get their IDs + jobIDs := j.cleanupJobs(jobs) + + // delete stale jobs from active_jobs + deleteStaleJobs, jobSystemIDs, err := database.PrepareSliceQuery(queryStaleJobsDelete, `$%d`, jobIDs) if err != nil { cleanUpStaleJobsMethod.LogAndCountError(err, "prepare_slice_query") return err } - if _, err := sess.Exec(updateStaleJobs, jobSystemIDs...); err != nil { + if _, err := sess.Exec(deleteStaleJobs, jobSystemIDs...); err != nil { cleanUpStaleJobsMethod.LogAndCountError(err, "exec") return err } - // delete stale jobs from active jobs - deleteStaleJobs, jobSystemIDs, err := database.PrepareSliceQuery(queryStaleJobsDelete, `$%d`, staleJobIDs) + // commit transaction to release locks + if err := sess.Commit(); err != nil { + cleanUpStaleJobsMethod.LogAndCountError(err, "commit") + return err + } + + cleanUpStaleJobsMethod.CountSuccess() + return nil +} + +func (j *Janitor) cleanupCancellingJobs() error { + + // Start a new session + sess, err := j.db.NewSession(true) if err != nil { - cleanUpStaleJobsMethod.LogAndCountError(err, "prepare_slice_query") + cleanUpCancellingJobsMethod.LogAndCountError(err, "new_session") return err } + defer sess.Close() - if _, err := sess.Exec(deleteStaleJobs, jobSystemIDs...); err != nil { - cleanUpStaleJobsMethod.LogAndCountError(err, "exec") + // query cancelling jobs + rows, err := sess.Query(queryCancellingJobsSelect, defaultJobLimit) + if err != nil { + cleanUpCancellingJobsMethod.LogAndCountError(err, "query") return err } + defer rows.Close() - cleanUpStaleJobsMethod.CountSuccess() - return nil + // collect all jobs + jobs := make([]*job.Job, 0, defaultJobLimit) + for rows.Next() { + var cancellationCtxJSON []byte + jb := &job.Job{} + + if err := rows.Scan(&jb.SystemID, &jb.ID, &cancellationCtxJSON, &jb.CommandID, &jb.ClusterID); err != nil { + cleanUpCancellingJobsMethod.LogAndCountError(err, "scan") + continue + } + + // parse cancellation context + if len(cancellationCtxJSON) > 0 { + if err := json.Unmarshal(cancellationCtxJSON, &jb.CancellationCtx); err != nil { + cleanUpCancellingJobsMethod.LogAndCountError(err, "parse_cancellation_ctx") + // continue anyway + } + } + + jobs = append(jobs, jb) + } + + // no jobs found, commit and return early + if len(jobs) == 0 { + if err := sess.Commit(); err != nil { + cleanUpCancellingJobsMethod.LogAndCountError(err, "commit") + return err + } + cleanUpCancellingJobsMethod.CountSuccess() + return nil + } + + // clean up jobs and get their IDs + jobIDs := j.cleanupJobs(jobs) + + // update status to cancelled + updateQuery, jobSystemIDs, err := database.PrepareSliceQuery(queryJobSetCancelled, `$%d`, jobIDs) + if err != nil { + cleanUpCancellingJobsMethod.LogAndCountError(err, "prepare_slice_query") + return err + } + if _, err := sess.Exec(updateQuery, jobSystemIDs...); err != nil { + cleanUpCancellingJobsMethod.LogAndCountError(err, "exec") + return err + } + + // commit transaction to release locks + if err := sess.Commit(); err != nil { + cleanUpCancellingJobsMethod.LogAndCountError(err, "commit") + return err + } + + cleanUpCancellingJobsMethod.CountSuccess() + return nil } diff --git a/internal/pkg/janitor/queries/cancelling_jobs_select.sql b/internal/pkg/janitor/queries/cancelling_jobs_select.sql new file mode 100644 index 0000000..51dbeae --- /dev/null +++ b/internal/pkg/janitor/queries/cancelling_jobs_select.sql @@ -0,0 +1,19 @@ +select + j.system_job_id, + j.job_id, + j.cancellation_ctx, + cm.command_id, + cl.cluster_id +from + jobs j + join commands cm on cm.system_command_id = j.job_command_id + join clusters cl on cl.system_cluster_id = j.job_cluster_id +where + j.job_status_id = 7 -- CANCELLING +order by + j.system_job_id +for update + skip locked +limit $1 +; + diff --git a/internal/pkg/janitor/queries/job_set_cancelled.sql b/internal/pkg/janitor/queries/job_set_cancelled.sql new file mode 100644 index 0000000..58b1501 --- /dev/null +++ b/internal/pkg/janitor/queries/job_set_cancelled.sql @@ -0,0 +1,8 @@ +update jobs +set + job_status_id = 8, -- CANCELLED + updated_at = extract(epoch from now())::int +where + system_job_id in ( {{ .Slice }} ) +; + diff --git a/internal/pkg/janitor/queries/stale_jobs_select.sql b/internal/pkg/janitor/queries/stale_jobs_select.sql index 2600a4e..af146ff 100644 --- a/internal/pkg/janitor/queries/stale_jobs_select.sql +++ b/internal/pkg/janitor/queries/stale_jobs_select.sql @@ -1,10 +1,21 @@ select - system_job_id + j.system_job_id, + j.job_id, + j.cancellation_ctx, + cm.command_id, + cl.cluster_id from - active_jobs + active_jobs aj + join jobs j on j.system_job_id = aj.system_job_id + join commands cm on cm.system_command_id = j.job_command_id + join clusters cl on cl.system_cluster_id = j.job_cluster_id where - last_heartbeat > 0 and - extract(epoch from now())::int - $1 > last_heartbeat + aj.last_heartbeat > 0 and + extract(epoch from now())::int - $1 > aj.last_heartbeat +order by + j.system_job_id +for update + skip locked limit - 25 + $2 ; diff --git a/internal/pkg/object/command/ecs/ecs.go b/internal/pkg/object/command/ecs/ecs.go index a15819f..fba0e69 100644 --- a/internal/pkg/object/command/ecs/ecs.go +++ b/internal/pkg/object/command/ecs/ecs.go @@ -117,10 +117,11 @@ const ( var ( errMissingTemplate = fmt.Errorf("task definition template is required") - methodMetrics = telemetry.NewMethod("ecs", "ecs plugin") + cleanupMethod = telemetry.NewMethod("ecs", "cleanup") + handlerMethod = telemetry.NewMethod("ecs", "handler") ) -func New(commandCtx *heimdallContext.Context) (plugin.Handler, error) { +func New(commandCtx *heimdallContext.Context) (plugin.Handler, plugin.CleanupHandler, error) { e := &commandContext{ PollingInterval: defaultPollingInterval, @@ -131,11 +132,11 @@ func New(commandCtx *heimdallContext.Context) (plugin.Handler, error) { if commandCtx != nil { if err := commandCtx.Unmarshal(e); err != nil { - return nil, err + return nil, nil, err } } - return e.handler, nil + return e.handler, e.cleanup, nil } @@ -170,7 +171,7 @@ func (e *commandContext) handler(ctx context.Context, r *plugin.Runtime, job *jo // Return error based on failure reason if execCtx.failureError != nil { - methodMetrics.LogAndCountError(execCtx.failureError, "ecs task failure") + handlerMethod.LogAndCountError(execCtx.failureError, fmt.Sprintf("ecs task failure: %s", execCtx.failureReason)) return execCtx.failureError } @@ -666,3 +667,60 @@ func (execCtx *executionContext) retrieveLogs(ctx context.Context) error { return nil } + +// cleanup stops all ECS tasks that were started by the given job +func (e *commandContext) cleanup(ctx context.Context, j *job.Job, c *cluster.Cluster) error { + + // Resolve cluster context to get cluster name + clusterContext := &clusterContext{} + if err := c.Context.Unmarshal(clusterContext); err != nil { + return err + } + + // Initialize AWS config and ECS client + cfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + return err + } + ecsClient := ecs.NewFromConfig(cfg) + + // List all tasks started by this job + var taskARNs []string + for taskNum := 0; taskNum < e.TaskCount; taskNum++ { + startedByValue := fmt.Sprintf("%s%s-%d", startedByPrefix, j.ID, taskNum) + + listTasksOutput, err := ecsClient.ListTasks(ctx, &ecs.ListTasksInput{ + Cluster: aws.String(clusterContext.ClusterName), + DesiredStatus: types.DesiredStatusRunning, + StartedBy: aws.String(startedByValue), + }) + if err != nil { + return err + } + + taskARNs = append(taskARNs, listTasksOutput.TaskArns...) + } + + if len(taskARNs) == 0 { + // No tasks found, nothing to clean up + return nil + } + + // Stop all matching tasks + for _, taskARN := range taskARNs { + stopTaskInput := &ecs.StopTaskInput{ + Cluster: aws.String(clusterContext.ClusterName), + Task: aws.String(taskARN), + Reason: aws.String("stale job cleanup"), + } + _, err := ecsClient.StopTask(ctx, stopTaskInput) + if err != nil { + // Log error but continue stopping other tasks + cleanupMethod.LogAndCountError(err, fmt.Sprintf("failed to stop task %s", taskARN)) + continue + } + } + + return nil + +} diff --git a/pkg/object/job/job.go b/pkg/object/job/job.go index 22b9c83..c289508 100644 --- a/pkg/object/job/job.go +++ b/pkg/object/job/job.go @@ -11,18 +11,19 @@ import ( type Job struct { object.Object `yaml:",inline" json:",inline"` - Status status.Status `yaml:"status,omitempty" json:"status,omitempty"` - IsSync bool `yaml:"is_sync,omitempty" json:"is_sync,omitempty"` - StoreResultSync bool `yaml:"store_result_sync,omitempty" json:"store_result_sync,omitempty"` - Error string `yaml:"error,omitempty" json:"error,omitempty"` - CommandCriteria *set.Set[string] `yaml:"command_criteria,omitempty" json:"command_criteria,omitempty"` - ClusterCriteria *set.Set[string] `yaml:"cluster_criteria,omitempty" json:"cluster_criteria,omitempty"` - CommandID string `yaml:"command_id,omitempty" json:"command_id,omitempty"` - CommandName string `yaml:"command_name,omitempty" json:"command_name,omitempty"` - ClusterID string `yaml:"cluster_id,omitempty" json:"cluster_id,omitempty"` - ClusterName string `yaml:"cluster_name,omitempty" json:"cluster_name,omitempty"` - CancelledBy string `yaml:"cancelled_by,omitempty" json:"cancelled_by,omitempty"` - Result *result.Result `yaml:"result,omitempty" json:"result,omitempty"` + Status status.Status `yaml:"status,omitempty" json:"status,omitempty"` + IsSync bool `yaml:"is_sync,omitempty" json:"is_sync,omitempty"` + StoreResultSync bool `yaml:"store_result_sync,omitempty" json:"store_result_sync,omitempty"` + Error string `yaml:"error,omitempty" json:"error,omitempty"` + CommandCriteria *set.Set[string] `yaml:"command_criteria,omitempty" json:"command_criteria,omitempty"` + ClusterCriteria *set.Set[string] `yaml:"cluster_criteria,omitempty" json:"cluster_criteria,omitempty"` + CommandID string `yaml:"command_id,omitempty" json:"command_id,omitempty"` + CommandName string `yaml:"command_name,omitempty" json:"command_name,omitempty"` + ClusterID string `yaml:"cluster_id,omitempty" json:"cluster_id,omitempty"` + ClusterName string `yaml:"cluster_name,omitempty" json:"cluster_name,omitempty"` + CancelledBy string `yaml:"cancelled_by,omitempty" json:"cancelled_by,omitempty"` + CancellationCtx map[string]interface{} `yaml:"cancellation_ctx,omitempty" json:"cancellation_ctx,omitempty"` + Result *result.Result `yaml:"result,omitempty" json:"result,omitempty"` } func (j *Job) Init() error { diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go index cfb6db6..151d657 100644 --- a/pkg/plugin/plugin.go +++ b/pkg/plugin/plugin.go @@ -7,4 +7,11 @@ import ( "github.com/patterninc/heimdall/pkg/object/job" ) +type Handlers struct { + Handler Handler + CleanupHandler CleanupHandler +} + type Handler func(context.Context, *Runtime, *job.Job, *cluster.Cluster) error + +type CleanupHandler func(ctx context.Context, j *job.Job, c *cluster.Cluster) error diff --git a/plugins/ecs/ecs.go b/plugins/ecs/ecs.go index 10f8f62..68c67ee 100644 --- a/plugins/ecs/ecs.go +++ b/plugins/ecs/ecs.go @@ -7,6 +7,13 @@ import ( ) // New creates a new ECS plugin handler. -func New(commandContext *context.Context) (plugin.Handler, error) { - return ecs.New(commandContext) +func New(commandContext *context.Context) (*plugin.Handlers, error) { + handler, cleanupHandler, err := ecs.New(commandContext) + if err != nil { + return nil, err + } + return &plugin.Handlers{ + Handler: handler, + CleanupHandler: cleanupHandler, + }, nil }