diff --git a/apps/workspace-engine/main.go b/apps/workspace-engine/main.go index c6dfd1b7d..b0b32bffb 100644 --- a/apps/workspace-engine/main.go +++ b/apps/workspace-engine/main.go @@ -41,7 +41,7 @@ func main() { allServices := []svc.Service{ pprof.New(pprof.DefaultAddr(config.Global.PprofPort)), - httpsvc.New(config.Global), + httpsvc.New(config.Global, db.GetPool(ctx)), claimcleanup.New(db.GetPool(ctx), 30*time.Second), deploymentplan.New(WorkerID, db.GetPool(ctx)), diff --git a/apps/workspace-engine/oapi/openapi.json b/apps/workspace-engine/oapi/openapi.json index ff08770aa..0d8b2d0d8 100644 --- a/apps/workspace-engine/oapi/openapi.json +++ b/apps/workspace-engine/oapi/openapi.json @@ -2933,9 +2933,6 @@ "description": "Configuration for the job agent", "type": "object" }, - "id": { - "type": "string" - }, "name": { "type": "string" }, @@ -2949,7 +2946,6 @@ } }, "required": [ - "id", "name", "ref", "config", diff --git a/apps/workspace-engine/oapi/spec/schemas/workflows.jsonnet b/apps/workspace-engine/oapi/spec/schemas/workflows.jsonnet index 21448921e..36f3dec9c 100644 --- a/apps/workspace-engine/oapi/spec/schemas/workflows.jsonnet +++ b/apps/workspace-engine/oapi/spec/schemas/workflows.jsonnet @@ -3,10 +3,9 @@ local openapi = import '../lib/openapi.libsonnet'; { WorkflowJobAgent: { type: 'object', - required: ['id', 'name', 'ref', 'config', 'selector'], + required: ['name', 'ref', 'config', 'selector'], properties: { name: { type: 'string' }, - id: { type: 'string' }, ref: { type: 'string', description: 'Reference to the job agent' }, config: { type: 'object', additionalProperties: true, description: 'Configuration for the job agent' }, selector: { type: 'string', description: 'CEL expression to determine if the job agent should dispatch a job' }, diff --git a/apps/workspace-engine/pkg/oapi/oapi.gen.go b/apps/workspace-engine/pkg/oapi/oapi.gen.go index fea504f73..6f9227d5a 100644 --- a/apps/workspace-engine/pkg/oapi/oapi.gen.go +++ b/apps/workspace-engine/pkg/oapi/oapi.gen.go @@ -1361,7 +1361,6 @@ type WorkflowJob struct { type WorkflowJobAgent struct { // Config Configuration for the job agent Config map[string]interface{} `json:"config"` - Id string `json:"id"` Name string `json:"name"` // Ref Reference to the job agent diff --git a/apps/workspace-engine/pkg/oapi/persistence.go b/apps/workspace-engine/pkg/oapi/persistence.go index ebcd5fd82..4d015c99e 100644 --- a/apps/workspace-engine/pkg/oapi/persistence.go +++ b/apps/workspace-engine/pkg/oapi/persistence.go @@ -78,7 +78,7 @@ func (wt *Workflow) CompactionKey() (string, string) { } func (wtt *WorkflowJobAgent) CompactionKey() (string, string) { - return "workflow_job_agent", wtt.Id + return "workflow_job_agent", wtt.Ref } func (w *WorkflowRun) CompactionKey() (string, string) { diff --git a/apps/workspace-engine/svc/controllers/jobdispatch/setters_postgres.go b/apps/workspace-engine/svc/controllers/jobdispatch/setters_postgres.go index 31da10e18..190cd9471 100644 --- a/apps/workspace-engine/svc/controllers/jobdispatch/setters_postgres.go +++ b/apps/workspace-engine/svc/controllers/jobdispatch/setters_postgres.go @@ -99,10 +99,12 @@ func (s *PostgresSetter) UpdateJob( } releaseID := existingJob.ReleaseId - if releaseID != "" { - if err := dispatchProgressionTargets(ctx, s.Queue, jobIDUUID); err != nil { - return fmt.Errorf("dispatch progression targets: %w", err) - } + if releaseID == "" || releaseID == "00000000-0000-0000-0000-000000000000" { + return nil + } + + if err := dispatchProgressionTargets(ctx, s.Queue, jobIDUUID); err != nil { + return fmt.Errorf("dispatch progression targets: %w", err) } return nil diff --git a/apps/workspace-engine/svc/http/http.go b/apps/workspace-engine/svc/http/http.go index a0c41624b..7262c70d9 100644 --- a/apps/workspace-engine/svc/http/http.go +++ b/apps/workspace-engine/svc/http/http.go @@ -6,6 +6,7 @@ import ( "net/http" "github.com/charmbracelet/log" + "github.com/jackc/pgx/v5/pgxpool" "workspace-engine/pkg/config" "workspace-engine/svc" "workspace-engine/svc/http/server" @@ -16,17 +17,18 @@ var _ svc.Service = (*Service)(nil) // Service wraps the workspace-engine HTTP server as a service.Service. type Service struct { cfg config.Config + pool *pgxpool.Pool httpServer *http.Server } -func New(cfg config.Config) *Service { - return &Service{cfg: cfg} +func New(cfg config.Config, pool *pgxpool.Pool) *Service { + return &Service{cfg: cfg, pool: pool} } func (s *Service) Name() string { return "http" } func (s *Service) Start(_ context.Context) error { - srv := server.New() + srv := server.New(s.pool) router := srv.SetupRouter() addr := fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.Port) diff --git a/apps/workspace-engine/svc/http/server/openapi/server.go b/apps/workspace-engine/svc/http/server/openapi/server.go index c8991d90c..111cf5258 100644 --- a/apps/workspace-engine/svc/http/server/openapi/server.go +++ b/apps/workspace-engine/svc/http/server/openapi/server.go @@ -1,6 +1,7 @@ package openapi import ( + "github.com/jackc/pgx/v5/pgxpool" "workspace-engine/pkg/oapi" release_targets "workspace-engine/svc/http/server/openapi/release_targets" "workspace-engine/svc/http/server/openapi/resources" @@ -9,9 +10,9 @@ import ( "workspace-engine/svc/http/server/openapi/workflows" ) -func New() *Server { +func New(pool *pgxpool.Pool) *Server { return &Server{ - Workflows: workflows.NewWorkflows(), + Workflows: workflows.NewWorkflows(pool), ReleaseTargets: release_targets.New(), Verifications: verifications.New(), } diff --git a/apps/workspace-engine/svc/http/server/openapi/workflows/setters.go b/apps/workspace-engine/svc/http/server/openapi/workflows/setters.go index 5673fc732..522425fb5 100644 --- a/apps/workspace-engine/svc/http/server/openapi/workflows/setters.go +++ b/apps/workspace-engine/svc/http/server/openapi/workflows/setters.go @@ -103,7 +103,7 @@ func (s *PostgresSetter) dispatchJobForAgent( dispatchContext *oapi.DispatchContext, workspaceID string, ) error { - jobAgentIDUUID, err := uuid.Parse(jobAgent.Id) + jobAgentIDUUID, err := uuid.Parse(jobAgent.Ref) if err != nil { return fmt.Errorf("parse job agent id: %w", err) } diff --git a/apps/workspace-engine/svc/http/server/openapi/workflows/workflows.go b/apps/workspace-engine/svc/http/server/openapi/workflows/workflows.go index 086145d1d..f8921ff53 100644 --- a/apps/workspace-engine/svc/http/server/openapi/workflows/workflows.go +++ b/apps/workspace-engine/svc/http/server/openapi/workflows/workflows.go @@ -6,7 +6,9 @@ import ( "net/http" "github.com/gin-gonic/gin" + "github.com/jackc/pgx/v5/pgxpool" "workspace-engine/pkg/oapi" + "workspace-engine/pkg/reconcile/postgres" ) type Workflows struct { @@ -14,8 +16,12 @@ type Workflows struct { setter Setter } -func NewWorkflows() Workflows { - return Workflows{getter: &PostgresGetter{}} +func NewWorkflows(pool *pgxpool.Pool) Workflows { + queue := postgres.New(pool) + return Workflows{ + getter: &PostgresGetter{}, + setter: NewPostgresSetter(queue), + } } func getInputs(c *gin.Context) (map[string]any, error) { diff --git a/apps/workspace-engine/svc/http/server/openapi/workflows/workflows_test.go b/apps/workspace-engine/svc/http/server/openapi/workflows/workflows_test.go new file mode 100644 index 000000000..14c31a86c --- /dev/null +++ b/apps/workspace-engine/svc/http/server/openapi/workflows/workflows_test.go @@ -0,0 +1,176 @@ +package workflows + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "workspace-engine/pkg/oapi" +) + +func stringInput(key string, def *string) oapi.WorkflowInput { + var input oapi.WorkflowInput + _ = input.FromWorkflowStringInput(oapi.WorkflowStringInput{ + Key: key, + Type: "string", + Default: def, + }) + return input +} + +func numberInput(key string, def *float32) oapi.WorkflowInput { + var input oapi.WorkflowInput + _ = input.FromWorkflowNumberInput(oapi.WorkflowNumberInput{ + Key: key, + Type: "number", + Default: def, + }) + return input +} + +func booleanInput(key string, def *bool) oapi.WorkflowInput { + var input oapi.WorkflowInput + _ = input.FromWorkflowBooleanInput(oapi.WorkflowBooleanInput{ + Key: key, + Type: "boolean", + Default: def, + }) + return input +} + +func ptr[T any](v T) *T { return &v } + +func TestResolveInputs_ProvidedInputsPassThrough(t *testing.T) { + workflow := &oapi.Workflow{ + Inputs: []oapi.WorkflowInput{ + stringInput("env", ptr("staging")), + }, + } + provided := map[string]any{"env": "production"} + + resolved, err := resolveInputs(workflow, provided) + + require.NoError(t, err) + assert.Equal(t, "production", resolved["env"]) +} + +func TestResolveInputs_MissingInputsGetDefaults(t *testing.T) { + workflow := &oapi.Workflow{ + Inputs: []oapi.WorkflowInput{ + stringInput("env", ptr("staging")), + numberInput("retries", ptr(float32(3))), + booleanInput("dryRun", ptr(true)), + }, + } + provided := map[string]any{} + + resolved, err := resolveInputs(workflow, provided) + + require.NoError(t, err) + assert.Equal(t, "staging", resolved["env"]) + assert.Equal(t, float32(3), resolved["retries"]) + assert.Equal(t, true, resolved["dryRun"]) +} + +func TestResolveInputs_InputsWithoutDefaultsStayAbsent(t *testing.T) { + workflow := &oapi.Workflow{ + Inputs: []oapi.WorkflowInput{ + stringInput("env", nil), + numberInput("retries", nil), + booleanInput("dryRun", nil), + }, + } + provided := map[string]any{} + + resolved, err := resolveInputs(workflow, provided) + + require.NoError(t, err) + assert.NotContains(t, resolved, "env") + assert.NotContains(t, resolved, "retries") + assert.NotContains(t, resolved, "dryRun") +} + +func TestResolveInputs_ProvidedOverridesDefault(t *testing.T) { + workflow := &oapi.Workflow{ + Inputs: []oapi.WorkflowInput{ + stringInput("env", ptr("staging")), + numberInput("retries", ptr(float32(3))), + booleanInput("dryRun", ptr(true)), + }, + } + provided := map[string]any{ + "env": "production", + "retries": 10, + "dryRun": false, + } + + resolved, err := resolveInputs(workflow, provided) + + require.NoError(t, err) + assert.Equal(t, "production", resolved["env"]) + assert.Equal(t, 10, resolved["retries"]) + assert.Equal(t, false, resolved["dryRun"]) +} + +func TestResolveInputs_MixedProvidedAndDefaults(t *testing.T) { + workflow := &oapi.Workflow{ + Inputs: []oapi.WorkflowInput{ + stringInput("env", ptr("staging")), + numberInput("retries", ptr(float32(3))), + booleanInput("verbose", nil), + }, + } + provided := map[string]any{"env": "production"} + + resolved, err := resolveInputs(workflow, provided) + + require.NoError(t, err) + assert.Equal(t, "production", resolved["env"]) + assert.Equal(t, float32(3), resolved["retries"]) + assert.NotContains(t, resolved, "verbose") +} + +func TestResolveInputs_DoesNotMutateProvidedMap(t *testing.T) { + workflow := &oapi.Workflow{ + Inputs: []oapi.WorkflowInput{ + stringInput("env", ptr("staging")), + }, + } + provided := map[string]any{"existing": "value"} + + _, err := resolveInputs(workflow, provided) + + require.NoError(t, err) + assert.Len(t, provided, 1) + assert.Equal(t, "value", provided["existing"]) +} + +func TestResolveInputs_EmptyWorkflowInputs(t *testing.T) { + workflow := &oapi.Workflow{ + Inputs: []oapi.WorkflowInput{}, + } + provided := map[string]any{"extra": "value"} + + resolved, err := resolveInputs(workflow, provided) + + require.NoError(t, err) + assert.Equal(t, "value", resolved["extra"]) +} + +func TestResolveInputs_ExtraProvidedInputsPassThrough(t *testing.T) { + workflow := &oapi.Workflow{ + Inputs: []oapi.WorkflowInput{ + stringInput("env", ptr("staging")), + }, + } + provided := map[string]any{ + "env": "production", + "extra": "unexpected", + } + + resolved, err := resolveInputs(workflow, provided) + + require.NoError(t, err) + assert.Equal(t, "production", resolved["env"]) + assert.Equal(t, "unexpected", resolved["extra"]) +} diff --git a/apps/workspace-engine/svc/http/server/server.go b/apps/workspace-engine/svc/http/server/server.go index 070e8fcbe..2ed98d2b3 100644 --- a/apps/workspace-engine/svc/http/server/server.go +++ b/apps/workspace-engine/svc/http/server/server.go @@ -7,6 +7,7 @@ import ( "github.com/charmbracelet/log" "github.com/gin-gonic/gin" + "github.com/jackc/pgx/v5/pgxpool" swaggerfiles "github.com/swaggo/files" ginswagger "github.com/swaggo/gin-swagger" "go.opentelemetry.io/otel" @@ -20,11 +21,13 @@ import ( var tracer = otel.Tracer("server") // Server implements the OpenAPI ServerInterface for the workspace engine. -type Server struct{} +type Server struct { + pool *pgxpool.Pool +} // New creates a new Server instance. -func New() *Server { - return &Server{} +func New(pool *pgxpool.Pool) *Server { + return &Server{pool: pool} } // SetupRouter configures and returns a Gin router with all routes and middleware. @@ -52,7 +55,7 @@ func (s *Server) SetupRouter() *gin.Engine { ) // Register OpenAPI handlers - oapi.RegisterHandlers(router, openapi.New()) + oapi.RegisterHandlers(router, openapi.New(s.pool)) return router }