diff --git a/internal/pkg/object/command/postgres/postgres.go b/internal/pkg/object/command/postgres/postgres.go new file mode 100644 index 0000000..baab46a --- /dev/null +++ b/internal/pkg/object/command/postgres/postgres.go @@ -0,0 +1,156 @@ +package postgres + +import ( + "fmt" + "strings" + "sync" + + "github.com/patterninc/heimdall/internal/pkg/database" + pkgcontext "github.com/patterninc/heimdall/pkg/context" + "github.com/patterninc/heimdall/pkg/object/cluster" + "github.com/patterninc/heimdall/pkg/object/job" + "github.com/patterninc/heimdall/pkg/plugin" + "github.com/patterninc/heimdall/pkg/result" + "github.com/patterninc/heimdall/pkg/result/column" +) + +// postgresJobContext represents the context for a PostgreSQL job +type postgresJobContext struct { + Query string `yaml:"query,omitempty" json:"query,omitempty"` + ReturnResult bool `yaml:"return_result,omitempty" json:"return_result,omitempty"` +} + +type postgresClusterContext struct { + ConnectionString string `yaml:"connection_string,omitempty" json:"connection_string,omitempty"` +} + +type postgresCommandContext struct { + mu sync.Mutex +} + +// New creates a new PostgreSQL plugin handler. +func New(_ *pkgcontext.Context) (plugin.Handler, error) { + p := &postgresCommandContext{} + return p.handler, nil +} + +// Handler for the PostgreSQL query execution. +func (p *postgresCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster.Cluster) error { + jobContext, err := validateJobContext(j) + if err != nil { + return err + } + + clusterContext, err := validateClusterContext(c) + if err != nil { + return err + } + + db := &database.Database{ConnectionString: clusterContext.ConnectionString} + + if jobContext.ReturnResult { + return executeSyncQuery(db, jobContext.Query, j) + } + return p.executeAsyncQueries(db, jobContext.Query, j) +} + +func validateJobContext(j *job.Job) (*postgresJobContext, error) { + jobContext := &postgresJobContext{} + if j.Context != nil { + if err := j.Context.Unmarshal(jobContext); err != nil { + return nil, fmt.Errorf("failed to unmarshal job context: %w", err) + } + } + if jobContext.Query == "" { + return nil, fmt.Errorf("query is required in job context") + } + return jobContext, nil +} + +func validateClusterContext(c *cluster.Cluster) (*postgresClusterContext, error) { + clusterContext := &postgresClusterContext{} + if c.Context != nil { + if err := c.Context.Unmarshal(clusterContext); err != nil { + return nil, fmt.Errorf("failed to unmarshal cluster context: %w", err) + } + } + if clusterContext.ConnectionString == "" { + return nil, fmt.Errorf("connection_string is required in cluster context") + } + return clusterContext, nil +} + +func executeSyncQuery(db *database.Database, query string, j *job.Job) error { + // Allow a single query, even if it ends with a semicolon + queries := splitAndTrimQueries(query) + if len(queries) != 1 { + return fmt.Errorf("multiple queries are not allowed when return_result is true") + } + + sess, err := db.NewSession(false) + if err != nil { + return fmt.Errorf("failed to open PostgreSQL connection: %w", err) + } + defer sess.Close() + + rows, err := sess.Query(queries[0]) + if err != nil { + return fmt.Errorf("PostgreSQL query execution failed: %w", err) + } + defer rows.Close() + + rowsResult, err := result.FromRows(rows) + if err != nil { + return fmt.Errorf("failed to process PostgreSQL query results: %w", err) + } + + j.Result = rowsResult + return nil +} + +func (p *postgresCommandContext) executeAsyncQueries(db *database.Database, query string, j *job.Job) error { + sess, err := db.NewSession(false) + if err != nil { + j.Result = &result.Result{ + Columns: []*column.Column{{ + Name: "error", + Type: column.Type("string"), + }}, + Data: [][]any{{fmt.Sprintf("Async PostgreSQL connection error: %v", err)}}, + } + return fmt.Errorf("Async PostgreSQL connection error: %v", err) + } + defer sess.Close() + + _, err = sess.Exec(query) + if err != nil { + j.Result = &result.Result{ + Columns: []*column.Column{{ + Name: "error", + Type: column.Type("string"), + }}, + Data: [][]any{{fmt.Sprintf("Async PostgreSQL query execution error: %v", err)}}, + } + return fmt.Errorf("Async PostgreSQL query execution error: %v", err) + } + + j.Result = &result.Result{ + Columns: []*column.Column{{ + Name: "message", + Type: column.Type("string"), + }}, + Data: [][]any{{"All queries executed successfully"}}, + } + return nil +} + +func splitAndTrimQueries(query string) []string { + queries := []string{} + for _, q := range strings.Split(query, ";") { + q = strings.TrimSpace(q) + if q != "" { + queries = append(queries, q) + } + } + return queries +} diff --git a/plugins/postgres/README.md b/plugins/postgres/README.md new file mode 100644 index 0000000..0a435fa --- /dev/null +++ b/plugins/postgres/README.md @@ -0,0 +1,115 @@ +# ⚡ PostgreSQL Plugin + +The **PostgreSQL Plugin** enables Heimdall to run SQL queries on configured PostgreSQL databases. It supports direct SQL, SQL files, batch execution, and both synchronous and asynchronous modes. + +--- + +## 🧩 Plugin Overview + +* **Plugin Name:** `postgres` +* **Execution Modes:** Synchronous (return_result: true) and Asynchronous (return_result: false) +* **Use Case:** Running SQL queries (single or batch) against PostgreSQL databases + +--- + +## ⚙️ Defining a Postgres Command + +A Postgres command can specify execution mode and other preferences. Example: + +```yaml + - name: postgres-0.0.1 + status: active + plugin: postgres + version: 0.0.1 + description: Execute queries against PostgreSQL databases + tags: + - type:postgres + cluster_tags: + - type:postgres + - data:local +``` + +--- + +## 🖥️ Cluster Configuration + +Each Postgres cluster must define a `connection_string`: + +```yaml + - name: postgres + status: active + version: 0.0.1 + description: PostgreSQL Production Database + context: + connection_string: "postgresql://user:password@host:port/database" + tags: + - type:postgres + - data:local +``` + +--- + +## 🚀 Submitting a Postgres Job + +A Postgres job provides the SQL query to be executed, and can specify execution mode: + +```json +{ + "name": "run-pg-query", + "version": "0.0.1", + "command_criteria": [ + "type:postgres" + ], + "cluster_criteria": [ + "data:local" + ], + "context": { + "query": "select * from employees limit 10;", + "return_result": true + } +} +``` + +--- + +## 📦 Job Context & Runtime + +The Postgres plugin handles: + +* Executing single or multiple SQL statements (batch) +* Supporting both direct query strings and SQL file execution +* Synchronous mode (`return_result: true`): returns query results, only one query allowed +* Asynchronous mode (`return_result: false`): executes all queries, returns success or error + +### Job Context Example + +```yaml +query: SELECT * FROM my_table # Required - SQL query to execute or path to .sql file +return_result: true # Optional - Whether to return query results (default: false) +``` + +### Cluster Context Example + +```yaml +connection_string: postgresql://user:password@host:port/database # Required +``` + +--- + +## 📊 Returning Job Results + +If enabled in the environment, Heimdall exposes query results via: + +``` +GET /api/v1/job//result +``` + +--- + +## 🧠 Best Practices + +* Use synchronous mode for SELECT queries where results are needed +* Use asynchronous mode for DDL/DML or batch operations +* Always secure your connection strings and database credentials +* Use `type:postgres` tags to isolate command and cluster matching +* Use SQL files for large or complex batch operations diff --git a/plugins/postgres/postgres.go b/plugins/postgres/postgres.go new file mode 100644 index 0000000..87300e8 --- /dev/null +++ b/plugins/postgres/postgres.go @@ -0,0 +1,12 @@ +package main + +import ( + "github.com/patterninc/heimdall/internal/pkg/object/command/postgres" + "github.com/patterninc/heimdall/pkg/context" + "github.com/patterninc/heimdall/pkg/plugin" +) + +// New creates a new instance of the postgres plugin. +func New(c *context.Context) (plugin.Handler, error) { + return postgres.New(c) +}