From 7aac8f63f7d1f4dd66f0474d53fe52bad31a6d49 Mon Sep 17 00:00:00 2001 From: Prasad Lohakpure Date: Tue, 23 Sep 2025 02:19:17 +0530 Subject: [PATCH 1/5] DATA-5849: File download support --- internal/pkg/object/command/ecs/ecs.go | 224 +++++++++++++++++- .../command/ecs/startup_script_template.sh | 47 ++++ 2 files changed, 266 insertions(+), 5 deletions(-) create mode 100644 internal/pkg/object/command/ecs/startup_script_template.sh diff --git a/internal/pkg/object/command/ecs/ecs.go b/internal/pkg/object/command/ecs/ecs.go index db25180..2b7c009 100644 --- a/internal/pkg/object/command/ecs/ecs.go +++ b/internal/pkg/object/command/ecs/ecs.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "strings" + "text/template" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -21,6 +22,116 @@ import ( "github.com/patterninc/heimdall/pkg/result/column" ) +// FileDownload represents a file to be downloaded before container execution +type FileDownload struct { + Source string `yaml:"source,omitempty" json:"source,omitempty"` // S3 URI or HTTP URL + Destination string `yaml:"destination,omitempty" json:"destination,omitempty"` // Local path in container +} + +// StartupScriptConfig represents configuration for the startup script +type StartupScriptConfig struct { + ScriptPath string `yaml:"script_path,omitempty" json:"script_path,omitempty"` // Path to the startup script + DownloadDir string `yaml:"download_dir,omitempty" json:"download_dir,omitempty"` // Directory to download files to + Timeout int `yaml:"timeout,omitempty" json:"timeout,omitempty"` // Timeout in seconds + CreateDirs bool `yaml:"create_dirs,omitempty" json:"create_dirs,omitempty"` // Create destination directories +} + +// ScriptTemplateData represents data for populating the startup script template +type ScriptTemplateData struct { + DownloadDir string + Timeout int + CreateDirs bool + Downloads []ScriptDownload +} + +// ScriptDownload represents a download item for the script template +type ScriptDownload struct { + Source 
string + Destination string + IsS3 bool +} + +// ContainerModificationOption represents a generic option for modifying container definitions +type ContainerModificationOption func(*types.ContainerDefinition) error + +// ContainerOption represents a generic option for container modifications +type ContainerOption struct { + ModifyContainer ContainerModificationOption + Description string +} + +// WithStartupScriptWrapper creates a container option that injects startup script for file downloads +func (execCtx *executionContext) WithStartupScriptWrapper() ContainerOption { + return ContainerOption{ + ModifyContainer: func(container *types.ContainerDefinition) error { + return execCtx.modifyContainerWithStartupScript(container) + }, + Description: "Inject startup script for file downloads", + } +} + +// modifyContainerWithStartupScript modifies a container definition to include startup script for downloads +func (execCtx *executionContext) modifyContainerWithStartupScript(container *types.ContainerDefinition) error { + if len(execCtx.FileDownloads) == 0 { + return nil // No downloads configured, no modification needed + } + + // Generate startup script + startupScript, err := generateStartupScript(execCtx.FileDownloads, execCtx.StartupScriptConfig) + if err != nil { + return fmt.Errorf("failed to generate startup script: %w", err) + } + + // Store original command + originalCommand := container.Command + if originalCommand == nil { + originalCommand = []string{} + } + + // Create startup script command + scriptCmd := []string{"sh", "-c", startupScript} + container.Command = scriptCmd + + // Add environment variables for the startup script + if container.Environment == nil { + container.Environment = []types.KeyValuePair{} + } + + // Add original command as environment variable for the startup script + originalCmdStr := strings.Join(originalCommand, " ") + container.Environment = append(container.Environment, + types.KeyValuePair{ + Name: aws.String("ORIGINAL_COMMAND"), 
+ Value: aws.String(originalCmdStr), + }) + fmt.Println() + return nil +} + +// getDefaultContainerOptions returns the default container options +func (execCtx *executionContext) getDefaultContainerOptions() []ContainerOption { + options := []ContainerOption{} + + // Add startup script wrapper option if file downloads are configured + if len(execCtx.FileDownloads) > 0 { + options = append(options, execCtx.WithStartupScriptWrapper()) + } + + return options +} + +// applyContainerOptions applies container options to container definitions +func (execCtx *executionContext) applyContainerOptions(containerDefinitions []types.ContainerDefinition, options []ContainerOption) error { + for _, option := range options { + for i := range containerDefinitions { + if err := option.ModifyContainer(&containerDefinitions[i]); err != nil { + return fmt.Errorf("failed to apply container option '%s': %w", option.Description, err) + } + } + } + return nil +} + // ECS command context structure type ecsCommandContext struct { TaskDefinitionTemplate string `yaml:"task_definition_template,omitempty" json:"task_definition_template,omitempty"` @@ -29,6 +140,10 @@ type ecsCommandContext struct { PollingInterval duration.Duration `yaml:"polling_interval,omitempty" json:"polling_interval,omitempty"` Timeout duration.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"` MaxFailCount int `yaml:"max_fail_count,omitempty" json:"max_fail_count,omitempty"` // max failures before giving up + + // File download configuration + FileDownloads []FileDownload `yaml:"file_downloads,omitempty" json:"file_downloads,omitempty"` + StartupScriptConfig *StartupScriptConfig `yaml:"startup_script_config,omitempty" json:"startup_script_config,omitempty"` } // ECS cluster context structure @@ -77,6 +192,10 @@ type executionContext struct { Timeout duration.Duration `json:"timeout"` MaxFailCount int `json:"max_fail_count"` + // File download configuration + FileDownloads []FileDownload `json:"file_downloads"` + 
StartupScriptConfig *StartupScriptConfig `json:"startup_script_config"` + ecsClient *ecs.Client taskDefARN *string tasks map[string]*taskTracker @@ -87,6 +206,8 @@ const ( defaultTaskTimeout = duration.Duration(1 * time.Hour) defaultMaxFailCount = 1 defaultTaskCount = 1 + defaultDownloadDir = "/tmp/downloads" + defaultTimeout = 300 startedByPrefix = "heimdall-job-" errMaxFailCount = "task %s failed %d times (max: %d), giving up" errPollingTimeout = "polling timed out for arns %v after %v" @@ -97,6 +218,66 @@ var ( errMissingTemplate = fmt.Errorf("task definition template is required") ) +// generateStartupScript creates a startup script for downloading files using a template +func generateStartupScript(fileDownloads []FileDownload, config *StartupScriptConfig) (string, error) { + if len(fileDownloads) == 0 { + return "#!/bin/bash\necho 'No files to download'\nexec \"$@\"", nil + } + + downloadDir := defaultDownloadDir + timeout := defaultTimeout + createDirs := true + + if config != nil { + if config.DownloadDir != "" { + downloadDir = config.DownloadDir + } + if config.Timeout > 0 { + timeout = config.Timeout + } + createDirs = config.CreateDirs + } + + // Prepare template data + templateData := ScriptTemplateData{ + DownloadDir: downloadDir, + Timeout: timeout, + CreateDirs: createDirs, + Downloads: make([]ScriptDownload, 0, len(fileDownloads)), + } + + // Convert file downloads to script downloads + for _, download := range fileDownloads { + scriptDownload := ScriptDownload{ + Source: download.Source, + Destination: download.Destination, + IsS3: strings.HasPrefix(download.Source, "s3://"), + } + templateData.Downloads = append(templateData.Downloads, scriptDownload) + } + + // Load template + templatePath := "internal/pkg/object/command/ecs/startup_script_template.sh" + templateContent, err := os.ReadFile(templatePath) + if err != nil { + return "", fmt.Errorf("failed to read template file: %w", err) + } + + // Parse template + tmpl, err := 
template.New("startup_script").Parse(string(templateContent)) + if err != nil { + return "", fmt.Errorf("failed to parse template: %w", err) + } + + // Execute template + var script strings.Builder + if err := tmpl.Execute(&script, templateData); err != nil { + return "", fmt.Errorf("failed to execute template: %w", err) + } + + return script.String(), nil +} + func New(commandContext *context.Context) (plugin.Handler, error) { e := &ecsCommandContext{ @@ -104,6 +285,11 @@ func New(commandContext *context.Context) (plugin.Handler, error) { Timeout: defaultTaskTimeout, MaxFailCount: defaultMaxFailCount, TaskCount: defaultTaskCount, + StartupScriptConfig: &StartupScriptConfig{ + DownloadDir: defaultDownloadDir, + Timeout: defaultTimeout, + CreateDirs: true, + }, } if commandContext != nil { @@ -151,6 +337,15 @@ func (e *ecsCommandContext) handler(r *plugin.Runtime, job *job.Job, cluster *cl // prepare and register task definition with ECS func (execCtx *executionContext) registerTaskDefinition() error { + // Start with the original container definitions + containerDefinitions := execCtx.TaskDefinitionWrapper.TaskDefinition.ContainerDefinitions + + // Apply container options using the options pattern + containerOptions := execCtx.getDefaultContainerOptions() + if err := execCtx.applyContainerOptions(containerDefinitions, containerOptions); err != nil { + return fmt.Errorf("failed to apply container options: %w", err) + } + registerInput := &ecs.RegisterTaskDefinitionInput{ Family: aws.String(aws.ToString(execCtx.TaskDefinitionWrapper.TaskDefinition.Family)), RequiresCompatibilities: []types.Compatibility{types.CompatibilityFargate}, @@ -159,7 +354,7 @@ func (execCtx *executionContext) registerTaskDefinition() error { Memory: aws.String(fmt.Sprintf("%d", execCtx.ClusterConfig.Memory)), ExecutionRoleArn: aws.String(execCtx.ClusterConfig.ExecutionRoleARN), TaskRoleArn: aws.String(execCtx.ClusterConfig.TaskRoleARN), - ContainerDefinitions: 
execCtx.TaskDefinitionWrapper.TaskDefinition.ContainerDefinitions, + ContainerDefinitions: containerDefinitions, } registerOutput, err := execCtx.ecsClient.RegisterTaskDefinition(ctx, registerInput) @@ -373,6 +568,23 @@ func validateExecutionContext(ctx *executionContext) error { return fmt.Errorf("task count (%d) needs to be greater than 0 and less than cluster max task count (%d)", ctx.TaskCount, ctx.ClusterConfig.MaxTaskCount) } + // Validate file downloads configuration + for i, download := range ctx.FileDownloads { + if download.Source == "" { + return fmt.Errorf("file download %d: source is required", i) + } + if download.Destination == "" { + return fmt.Errorf("file download %d: destination is required", i) + } + } + + // Validate startup script configuration + if ctx.StartupScriptConfig != nil { + if ctx.StartupScriptConfig.Timeout < 0 { + return fmt.Errorf("timeout cannot be negative") + } + } + return nil } @@ -402,13 +614,15 @@ func buildContainerOverrides(execCtx *executionContext) error { containerName := aws.ToString(container.Name) // Use existing override if it exists, otherwise create a blank one - if override, exists := containerOverridesMap[containerName]; exists { - containerOverrides = append(containerOverrides, override) + var override types.ContainerOverride + if existingOverride, exists := containerOverridesMap[containerName]; exists { + override = existingOverride } else { - containerOverrides = append(containerOverrides, types.ContainerOverride{ + override = types.ContainerOverride{ Name: aws.String(containerName), - }) + } } + containerOverrides = append(containerOverrides, override) } execCtx.ContainerOverrides = containerOverrides diff --git a/internal/pkg/object/command/ecs/startup_script_template.sh b/internal/pkg/object/command/ecs/startup_script_template.sh new file mode 100644 index 0000000..d35f042 --- /dev/null +++ b/internal/pkg/object/command/ecs/startup_script_template.sh @@ -0,0 +1,47 @@ +#!/bin/bash +set -e +echo 'Starting 
file downloads to {{.DownloadDir}}...' +{{if .CreateDirs}}mkdir -p {{.DownloadDir}}{{end}} + +if ! command -v aws &> /dev/null; then + echo 'Installing AWS CLI...' + if apk update && apk add aws-cli; then + echo 'AWS CLI installed successfully' + else + echo 'ERROR: Failed to install AWS CLI' + exit 1 + fi +fi + +{{range .Downloads}} +# Download: {{.Source}} +mkdir -p $(dirname {{.Destination}}) +{{if .IsS3}} +echo "Downloading from S3: {{.Source}}" +if aws s3 cp '{{.Source}}' '{{.Destination}}' --cli-read-timeout {{$.Timeout}} --cli-connect-timeout {{$.Timeout}}; then + echo "Successfully downloaded: {{.Source}}" + + if [ -f '{{.Destination}}' ] && [ -s '{{.Destination}}' ]; then + echo "File verification passed: {{.Destination}}" + file_size=$(stat -c%s '{{.Destination}}' 2>/dev/null || echo "unknown") + echo "File size: $file_size bytes" + else + echo "ERROR: Downloaded file is empty or missing: {{.Destination}}" + exit 1 + fi +else + echo "ERROR: Failed to download from S3: {{.Source}}" + exit 1 +fi +{{end}} +{{end}} +echo 'All files downloaded successfully' +echo 'Starting main application...' 
+# Execute the original command +if [ -n "$ORIGINAL_COMMAND" ]; then + echo "Executing: $ORIGINAL_COMMAND" + exec $ORIGINAL_COMMAND +else + echo "No original command found, executing: $@" + exec "$@" +fi From 3661e5f84115dc5fa52d1d904c6d1e7e22755b3c Mon Sep 17 00:00:00 2001 From: Prasad Lohakpure Date: Fri, 26 Sep 2025 02:54:38 +0530 Subject: [PATCH 2/5] Minor changes --- internal/pkg/object/command/ecs/ecs.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/internal/pkg/object/command/ecs/ecs.go b/internal/pkg/object/command/ecs/ecs.go index 2b7c009..bf71d80 100644 --- a/internal/pkg/object/command/ecs/ecs.go +++ b/internal/pkg/object/command/ecs/ecs.go @@ -22,6 +22,10 @@ import ( "github.com/patterninc/heimdall/pkg/result/column" ) +var ( + templatePath = "internal/pkg/object/command/ecs/startup_script_template.sh" +) + // FileDownload represents a file to be downloaded before container execution type FileDownload struct { Source string `yaml:"source,omitempty" json:"source,omitempty"` // S3 URI or HTTP URL @@ -257,7 +261,6 @@ func generateStartupScript(fileDownloads []FileDownload, config *StartupScriptCo } // Load template - templatePath := "internal/pkg/object/command/ecs/startup_script_template.sh" templateContent, err := os.ReadFile(templatePath) if err != nil { return "", fmt.Errorf("failed to read template file: %w", err) @@ -614,15 +617,13 @@ func buildContainerOverrides(execCtx *executionContext) error { containerName := aws.ToString(container.Name) // Use existing override if it exists, otherwise create a blank one - var override types.ContainerOverride - if existingOverride, exists := containerOverridesMap[containerName]; exists { - override = existingOverride + if override, exists := containerOverridesMap[containerName]; exists { + containerOverrides = append(containerOverrides, override) } else { - override = types.ContainerOverride{ + containerOverrides = append(containerOverrides, types.ContainerOverride{ Name: 
aws.String(containerName), - } + }) } - containerOverrides = append(containerOverrides, override) } execCtx.ContainerOverrides = containerOverrides From f811adeac3e362b012a89f8b26a2a1e55ee98bac Mon Sep 17 00:00:00 2001 From: Prasad Lohakpure Date: Fri, 26 Sep 2025 03:01:25 +0530 Subject: [PATCH 3/5] Used embed FS --- internal/pkg/object/command/ecs/ecs.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/pkg/object/command/ecs/ecs.go b/internal/pkg/object/command/ecs/ecs.go index bf71d80..8f2b078 100644 --- a/internal/pkg/object/command/ecs/ecs.go +++ b/internal/pkg/object/command/ecs/ecs.go @@ -2,6 +2,7 @@ package ecs import ( ct "context" + "embed" "encoding/json" "fmt" "os" @@ -22,9 +23,8 @@ import ( "github.com/patterninc/heimdall/pkg/result/column" ) -var ( - templatePath = "internal/pkg/object/command/ecs/startup_script_template.sh" -) +//go:embed startup_script_template.sh +var startupScriptTemplate embed.FS // FileDownload represents a file to be downloaded before container execution type FileDownload struct { @@ -260,10 +260,10 @@ func generateStartupScript(fileDownloads []FileDownload, config *StartupScriptCo templateData.Downloads = append(templateData.Downloads, scriptDownload) } - // Load template - templateContent, err := os.ReadFile(templatePath) + // Load template from embedded filesystem + templateContent, err := startupScriptTemplate.ReadFile("startup_script_template.sh") if err != nil { - return "", fmt.Errorf("failed to read template file: %w", err) + return "", fmt.Errorf("failed to read embedded template file: %w", err) } // Parse template From f6820bd620a7198e416716a8671e6659374a6b15 Mon Sep 17 00:00:00 2001 From: prasadlohakpure Date: Fri, 26 Sep 2025 03:03:03 +0530 Subject: [PATCH 4/5] Apply suggestions from code review Co pilot review suggestions Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- internal/pkg/object/command/ecs/ecs.go | 1 - 
internal/pkg/object/command/ecs/startup_script_template.sh | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/pkg/object/command/ecs/ecs.go b/internal/pkg/object/command/ecs/ecs.go index 8f2b078..28907ea 100644 --- a/internal/pkg/object/command/ecs/ecs.go +++ b/internal/pkg/object/command/ecs/ecs.go @@ -108,7 +108,6 @@ func (execCtx *executionContext) modifyContainerWithStartupScript(container *typ Name: aws.String("ORIGINAL_COMMAND"), Value: aws.String(originalCmdStr), }) - fmt.Println() return nil } diff --git a/internal/pkg/object/command/ecs/startup_script_template.sh b/internal/pkg/object/command/ecs/startup_script_template.sh index d35f042..32d4557 100644 --- a/internal/pkg/object/command/ecs/startup_script_template.sh +++ b/internal/pkg/object/command/ecs/startup_script_template.sh @@ -40,7 +40,7 @@ echo 'Starting main application...' # Execute the original command if [ -n "$ORIGINAL_COMMAND" ]; then echo "Executing: $ORIGINAL_COMMAND" - exec $ORIGINAL_COMMAND + exec "$ORIGINAL_COMMAND" else echo "No original command found, executing: $@" exec "$@" From 4399b49d88400d43e14cf589c5e628427c4ede01 Mon Sep 17 00:00:00 2001 From: Prasad Lohakpure Date: Mon, 13 Oct 2025 22:29:47 +0530 Subject: [PATCH 5/5] Combined s3 upload in ecs plugin --- .../object/command/ecs/container_options.go | 99 ++++++ internal/pkg/object/command/ecs/ecs.go | 306 ++++++------------ plugins/ecs/README.md | 98 +++++- 3 files changed, 298 insertions(+), 205 deletions(-) create mode 100644 internal/pkg/object/command/ecs/container_options.go diff --git a/internal/pkg/object/command/ecs/container_options.go b/internal/pkg/object/command/ecs/container_options.go new file mode 100644 index 0000000..7b5d99f --- /dev/null +++ b/internal/pkg/object/command/ecs/container_options.go @@ -0,0 +1,99 @@ +package ecs + +import ( + "fmt" + "strings" + + "github.com/aws/aws-sdk-go-v2/aws" +) + +// ContainerOption is a function that modifies container overrides +type 
ContainerOption func(*executionContext) error + +// ApplyContainerOptions applies a list of container options to container overrides +func ApplyContainerOptions(execCtx *executionContext, options ...ContainerOption) error { + for _, option := range options { + if err := option(execCtx); err != nil { + return err + } + } + return nil +} + +// WithFileUploadScript wraps container commands with a script that downloads files from S3 to the container +func WithFileUploadScript(fileUploads []FileUpload, localDir string) ContainerOption { + return func(execCtx *executionContext) error { + if len(fileUploads) == 0 { + return nil + } + + for i := range execCtx.ContainerOverrides { + override := &execCtx.ContainerOverrides[i] + + // Get the original command from override, or from task definition if not overridden + var originalCommand []string + if len(override.Command) > 0 { + originalCommand = override.Command + } else { + // Get command from task definition + for _, container := range execCtx.TaskDefinitionWrapper.TaskDefinition.ContainerDefinitions { + if aws.ToString(container.Name) == aws.ToString(override.Name) { + originalCommand = container.Command + break + } + } + } + + if len(originalCommand) == 0 { + originalCommand = []string{} + } + + // Generate the download wrapper script + wrapperScript := generateDownloadWrapperScript(fileUploads, localDir, originalCommand) + + // Replace container command with wrapper script + override.Command = []string{"sh", "-c", wrapperScript} + } + + return nil + } +} + +const downloadScriptTemplate = ` +set -e +apk update && apk add aws-cli +mkdir -p {{LOCAL_DIR}} +for s3_path in {{S3_PATHS}};do aws s3 cp "$s3_path" "{{LOCAL_DIR}}/$(basename "$s3_path")" 2>&1;done +exec {{CMD}}` + +// generateDownloadWrapperScript generates a minimal bash script that downloads files from S3 and executes the original command +func generateDownloadWrapperScript(fileUploads []FileUpload, localDir string, originalCommand []string) string { + // Build S3 
paths list for the for loop + var s3Paths []string + for _, upload := range fileUploads { + s3Paths = append(s3Paths, fmt.Sprintf(`"%s"`, upload.S3Destination)) + } + s3PathsList := strings.Join(s3Paths, " ") + + // Build command string + cmdStr := "wait" + if len(originalCommand) > 0 { + escapedCmd := make([]string, len(originalCommand)) + for i, arg := range originalCommand { + escapedCmd[i] = fmt.Sprintf("'%s'", strings.ReplaceAll(arg, "'", "'\\''")) + } + cmdStr = strings.Join(escapedCmd, " ") + } + + // Fill template + script := strings.ReplaceAll(downloadScriptTemplate, "{{LOCAL_DIR}}", localDir) + script = strings.ReplaceAll(script, "{{S3_PATHS}}", s3PathsList) + script = strings.ReplaceAll(script, "{{CMD}}", cmdStr) + return script +} + +// Future container options can be added here as needed +// Example: +// func WithEnvironmentVariables(envVars map[string]string) ContainerOption { ... } +// func WithHealthCheck(config HealthCheckConfig) ContainerOption { ... } +// func WithResourceLimits(limits ResourceLimits) ContainerOption { ... 
} diff --git a/internal/pkg/object/command/ecs/ecs.go b/internal/pkg/object/command/ecs/ecs.go index 8f2b078..992b960 100644 --- a/internal/pkg/object/command/ecs/ecs.go +++ b/internal/pkg/object/command/ecs/ecs.go @@ -2,18 +2,17 @@ package ecs import ( ct "context" - "embed" "encoding/json" "fmt" "os" "strings" - "text/template" "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/ecs" "github.com/aws/aws-sdk-go-v2/service/ecs/types" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/patterninc/heimdall/pkg/context" "github.com/patterninc/heimdall/pkg/duration" "github.com/patterninc/heimdall/pkg/object/cluster" @@ -23,117 +22,10 @@ import ( "github.com/patterninc/heimdall/pkg/result/column" ) -//go:embed startup_script_template.sh -var startupScriptTemplate embed.FS - -// FileDownload represents a file to be downloaded before container execution -type FileDownload struct { - Source string `yaml:"source,omitempty" json:"source,omitempty"` // S3 URI or HTTP URL - Destination string `yaml:"destination,omitempty" json:"destination,omitempty"` // Local path in container -} - -// StartupScriptConfig represents configuration for the startup script -type StartupScriptConfig struct { - ScriptPath string `yaml:"script_path,omitempty" json:"script_path,omitempty"` // Path to the startup script - DownloadDir string `yaml:"download_dir,omitempty" json:"download_dir,omitempty"` // Directory to download files to - Timeout int `yaml:"timeout,omitempty" json:"timeout,omitempty"` // Timeout in seconds - CreateDirs bool `yaml:"create_dirs,omitempty" json:"create_dirs,omitempty"` // Create destination directories -} - -// ScriptTemplateData represents data for populating the startup script template -type ScriptTemplateData struct { - DownloadDir string - Timeout int - CreateDirs bool - Downloads []ScriptDownload -} - -// ScriptDownload represents a download item for the script template -type ScriptDownload struct 
{ - Source string - Destination string - IsS3 bool -} - -// ContainerModificationOption represents a generic option for modifying container definitions -type ContainerModificationOption func(*types.ContainerDefinition) error - -// ContainerOption represents a generic option for container modifications -type ContainerOption struct { - ModifyContainer ContainerModificationOption - Description string -} - -// WithStartupScriptWrapper creates a container option that injects startup script for file downloads -func (execCtx *executionContext) WithStartupScriptWrapper() ContainerOption { - return ContainerOption{ - ModifyContainer: func(container *types.ContainerDefinition) error { - return execCtx.modifyContainerWithStartupScript(container) - }, - Description: "Inject startup script for file downloads", - } -} - -// modifyContainerWithStartupScript modifies a container definition to include startup script for downloads -func (execCtx *executionContext) modifyContainerWithStartupScript(container *types.ContainerDefinition) error { - if len(execCtx.FileDownloads) == 0 { - return nil // No downloads configured, no modification needed - } - - // Generate startup script - startupScript, err := generateStartupScript(execCtx.FileDownloads, execCtx.StartupScriptConfig) - if err != nil { - return fmt.Errorf("failed to generate startup script: %w", err) - } - - // Store original command - originalCommand := container.Command - if originalCommand == nil { - originalCommand = []string{} - } - - // Create startup script command - scriptCmd := []string{"sh", "-c", startupScript} - container.Command = scriptCmd - - // Add environment variables for the startup script - if container.Environment == nil { - container.Environment = []types.KeyValuePair{} - } - - // Add original command as environment variable for the startup script - originalCmdStr := strings.Join(originalCommand, " ") - container.Environment = append(container.Environment, - types.KeyValuePair{ - Name: 
aws.String("ORIGINAL_COMMAND"), - Value: aws.String(originalCmdStr), - }) - fmt.Println() - return nil -} - -// getDefaultContainerOptions returns the default container options -func (execCtx *executionContext) getDefaultContainerOptions() []ContainerOption { - options := []ContainerOption{} - - // Add startup script wrapper option if file downloads are configured - if len(execCtx.FileDownloads) > 0 { - options = append(options, execCtx.WithStartupScriptWrapper()) - } - - return options -} - -// applyContainerOptions applies container options to container definitions -func (execCtx *executionContext) applyContainerOptions(containerDefinitions []types.ContainerDefinition, options []ContainerOption) error { - for _, option := range options { - for i := range containerDefinitions { - if err := option.ModifyContainer(&containerDefinitions[i]); err != nil { - return fmt.Errorf("failed to apply container option '%s': %w", option.Description, err) - } - } - } - return nil +// FileUpload represents configuration for uploading files from container to S3 +type FileUpload struct { + Data string `yaml:"data,omitempty" json:"data,omitempty"` // File content as string + S3Destination string `yaml:"s3_destination,omitempty" json:"s3_destination,omitempty"` // S3 path (e.g., s3://bucket/path/filename) } // ECS command context structure @@ -145,9 +37,8 @@ type ecsCommandContext struct { Timeout duration.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"` MaxFailCount int `yaml:"max_fail_count,omitempty" json:"max_fail_count,omitempty"` // max failures before giving up - // File download configuration - FileDownloads []FileDownload `yaml:"file_downloads,omitempty" json:"file_downloads,omitempty"` - StartupScriptConfig *StartupScriptConfig `yaml:"startup_script_config,omitempty" json:"startup_script_config,omitempty"` + // File upload configuration + FileUploads []FileUpload `yaml:"file_uploads,omitempty" json:"file_uploads,omitempty"` } // ECS cluster context structure @@ 
-196,11 +87,11 @@ type executionContext struct { Timeout duration.Duration `json:"timeout"` MaxFailCount int `json:"max_fail_count"` - // File download configuration - FileDownloads []FileDownload `json:"file_downloads"` - StartupScriptConfig *StartupScriptConfig `json:"startup_script_config"` + // File upload configuration + FileUploads []FileUpload `json:"file_uploads"` ecsClient *ecs.Client + s3Client *s3.Client taskDefARN *string tasks map[string]*taskTracker } @@ -210,8 +101,8 @@ const ( defaultTaskTimeout = duration.Duration(1 * time.Hour) defaultMaxFailCount = 1 defaultTaskCount = 1 - defaultDownloadDir = "/tmp/downloads" - defaultTimeout = 300 + defaultUploadTimeout = 30 + defaultLocalDir = "/tmp/downloads" startedByPrefix = "heimdall-job-" errMaxFailCount = "task %s failed %d times (max: %d), giving up" errPollingTimeout = "polling timed out for arns %v after %v" @@ -222,65 +113,6 @@ var ( errMissingTemplate = fmt.Errorf("task definition template is required") ) -// generateStartupScript creates a startup script for downloading files using a template -func generateStartupScript(fileDownloads []FileDownload, config *StartupScriptConfig) (string, error) { - if len(fileDownloads) == 0 { - return "#!/bin/bash\necho 'No files to download'\nexec \"$@\"", nil - } - - downloadDir := defaultDownloadDir - timeout := defaultTimeout - createDirs := true - - if config != nil { - if config.DownloadDir != "" { - downloadDir = config.DownloadDir - } - if config.Timeout > 0 { - timeout = config.Timeout - } - createDirs = config.CreateDirs - } - - // Prepare template data - templateData := ScriptTemplateData{ - DownloadDir: downloadDir, - Timeout: timeout, - CreateDirs: createDirs, - Downloads: make([]ScriptDownload, 0, len(fileDownloads)), - } - - // Convert file downloads to script downloads - for _, download := range fileDownloads { - scriptDownload := ScriptDownload{ - Source: download.Source, - Destination: download.Destination, - IsS3: 
strings.HasPrefix(download.Source, "s3://"), - } - templateData.Downloads = append(templateData.Downloads, scriptDownload) - } - - // Load template from embedded filesystem - templateContent, err := startupScriptTemplate.ReadFile("startup_script_template.sh") - if err != nil { - return "", fmt.Errorf("failed to read embedded template file: %w", err) - } - - // Parse template - tmpl, err := template.New("startup_script").Parse(string(templateContent)) - if err != nil { - return "", fmt.Errorf("failed to parse template: %w", err) - } - - // Execute template - var script strings.Builder - if err := tmpl.Execute(&script, templateData); err != nil { - return "", fmt.Errorf("failed to execute template: %w", err) - } - - return script.String(), nil -} - func New(commandContext *context.Context) (plugin.Handler, error) { e := &ecsCommandContext{ @@ -288,11 +120,6 @@ func New(commandContext *context.Context) (plugin.Handler, error) { Timeout: defaultTaskTimeout, MaxFailCount: defaultMaxFailCount, TaskCount: defaultTaskCount, - StartupScriptConfig: &StartupScriptConfig{ - DownloadDir: defaultDownloadDir, - Timeout: defaultTimeout, - CreateDirs: true, - }, } if commandContext != nil { @@ -319,6 +146,11 @@ func (e *ecsCommandContext) handler(r *plugin.Runtime, job *job.Job, cluster *cl return err } + // Upload files to S3 if configured + if err := execCtx.uploadFilesToS3(); err != nil { + return fmt.Errorf("failed to upload files to S3: %w", err) + } + // Start tasks if err := execCtx.startTasks(job.ID); err != nil { return err @@ -340,15 +172,9 @@ func (e *ecsCommandContext) handler(r *plugin.Runtime, job *job.Job, cluster *cl // prepare and register task definition with ECS func (execCtx *executionContext) registerTaskDefinition() error { - // Start with the original container definitions + // Use the original container definitions from the template containerDefinitions := execCtx.TaskDefinitionWrapper.TaskDefinition.ContainerDefinitions - // Apply container options using 
the options pattern - containerOptions := execCtx.getDefaultContainerOptions() - if err := execCtx.applyContainerOptions(containerDefinitions, containerOptions); err != nil { - return fmt.Errorf("failed to apply container options: %w", err) - } - registerInput := &ecs.RegisterTaskDefinitionInput{ Family: aws.String(aws.ToString(execCtx.TaskDefinitionWrapper.TaskDefinition.Family)), RequiresCompatibilities: []types.Compatibility{types.CompatibilityFargate}, @@ -548,6 +374,19 @@ func buildExecutionContext(commandCtx *ecsCommandContext, j *job.Job, c *cluster return nil, err } + // Apply container options to ContainerOverrides + var options []ContainerOption + + // Add file upload script option if configured + if len(execCtx.FileUploads) > 0 { + options = append(options, WithFileUploadScript(execCtx.FileUploads, defaultLocalDir)) + } + + // Apply all options to container overrides + if err := ApplyContainerOptions(execCtx, options...); err != nil { + return nil, fmt.Errorf("failed to apply container options: %w", err) + } + // Validate the resolved configuration if err := validateExecutionContext(execCtx); err != nil { return nil, err @@ -559,6 +398,7 @@ func buildExecutionContext(commandCtx *ecsCommandContext, j *job.Job, c *cluster return nil, err } execCtx.ecsClient = ecs.NewFromConfig(cfg) + execCtx.s3Client = s3.NewFromConfig(cfg) return execCtx, nil @@ -571,20 +411,17 @@ func validateExecutionContext(ctx *executionContext) error { return fmt.Errorf("task count (%d) needs to be greater than 0 and less than cluster max task count (%d)", ctx.TaskCount, ctx.ClusterConfig.MaxTaskCount) } - // Validate file downloads configuration - for i, download := range ctx.FileDownloads { - if download.Source == "" { - return fmt.Errorf("file download %d: source is required", i) + // Validate file uploads configuration + for i, upload := range ctx.FileUploads { + if upload.Data == "" { + return fmt.Errorf("file upload %d: data is required", i) } - if download.Destination == "" { 
- return fmt.Errorf("file download %d: destination is required", i) + if upload.S3Destination == "" { + return fmt.Errorf("file upload %d: s3_destination is required", i) } - } - - // Validate startup script configuration - if ctx.StartupScriptConfig != nil { - if ctx.StartupScriptConfig.Timeout < 0 { - return fmt.Errorf("timeout cannot be negative") + // Validate that destination is an S3 URI + if !strings.HasPrefix(upload.S3Destination, "s3://") { + return fmt.Errorf("file upload %d: s3_destination must be an S3 URI (s3://bucket/path/filename)", i) } } @@ -752,6 +589,67 @@ func isTaskSuccessful(task types.Task, execCtx *executionContext) bool { } +// uploadFilesToS3 uploads file data to S3 before the ECS tasks are started +func (execCtx *executionContext) uploadFilesToS3() error { + // Skip if no files to upload + if len(execCtx.FileUploads) == 0 { + return nil + } + + for i, upload := range execCtx.FileUploads { + // Parse S3 URI (s3://bucket/key/filename) + bucket, key, err := parseS3URI(upload.S3Destination) + if err != nil { + return fmt.Errorf("failed to parse S3 URI for upload %d: %w", i, err) + } + + // Upload file content to S3 + + input := &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: strings.NewReader(upload.Data), + } + + // Set timeout context with default timeout + uploadCtx, cancel := ct.WithTimeout(ctx, time.Duration(defaultUploadTimeout)*time.Second) + defer cancel() + + _, err = execCtx.s3Client.PutObject(uploadCtx, input) + if err != nil { + return fmt.Errorf("failed to upload file %d to S3 (%s): %w", i, upload.S3Destination, err) + } + + } + + return nil +} + +// parseS3URI parses an S3 URI into bucket and key components +func parseS3URI(s3URI string) (bucket, key string, err error) { + if !strings.HasPrefix(s3URI, "s3://") { + return "", "", fmt.Errorf("invalid S3 URI: must start with s3://") + } + + // Remove s3:// prefix + path := strings.TrimPrefix(s3URI, "s3://") + + // Split into bucket and key + parts := 
strings.SplitN(path, "/", 2) + if len(parts) < 2 { + return "", "", fmt.Errorf("invalid S3 URI: must include bucket and key (s3://bucket/key)") + } + + bucket = parts[0] + key = parts[1] + + if bucket == "" || key == "" { + return "", "", fmt.Errorf("invalid S3 URI: bucket and key cannot be empty") + } + + return bucket, key, nil +} + // storeResults builds and stores the final result for the job. func storeResults(execCtx *executionContext, j *job.Job) error { diff --git a/plugins/ecs/README.md b/plugins/ecs/README.md index bef4fc1..a98de09 100644 --- a/plugins/ecs/README.md +++ b/plugins/ecs/README.md @@ -53,6 +53,102 @@ An ECS command requires a task definition template and configuration for running --- +## 📤 File Uploads to S3 + +The ECS plugin supports file uploads to containers (via S3): + +### Direct String Upload + +Upload file content passed as strings before task execution starts: + +```yaml +file_uploads: + - data: "File content here as a string" + s3_destination: "s3://my-bucket/results/output.txt" + - data: '{"result": "success", "count": 42}' + s3_destination: "s3://my-bucket/results/data.json" +``` + +**Features:** +- File content passed as a string in configuration +- Uploaded before ECS tasks start +- Uses Heimdall's IAM role credentials +- 30-second timeout per upload +- Destination must be a full S3 path with filename + +**Use Case:** Small metadata files, status reports, configuration files + + +```yaml +- name: ecs-job-with-string-uploads + status: active + plugin: ecs + version: 0.0.1 + description: Run ECS tasks and upload results to S3 + context: + task_definition_template: task.json + task_count: 2 + + file_uploads: + - data: "Processing completed at 2025-10-13" + s3_destination: "s3://results-bucket/status.txt" + - data: '{"tasks": 2, "status": "completed"}' + s3_destination: "s3://results-bucket/metadata.json" +``` + +### API usage + +```json +{ + "name": "run-batch-with-upload", + "version": "0.0.1", + "command_criteria": ["type:ecs"], + 
"cluster_criteria": ["type:fargate"], + "context": { + "task_count": 1, + "file_uploads": [ + { + "data": "Task execution results: SUCCESS", + "s3_destination": "s3://my-bucket/jobs/job-123/results.txt" + }, + { + "data": "{\"job_id\": \"123\", \"timestamp\": \"2025-10-13T10:00:00Z\", \"status\": \"completed\"}", + "s3_destination": "s3://my-bucket/jobs/job-123/metadata.json" + } + ] + } +} +``` + +### S3 URI Format + +File destinations must follow the S3 URI format: +- **Format**: `s3://bucket-name/path/to/filename.ext` +- **Example**: `s3://my-results/2025/10/output.json` +- The path must include the complete object key with filename + +### IAM Permissions + +Ensure the IAM role used by Heimdall (which performs the uploads) has S3 upload permissions: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "s3:PutObject", + "s3:ListBucket" + ], + "Resource": "arn:aws:s3:::your-bucket/*" + } + ] +} +``` + +--- + ## 🖥️ Cluster Configuration ECS clusters must specify Fargate configuration, IAM roles, and network settings: @@ -379,4 +475,4 @@ This configuration will run 2 tasks, each with the environment variables: - `ENVIRONMENT=production` - `LOG_LEVEL=info` - `DATA_SOURCE=s3://my-bucket/data/` -- `TASK_NAME=heimdall-job-{job-id}-{task-number}` (automatically added) \ No newline at end of file +- `TASK_NAME=heimdall-job-{job-id}-{task-number}` (automatically added)