diff --git a/apps/workspace-engine/oapi/openapi.json b/apps/workspace-engine/oapi/openapi.json index 37a566f95..72155435d 100644 --- a/apps/workspace-engine/oapi/openapi.json +++ b/apps/workspace-engine/oapi/openapi.json @@ -702,21 +702,21 @@ "status": { "$ref": "#/components/schemas/JobStatus" }, - "taskId": { - "type": "string" - }, "traceToken": { "type": "string" }, "updatedAt": { "format": "date-time", "type": "string" + }, + "workflowStepId": { + "type": "string" } }, "required": [ "id", "releaseId", - "taskId", + "workflowStepId", "jobAgentId", "jobAgentConfig", "status", @@ -807,9 +807,9 @@ "releaseId", "startedAt", "status", - "taskId", "traceToken", - "updatedAt" + "updatedAt", + "workflowStepId" ], "type": "string" }, @@ -1835,6 +1835,24 @@ ], "type": "object" }, + "TestRunnerJobAgentConfig": { + "properties": { + "delaySeconds": { + "description": "Delay in seconds before resolving the job.", + "format": "int", + "type": "integer" + }, + "message": { + "description": "Optional message to include in the job output.", + "type": "string" + }, + "status": { + "description": "Final status to set (e.g. \"successful\", \"failure\").", + "type": "string" + } + }, + "type": "object" + }, "UserApprovalRecord": { "properties": { "createdAt": { @@ -2097,6 +2115,22 @@ } ] }, + "WorkflowJobAgentConfig": { + "properties": { + "config": { + "additionalProperties": true, + "type": "object" + }, + "id": { + "type": "string" + } + }, + "required": [ + "id", + "config" + ], + "type": "object" + }, "WorkflowNumberInput": { "properties": { "default": { @@ -2124,17 +2158,20 @@ "id": { "type": "string" }, - "workflowId": { - "type": "string" + "index": { + "type": "integer" }, - "workflowStepTemplateId": { + "jobAgent": { + "$ref": "#/components/schemas/WorkflowJobAgentConfig" + }, + "workflowId": { "type": "string" } }, "required": [ "id", "workflowId", - "workflowStepTemplateId" + "index" ], "type": "object" }, @@ -2144,20 +2181,7 @@ "type": "string" }, "jobAgent": { - "properties": { - "config": { - "additionalProperties": true, - "type": "object" - }, - "id": { - "type": "string" - } - }, - "required": [ - "id", - "config" - ], - "type": "object" + "$ref": "#/components/schemas/WorkflowJobAgentConfig" }, "name": { "type": "string" diff --git a/apps/workspace-engine/oapi/spec/schemas/jobs.jsonnet b/apps/workspace-engine/oapi/spec/schemas/jobs.jsonnet index c1f40c8b0..899d7373b 100644 --- a/apps/workspace-engine/oapi/spec/schemas/jobs.jsonnet +++ b/apps/workspace-engine/oapi/spec/schemas/jobs.jsonnet @@ -5,7 +5,7 @@ local Job = { required: [ 'id', 'releaseId', - 'taskId', + 'workflowStepId', 'jobAgentId', 'jobAgentConfig', 'status', @@ -16,7 +16,7 @@ local Job = { properties: { id: { type: 'string' }, releaseId: { type: 'string' }, - taskId: { type: 'string' }, + workflowStepId: { type: 'string' }, jobAgentId: { type: 'string' }, jobAgentConfig: openapi.schemaRef('JobAgentConfig'), externalId: { type: 'string' }, diff --git a/apps/workspace-engine/oapi/spec/schemas/workflows.jsonnet b/apps/workspace-engine/oapi/spec/schemas/workflows.jsonnet index b64718c07..e49a62257 100644 --- a/apps/workspace-engine/oapi/spec/schemas/workflows.jsonnet +++ b/apps/workspace-engine/oapi/spec/schemas/workflows.jsonnet @@ -1,20 +1,22 @@ local openapi = import '../lib/openapi.libsonnet'; { + WorkflowJobAgentConfig: { + type: 'object', + required: ['id', 'config'], + properties: { + id: { type: 'string' }, + config: { type: 'object', additionalProperties: true }, + }, + }, + WorkflowStepTemplate: { type: 'object', required: ['id', 'name', 'jobAgent'], properties: { name: { type: 'string' }, id: { type: 'string' }, - jobAgent: { - type: 'object', - required: ['id', 'config'], - properties: { - id: { type: 'string' }, - config: { type: 'object', additionalProperties: true }, - }, - }, + jobAgent: openapi.schemaRef('WorkflowJobAgentConfig'), }, }, @@ -88,11 +90,12 @@ local openapi = import '../lib/openapi.libsonnet'; WorkflowStep: { type: 'object', - required: ['id', 'workflowId', 'workflowStepTemplateId'], + required: ['id', 'workflowId', 'index'], properties: { id: { type: 'string' }, + index: { type: 'integer' }, workflowId: { type: 'string' }, - workflowStepTemplateId: { type: 'string' }, + jobAgent: openapi.schemaRef('WorkflowJobAgentConfig'), }, }, } diff --git a/apps/workspace-engine/pkg/oapi/job.go b/apps/workspace-engine/pkg/oapi/job.go index 84ab9c647..4d020e4be 100644 --- a/apps/workspace-engine/pkg/oapi/job.go +++ b/apps/workspace-engine/pkg/oapi/job.go @@ -145,14 +145,6 @@ func (j *Job) GetTerraformCloudJobAgentConfig() (*TerraformCloudJobAgentConfig, return &cfg, nil } -// TestRunnerJobAgentConfig defines config for test-runner job agent. -type TestRunnerJobAgentConfig struct { - Type string `json:"type,omitempty"` - DelaySeconds *int `json:"delaySeconds,omitempty"` - Status *string `json:"status,omitempty"` - Message *string `json:"message,omitempty"` -} - func (j *Job) GetTestRunnerJobAgentConfig() (*TestRunnerJobAgentConfig, error) { cfgJson, err := json.Marshal(j.JobAgentConfig) if err != nil { @@ -162,8 +154,5 @@ func (j *Job) GetTestRunnerJobAgentConfig() (*TestRunnerJobAgentConfig, error) { if err := json.Unmarshal(cfgJson, &cfg); err != nil { return nil, fmt.Errorf("failed to unmarshal job agent config: %w", err) } - if cfg.Type != "test-runner" { - return nil, fmt.Errorf("config type is not test-runner") - } return &cfg, nil } diff --git a/apps/workspace-engine/pkg/oapi/oapi.gen.go b/apps/workspace-engine/pkg/oapi/oapi.gen.go index d037f2983..4702aff43 100644 --- a/apps/workspace-engine/pkg/oapi/oapi.gen.go +++ b/apps/workspace-engine/pkg/oapi/oapi.gen.go @@ -98,9 +98,9 @@ const ( JobUpdateEventFieldsToUpdateReleaseId JobUpdateEventFieldsToUpdate = "releaseId" JobUpdateEventFieldsToUpdateStartedAt JobUpdateEventFieldsToUpdate = "startedAt" JobUpdateEventFieldsToUpdateStatus JobUpdateEventFieldsToUpdate = "status" - JobUpdateEventFieldsToUpdateTaskId JobUpdateEventFieldsToUpdate = "taskId" JobUpdateEventFieldsToUpdateTraceToken JobUpdateEventFieldsToUpdate = "traceToken" JobUpdateEventFieldsToUpdateUpdatedAt JobUpdateEventFieldsToUpdate = "updatedAt" + JobUpdateEventFieldsToUpdateWorkflowStepId JobUpdateEventFieldsToUpdate = "workflowStepId" ) // Defines values for JobVerificationStatus. @@ -478,9 +478,9 @@ type Job struct { ReleaseId string `json:"releaseId"` StartedAt *time.Time `json:"startedAt,omitempty"` Status JobStatus `json:"status"` - TaskId string `json:"taskId"` TraceToken *string `json:"traceToken,omitempty"` UpdatedAt time.Time `json:"updatedAt"` + WorkflowStepId string `json:"workflowStepId"` } // JobAgent defines model for JobAgent. @@ -912,6 +912,18 @@ type TerraformCloudRunMetricProvider struct { // TerraformCloudRunMetricProviderType Provider type type TerraformCloudRunMetricProviderType string +// TestRunnerJobAgentConfig defines model for TestRunnerJobAgentConfig. +type TestRunnerJobAgentConfig struct { + // DelaySeconds Delay in seconds before resolving the job. + DelaySeconds *int `json:"delaySeconds,omitempty"` + + // Message Optional message to include in the job output. + Message *string `json:"message,omitempty"` + + // Status Final status to set (e.g. "successful", "failure"). + Status *string `json:"status,omitempty"` +} + // UserApprovalRecord defines model for UserApprovalRecord. type UserApprovalRecord struct { CreatedAt string `json:"createdAt"` @@ -1045,6 +1057,12 @@ type WorkflowInput struct { union json.RawMessage } +// WorkflowJobAgentConfig defines model for WorkflowJobAgentConfig. +type WorkflowJobAgentConfig struct { + Config map[string]interface{} `json:"config"` + Id string `json:"id"` +} + // WorkflowNumberInput defines model for WorkflowNumberInput. type WorkflowNumberInput struct { Default float32 `json:"default"` @@ -1057,19 +1075,17 @@ type WorkflowNumberInputType string // WorkflowStep defines model for WorkflowStep. type WorkflowStep struct { - Id string `json:"id"` - WorkflowId string `json:"workflowId"` - WorkflowStepTemplateId string `json:"workflowStepTemplateId"` + Id string `json:"id"` + Index int `json:"index"` + JobAgent *WorkflowJobAgentConfig `json:"jobAgent,omitempty"` + WorkflowId string `json:"workflowId"` } // WorkflowStepTemplate defines model for WorkflowStepTemplate. type WorkflowStepTemplate struct { - Id string `json:"id"` - JobAgent struct { - Config map[string]interface{} `json:"config"` - Id string `json:"id"` - } `json:"jobAgent"` - Name string `json:"name"` + Id string `json:"id"` + JobAgent WorkflowJobAgentConfig `json:"jobAgent"` + Name string `json:"name"` } // WorkflowStringInput defines model for WorkflowStringInput. diff --git a/apps/workspace-engine/pkg/oapi/workflow.go b/apps/workspace-engine/pkg/oapi/workflow.go new file mode 100644 index 000000000..be13d3e17 --- /dev/null +++ b/apps/workspace-engine/pkg/oapi/workflow.go @@ -0,0 +1,5 @@ +package oapi + +func (w *Workflow) Map() map[string]interface{} { + return structToMap(w) +} diff --git a/apps/workspace-engine/pkg/workspace/jobagents/argo/argoapp.go b/apps/workspace-engine/pkg/workspace/jobagents/argo/argoapp.go new file mode 100644 index 000000000..d4f9d7ece --- /dev/null +++ b/apps/workspace-engine/pkg/workspace/jobagents/argo/argoapp.go @@ -0,0 +1,32 @@ +package argo + +import ( + "context" + "workspace-engine/pkg/workspace/jobagents/types" + "workspace-engine/pkg/workspace/store" +) + +var _ types.Dispatchable = &ArgoApplication{} + +type ArgoApplication struct { + store *store.Store +} + +func NewArgoApplication(store *store.Store) *ArgoApplication { + return &ArgoApplication{store: store} +} + +func (a *ArgoApplication) Type() string { + return "argo-application" +} + +func (a *ArgoApplication) Supports() types.Capabilities { + return types.Capabilities{ + Workflows: true, + Deployments: false, + } +} + +func (a *ArgoApplication) Dispatch(ctx context.Context, context types.RenderContext) error { + panic("unimplemented") +} diff --git a/apps/workspace-engine/pkg/workspace/jobagents/github/githubaction.go b/apps/workspace-engine/pkg/workspace/jobagents/github/githubaction.go new file mode 100644 index 000000000..e9374b73f --- /dev/null +++ b/apps/workspace-engine/pkg/workspace/jobagents/github/githubaction.go @@ -0,0 +1,33 @@ +package github + +import ( + "context" + "workspace-engine/pkg/workspace/jobagents/types" + "workspace-engine/pkg/workspace/store" +) + +var _ types.Dispatchable = &GithubAction{} + +type GithubAction struct { + store *store.Store +} + +func NewGithubAction(store *store.Store) *GithubAction { + return &GithubAction{store: store} +} + +func (a *GithubAction) Type() string { + return "github-action" +} + +func (a *GithubAction) Supports() types.Capabilities { + return types.Capabilities{ + Workflows: true, + Deployments: false, + } +} + +// Dispatch implements types.Dispatchable. +func (a *GithubAction) Dispatch(ctx context.Context, context types.RenderContext) error { + panic("unimplemented") +} diff --git a/apps/workspace-engine/pkg/workspace/jobagents/registry.go b/apps/workspace-engine/pkg/workspace/jobagents/registry.go new file mode 100644 index 000000000..a3aca0039 --- /dev/null +++ b/apps/workspace-engine/pkg/workspace/jobagents/registry.go @@ -0,0 +1,80 @@ +package jobagents + +import ( + "context" + "fmt" + + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/workspace/jobagents/argo" + "workspace-engine/pkg/workspace/jobagents/github" + "workspace-engine/pkg/workspace/jobagents/terraformcloud" + "workspace-engine/pkg/workspace/jobagents/testrunner" + "workspace-engine/pkg/workspace/jobagents/types" + "workspace-engine/pkg/workspace/store" +) + +type Registry struct { + dispatchers map[string]types.Dispatchable + store *store.Store +} + +func NewRegistry(store *store.Store) *Registry { + r := &Registry{} + r.dispatchers = make(map[string]types.Dispatchable) + r.store = store + + r.Register(testrunner.New(store)) + r.Register(argo.NewArgoApplication(store)) + r.Register(terraformcloud.NewTFE(store)) + r.Register(github.NewGithubAction(store)) + + return r +} + +// Register adds a dispatcher to the registry. +func (r *Registry) Register(dispatcher types.Dispatchable) { + r.dispatchers[dispatcher.Type()] = dispatcher +} + +func (r *Registry) Dispatch(ctx context.Context, job *oapi.Job) error { + jobAgent, ok := r.store.JobAgents.Get(job.JobAgentId) + if !ok { + return fmt.Errorf("job agent %s not found", job.JobAgentId) + } + + dispatcher, ok := r.dispatchers[jobAgent.Type] + if !ok { + return fmt.Errorf("job agent type %s not found", jobAgent.Type) + } + + renderContext := types.RenderContext{} + renderContext.Job = job + renderContext.JobAgent = jobAgent + + isWorkflow := job.WorkflowStepId != "" + caps := dispatcher.Supports() + + if isWorkflow && !caps.Workflows { + return fmt.Errorf("job agent type %s does not support workflows", jobAgent.Type) + } + + if !isWorkflow && !caps.Deployments { + return fmt.Errorf("job agent type %s does not support deployments", jobAgent.Type) + } + + if jobWithRelease, _ := r.store.Jobs.GetWithRelease(job.Id); jobWithRelease != nil { + renderContext.Release = &jobWithRelease.Release + renderContext.Deployment = jobWithRelease.Deployment + renderContext.Environment = jobWithRelease.Environment + renderContext.Resource = jobWithRelease.Resource + } + + if workflowStep, ok := r.store.WorkflowSteps.Get(job.WorkflowStepId); ok { + renderContext.WorkflowStep = workflowStep + if workflow, ok := r.store.Workflows.Get(workflowStep.WorkflowId); ok { + renderContext.Workflow = workflow + } + } + + return dispatcher.Dispatch(ctx, renderContext) +} diff --git a/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe.go b/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe.go new file mode 100644 index 000000000..44cb5dc63 --- /dev/null +++ b/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe.go @@ -0,0 +1,32 @@ +package terraformcloud + +import ( + "context" + "workspace-engine/pkg/workspace/jobagents/types" + "workspace-engine/pkg/workspace/store" +) + +var _ types.Dispatchable = &TFE{} + +type TFE struct { + store *store.Store +} + +func NewTFE(store *store.Store) *TFE { + return &TFE{store: store} +} + +func (t *TFE) Type() string { + return "tfe" +} + +func (t *TFE) Supports() types.Capabilities { + return types.Capabilities{ + Workflows: false, + Deployments: true, + } +} + +func (t *TFE) Dispatch(ctx context.Context, context types.RenderContext) error { + panic("unimplemented") +} diff --git a/apps/workspace-engine/pkg/workspace/jobagents/testrunner/testrunner.go b/apps/workspace-engine/pkg/workspace/jobagents/testrunner/testrunner.go new file mode 100644 index 000000000..930307d64 --- /dev/null +++ b/apps/workspace-engine/pkg/workspace/jobagents/testrunner/testrunner.go @@ -0,0 +1,245 @@ +package testrunner + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "workspace-engine/pkg/config" + "workspace-engine/pkg/messaging" + "workspace-engine/pkg/messaging/confluent" + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/workspace/jobagents/types" + "workspace-engine/pkg/workspace/store" + + confluentkafka "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + oteltrace "go.opentelemetry.io/otel/trace" +) + +var tracer = otel.Tracer("jobagents/testrunner") + +var _ types.Dispatchable = &TestRunner{} + +type TestRunner struct { + store *store.Store + timeFunc func(d time.Duration) <-chan time.Time + producerFactory func() (messaging.Producer, error) +} + +func New(store *store.Store) *TestRunner { + return &TestRunner{ + store: store, + timeFunc: time.After, + } +} + +// Options contains optional configuration for testing. +type Options struct { + TimeFunc func(d time.Duration) <-chan time.Time + ProducerFactory func() (messaging.Producer, error) +} + +// NewWithOptions creates a TestRunner with custom options (useful for testing). +func NewWithOptions(store *store.Store, opts Options) *TestRunner { + t := &TestRunner{ + store: store, + timeFunc: time.After, + producerFactory: opts.ProducerFactory, + } + if opts.TimeFunc != nil { + t.timeFunc = opts.TimeFunc + } + return t +} + +func (t *TestRunner) Type() string { + return "test-runner" +} + +func (t *TestRunner) Supports() types.Capabilities { + return types.Capabilities{ + Workflows: true, + Deployments: true, + } +} + +func (t *TestRunner) Dispatch(ctx context.Context, renderCtx types.RenderContext) error { + ctx, span := tracer.Start(ctx, "TestRunner.Dispatch") + defer span.End() + + job := renderCtx.Job + span.SetAttributes(attribute.String("job.id", job.Id)) + + cfg, err := job.GetTestRunnerJobAgentConfig() + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "failed to parse job config") + return err + } + + delay := t.getDelay(cfg) + finalStatus := t.getFinalStatus(cfg) + message := "" + if cfg.Message != nil { + message = *cfg.Message + } + + span.SetAttributes( + attribute.Int64("delay_seconds", int64(delay.Seconds())), + attribute.String("final_status", string(finalStatus)), + ) + + // Start a goroutine to resolve the job after the delay + go t.resolveJobAfterDelay(context.WithoutCancel(ctx), job.Id, delay, finalStatus, message) + + span.SetStatus(codes.Ok, "test-runner scheduled") + return nil +} + +func (t *TestRunner) getDelay(cfg *oapi.TestRunnerJobAgentConfig) time.Duration { + if cfg.DelaySeconds != nil { + return time.Duration(*cfg.DelaySeconds) * time.Second + } + return 5 * time.Second // default delay +} + +func (t *TestRunner) getFinalStatus(cfg *oapi.TestRunnerJobAgentConfig) oapi.JobStatus { + if cfg.Status != nil && *cfg.Status == string(oapi.JobStatusFailure) { + return oapi.JobStatusFailure + } + return oapi.JobStatusSuccessful +} + +func (t *TestRunner) resolveJobAfterDelay(ctx context.Context, jobID string, delay time.Duration, status oapi.JobStatus, message string) { + _, span := tracer.Start(ctx, "TestRunner.resolveJobAfterDelay") + defer span.End() + + span.SetAttributes( + attribute.String("job.id", jobID), + attribute.Int64("delay_seconds", int64(delay.Seconds())), + ) + + // Wait for the configured delay + <-t.timeFunc(delay) + + // Get the current job state + job, exists := t.store.Jobs.Get(jobID) + if !exists { + span.RecordError(fmt.Errorf("job %s not found", jobID)) + span.SetStatus(codes.Error, "job not found") + return + } + + // Only update if job is still in a pending/running state + // Don't override if it was already completed, failed, or cancelled externally + if job.Status != oapi.JobStatusPending && job.Status != oapi.JobStatusInProgress { + span.AddEvent("Job already in terminal state, skipping resolution", + oteltrace.WithAttributes(attribute.String("current_status", string(job.Status)))) + return + } + + // Build the message + resolveMsg := fmt.Sprintf("Resolved by test-runner after %v", delay) + var finalMessage string + if message != "" { + finalMessage = message + "\n" + resolveMsg + } else { + finalMessage = resolveMsg + } + + // Send job update event to Kafka queue + if err := t.sendJobUpdateEvent(job, status, finalMessage); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "failed to send job update event") + return + } + + span.SetStatus(codes.Ok, "job resolved by test-runner") + span.SetAttributes(attribute.String("final_status", string(status))) +} + +func (t *TestRunner) getKafkaProducer() (messaging.Producer, error) { + if t.producerFactory != nil { + return t.producerFactory() + } + return confluent.NewConfluent(config.Global.KafkaBrokers).CreateProducer(config.Global.KafkaTopic, &confluentkafka.ConfigMap{ + "bootstrap.servers": config.Global.KafkaBrokers, + "enable.idempotence": true, + "compression.type": "snappy", + "message.send.max.retries": 10, + "retry.backoff.ms": 100, + }) +} + +func (t *TestRunner) sendJobUpdateEvent(job *oapi.Job, status oapi.JobStatus, message string) error { + _, span := tracer.Start(context.Background(), "TestRunner.sendJobUpdateEvent") + defer span.End() + + span.SetAttributes( + attribute.String("job.id", job.Id), + attribute.String("status", string(status)), + ) + + workspaceId := t.store.ID() + + // Create a copy of the job for the event - don't modify the original in the store + // The event handler needs to see the previous status to trigger actions + updatedAt := time.Now().UTC() + jobCopy := *job + jobCopy.Status = status + jobCopy.UpdatedAt = updatedAt + jobCopy.Message = &message + if status == oapi.JobStatusSuccessful || status == oapi.JobStatusFailure { + jobCopy.CompletedAt = &updatedAt + } + + fieldsToUpdate := []oapi.JobUpdateEventFieldsToUpdate{ + oapi.JobUpdateEventFieldsToUpdateStatus, + oapi.JobUpdateEventFieldsToUpdateMessage, + oapi.JobUpdateEventFieldsToUpdateUpdatedAt, + } + if jobCopy.CompletedAt != nil { + fieldsToUpdate = append(fieldsToUpdate, oapi.JobUpdateEventFieldsToUpdateCompletedAt) + } + + eventPayload := oapi.JobUpdateEvent{ + Id: &jobCopy.Id, + Job: jobCopy, + FieldsToUpdate: &fieldsToUpdate, + } + + producer, err := t.getKafkaProducer() + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "failed to create Kafka producer") + return fmt.Errorf("failed to create Kafka producer: %w", err) + } + defer producer.Close() + + event := map[string]any{ + "eventType": "job.updated", + "workspaceId": workspaceId, + "data": eventPayload, + "timestamp": time.Now().Unix(), + } + + eventBytes, err := json.Marshal(event) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "failed to marshal event") + return fmt.Errorf("failed to marshal event: %w", err) + } + + if err := producer.Publish([]byte(workspaceId), eventBytes); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "failed to publish event") + return err + } + + span.SetStatus(codes.Ok, "event published") + return nil +} diff --git a/apps/workspace-engine/pkg/workspace/jobagents/types/types.go b/apps/workspace-engine/pkg/workspace/jobagents/types/types.go new file mode 100644 index 000000000..353c7f9f6 --- /dev/null +++ b/apps/workspace-engine/pkg/workspace/jobagents/types/types.go @@ -0,0 +1,31 @@ +package types + +import ( + "context" + "workspace-engine/pkg/oapi" +) + +type Dispatchable interface { + Type() string + Dispatch(ctx context.Context, context RenderContext) error + Supports() Capabilities +} + +type Capabilities struct { + Workflows bool + Deployments bool +} + +type RenderContext struct { + Job *oapi.Job `json:"job"` + JobAgent *oapi.JobAgent `json:"jobAgent"` + JobAgentConfig *oapi.JobAgentConfig `json:"-"` + Release *oapi.Release `json:"release"` + Deployment *oapi.Deployment `json:"deployment"` + Environment *oapi.Environment `json:"environment"` + Resource *oapi.Resource `json:"resource"` + Workflow *oapi.Workflow `json:"workflow"` + WorkflowStep *oapi.WorkflowStep `json:"step"` + Version *oapi.DeploymentVersion `json:"version"` + Inputs map[string]any `json:"inputs"` +} diff --git a/apps/workspace-engine/pkg/workspace/releasemanager/manager.go b/apps/workspace-engine/pkg/workspace/releasemanager/manager.go index 36ff43d25..8696619e1 100644 --- a/apps/workspace-engine/pkg/workspace/releasemanager/manager.go +++ b/apps/workspace-engine/pkg/workspace/releasemanager/manager.go @@ -488,7 +488,7 @@ func (m *Manager) InvalidateReleaseTargetState(releaseTarget *oapi.ReleaseTarget m.cache.Invalidate(releaseTarget) } -// Planner returns the planner instance for backward compatibility. +// Planner returns the planner instance for API func (m *Manager) Planner() *deployment.Planner { return m.deployment.Planner() } diff --git a/apps/workspace-engine/pkg/workspace/store/jobs.go b/apps/workspace-engine/pkg/workspace/store/jobs.go index b3cbc7e36..c7b057fe9 100644 --- a/apps/workspace-engine/pkg/workspace/store/jobs.go +++ b/apps/workspace-engine/pkg/workspace/store/jobs.go @@ -139,3 +139,13 @@ func (j *Jobs) GetWithRelease(id string) (*oapi.JobWithRelease, error) { Resource: resource, }, nil } + +func (j *Jobs) GetByWorkflowStepId(workflowStepId string) []*oapi.Job { + jobs := make([]*oapi.Job, 0) + for _, job := range j.repo.Jobs.Items() { + if job.WorkflowStepId == workflowStepId { + jobs = append(jobs, job) + } + } + return jobs +} diff --git a/apps/workspace-engine/pkg/workspace/store/workflow_tasks.go b/apps/workspace-engine/pkg/workspace/store/workflow_step.go similarity index 78% rename from apps/workspace-engine/pkg/workspace/store/workflow_tasks.go rename to apps/workspace-engine/pkg/workspace/store/workflow_step.go index e662fc37f..00a22e315 100644 --- a/apps/workspace-engine/pkg/workspace/store/workflow_tasks.go +++ b/apps/workspace-engine/pkg/workspace/store/workflow_step.go @@ -39,3 +39,13 @@ func (w *WorkflowSteps) Remove(ctx context.Context, id string) { w.repo.WorkflowSteps.Remove(id) w.store.changeset.RecordDelete(workflowStep) } + +func (w *WorkflowSteps) GetByWorkflowId(workflowId string) []*oapi.WorkflowStep { + steps := make([]*oapi.WorkflowStep, 0) + for _, step := range w.repo.WorkflowSteps.Items() { + if step.WorkflowId == workflowId { + steps = append(steps, step) + } + } + return steps +} diff --git a/apps/workspace-engine/pkg/workspace/store/workflow_task_templates.go b/apps/workspace-engine/pkg/workspace/store/workflow_step_templates.go similarity index 100% rename from apps/workspace-engine/pkg/workspace/store/workflow_task_templates.go rename to apps/workspace-engine/pkg/workspace/store/workflow_step_templates.go diff --git a/apps/workspace-engine/pkg/workspace/workflowmanager/manager.go b/apps/workspace-engine/pkg/workspace/workflowmanager/manager.go new file mode 100644 index 000000000..03b11cbb1 --- /dev/null +++ b/apps/workspace-engine/pkg/workspace/workflowmanager/manager.go @@ -0,0 +1,84 @@ +package workflowmanager + +import ( + "context" + "errors" + "fmt" + "maps" + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/workspace/store" + + "github.com/google/uuid" +) + +type Manager struct { + store *store.Store +} + +func NewWorkflowManager(store *store.Store) *Manager { + return &Manager{ + store: store, + } +} + +func (w *Manager) CreateWorkflow(ctx context.Context, workflowTemplateId string, inputs map[string]any) (*oapi.Workflow, error) { + workflowTemplate, ok := w.store.WorkflowTemplates.Get(workflowTemplateId) + if !ok { + return nil, fmt.Errorf("workflow template %s not found", workflowTemplateId) + } + + workflow := &oapi.Workflow{ + Id: uuid.New().String(), + WorkflowTemplateId: workflowTemplateId, + Inputs: maps.Clone(inputs), + } + + for idx, stepTemplate := range workflowTemplate.Steps { + step := &oapi.WorkflowStep{ + Id: uuid.New().String(), + WorkflowId: workflow.Id, + Index: idx, + JobAgent: &oapi.WorkflowJobAgentConfig{ + Id: stepTemplate.JobAgent.Id, + Config: maps.Clone(stepTemplate.JobAgent.Config), + }, + } + w.store.WorkflowSteps.Upsert(ctx, step) + } + + w.store.Workflows.Upsert(ctx, workflow) + return workflow, nil +} + +// dispatchJobForStep dispatches a job for the given step +func (m *Manager) dispatchStep(ctx context.Context, workflow *oapi.Workflow, step *oapi.WorkflowStep) error { + // job := &oapi.Job{ + // Id: uuid.New().String(), + // WorkflowStepId: step.Id, + // JobAgentId: step.JobAgent.Id, + // JobAgentConfig: step.JobAgent.Config, + // } + return errors.New("not implemented") +} + +// ReconcileWorkflow reconciles a workflow, advancing to the next step if ready. +func (m *Manager) ReconcileWorkflow(ctx context.Context, workflow *oapi.Workflow) error { + wfv, err := NewWorkflowView(m.store, workflow.Id) + if err != nil { + return fmt.Errorf("failed to create workflow view: %w", err) + } + + if wfv.IsComplete() { + return nil + } + if wfv.HasPendingJobs() { + return nil + } + + nextStep := wfv.GetNextStep() + if nextStep == nil { + return nil + } + + return m.dispatchStep(ctx, workflow, nextStep) +} diff --git a/apps/workspace-engine/pkg/workspace/workflowmanager/workflow_view.go b/apps/workspace-engine/pkg/workspace/workflowmanager/workflow_view.go new file mode 100644 index 000000000..1e7e24460 --- /dev/null +++ b/apps/workspace-engine/pkg/workspace/workflowmanager/workflow_view.go @@ -0,0 +1,92 @@ +package workflowmanager + +import ( + "fmt" + "sort" + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/workspace/store" +) + +type WorkflowView struct { + store *store.Store + workflow *oapi.Workflow + steps []*oapi.WorkflowStep + + stepJobs map[string][]*oapi.Job +} + +func NewWorkflowView(store *store.Store, workflowId string) (*WorkflowView, error) { + workflow, ok := store.Workflows.Get(workflowId) + if !ok { + return nil, fmt.Errorf("workflow %s not found", workflowId) + } + steps := store.WorkflowSteps.GetByWorkflowId(workflowId) + sort.Slice(steps, func(i, j int) bool { + return steps[i].Index < steps[j].Index + }) + + stepJobs := make(map[string][]*oapi.Job) + for _, step := range steps { + stepJobs[step.Id] = store.Jobs.GetByWorkflowStepId(step.Id) + } + + return &WorkflowView{ + store: store, + workflow: workflow, + steps: steps, + }, nil +} + +func (w *WorkflowView) IsComplete() bool { + for _, step := range w.steps { + if !w.isStepComplete(step.Id) { + return false + } + } + return true +} + +func (w *WorkflowView) isStepComplete(stepId string) bool { + for _, job := range w.stepJobs[stepId] { + if len(w.stepJobs[stepId]) == 0 { + return false + } + if job.Status == oapi.JobStatusPending || job.Status == oapi.JobStatusInProgress { + return false + } + } + return true +} + +func (w *WorkflowView) HasPendingJobs() bool { + for _, stepJobs := range w.stepJobs { + for _, job := range stepJobs { + if job.Status == oapi.JobStatusPending || job.Status == oapi.JobStatusInProgress || job.Status == oapi.JobStatusActionRequired { + return true + } + } + } + return false +} + +func (w *WorkflowView) GetNextStep() *oapi.WorkflowStep { + for _, step := range w.steps { + if !w.isStepComplete(step.Id) { + return step + } + } + return nil +} + +func (w *WorkflowView) GetWorkflow() *oapi.Workflow { + + return w.workflow +} + +func (w *WorkflowView) GetSteps() []*oapi.WorkflowStep { + return w.steps +} + +func (w *WorkflowView) GetStep(index int) *oapi.WorkflowStep { + return w.steps[index] +}