Skip to content

Commit 502aeed

Browse files
refactor: job dispatcher only fetches single job agent
1 parent d05d8bb commit 502aeed

File tree

5 files changed

+42
-233
lines changed

5 files changed

+42
-233
lines changed

apps/workspace-engine/svc/controllers/jobdispatch/controller.go

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,6 @@ import (
55
"fmt"
66
"time"
77

8-
"github.com/charmbracelet/log"
9-
"github.com/google/uuid"
10-
"github.com/jackc/pgx/v5/pgxpool"
11-
"go.opentelemetry.io/otel"
12-
"go.opentelemetry.io/otel/attribute"
13-
"go.opentelemetry.io/otel/codes"
148
"workspace-engine/pkg/config"
159
"workspace-engine/pkg/jobagents"
1610
"workspace-engine/pkg/jobagents/argo"
@@ -21,6 +15,13 @@ import (
2115
"workspace-engine/pkg/oapi"
2216
"workspace-engine/pkg/reconcile"
2317
"workspace-engine/pkg/reconcile/postgres"
18+
19+
"github.com/charmbracelet/log"
20+
"github.com/google/uuid"
21+
"github.com/jackc/pgx/v5/pgxpool"
22+
"go.opentelemetry.io/otel"
23+
"go.opentelemetry.io/otel/attribute"
24+
"go.opentelemetry.io/otel/codes"
2425
)
2526

2627
var (
@@ -52,11 +53,6 @@ func (c *Controller) Process(ctx context.Context, item reconcile.Item) (reconcil
5253
return reconcile.Result{}, fmt.Errorf("parse job id: %w", err)
5354
}
5455

55-
workspaceID, err := uuid.Parse(item.WorkspaceID)
56-
if err != nil {
57-
return reconcile.Result{}, fmt.Errorf("parse workspace id: %w", err)
58-
}
59-
6056
job, err := c.getter.GetJob(ctx, jobID)
6157
if err != nil {
6258
span.RecordError(err)
@@ -66,7 +62,7 @@ func (c *Controller) Process(ctx context.Context, item reconcile.Item) (reconcil
6662

6763
span.SetAttributes(attribute.String("job", fmt.Sprintf("%+v", job)))
6864

69-
result, err := c.reconcileJob(ctx, workspaceID, jobID, job)
65+
result, err := c.reconcileJob(ctx, jobID, job)
7066
if err != nil {
7167
span.RecordError(err)
7268
span.SetStatus(codes.Error, err.Error())
@@ -95,7 +91,6 @@ func (c *Controller) Process(ctx context.Context, item reconcile.Item) (reconcil
9591

9692
func (c *Controller) reconcileJob(
9793
ctx context.Context,
98-
workspaceID uuid.UUID,
9994
jobID uuid.UUID,
10095
job *oapi.Job,
10196
) (*ReconcileResult, error) {
@@ -106,7 +101,7 @@ func (c *Controller) reconcileJob(
106101
if isWorkflowJob {
107102
return ReconcileWorkflowJob(ctx, c.dispatcher, job)
108103
}
109-
return Reconcile(ctx, c.getter, c.setter, c.verifier, c.dispatcher, workspaceID, job)
104+
return Reconcile(ctx, c.getter, c.setter, c.verifier, c.dispatcher, job)
110105
}
111106

112107
// NewController creates a Controller with the given dependencies.

apps/workspace-engine/svc/controllers/jobdispatch/getters.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,7 @@ import (
1010
type Getter interface {
1111
GetJob(ctx context.Context, jobID uuid.UUID) (*oapi.Job, error)
1212
GetRelease(ctx context.Context, releaseID uuid.UUID) (*oapi.Release, error)
13-
GetDeployment(ctx context.Context, deploymentID uuid.UUID) (*oapi.Deployment, error)
14-
GetResource(ctx context.Context, resourceID uuid.UUID) (*oapi.Resource, error)
1513
GetJobAgent(ctx context.Context, jobAgentID uuid.UUID) (*oapi.JobAgent, error)
16-
ListJobAgentsByWorkspaceID(ctx context.Context, workspaceID uuid.UUID) ([]oapi.JobAgent, error)
1714
GetVerificationPolicies(
1815
ctx context.Context,
1916
rt *ReleaseTarget,

apps/workspace-engine/svc/controllers/jobdispatch/getters_postgres.go

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -48,43 +48,6 @@ func (p *PostgresGetter) GetRelease(
4848
return release, nil
4949
}
5050

51-
func (p *PostgresGetter) GetDeployment(
52-
ctx context.Context,
53-
deploymentID uuid.UUID,
54-
) (*oapi.Deployment, error) {
55-
row, err := db.GetQueries(ctx).GetDeploymentByID(ctx, deploymentID)
56-
if err != nil {
57-
return nil, err
58-
}
59-
return db.ToOapiDeployment(row), nil
60-
}
61-
62-
func (p *PostgresGetter) GetResource(
63-
ctx context.Context,
64-
resourceID uuid.UUID,
65-
) (*oapi.Resource, error) {
66-
row, err := db.GetQueries(ctx).GetResourceByID(ctx, resourceID)
67-
if err != nil {
68-
return nil, err
69-
}
70-
return db.ToOapiResource(row), nil
71-
}
72-
73-
func (p *PostgresGetter) ListJobAgentsByWorkspaceID(
74-
ctx context.Context,
75-
workspaceID uuid.UUID,
76-
) ([]oapi.JobAgent, error) {
77-
rows, err := db.GetQueries(ctx).ListJobAgentsByWorkspaceID(ctx, workspaceID)
78-
if err != nil {
79-
return nil, err
80-
}
81-
agents := make([]oapi.JobAgent, len(rows))
82-
for i, row := range rows {
83-
agents[i] = *db.ToOapiJobAgent(row)
84-
}
85-
return agents, nil
86-
}
87-
8851
func (p *PostgresGetter) GetJobAgent(
8952
ctx context.Context,
9053
jobAgentID uuid.UUID,

apps/workspace-engine/svc/controllers/jobdispatch/reconcile.go

Lines changed: 18 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ import (
55
"fmt"
66
"time"
77

8+
"workspace-engine/pkg/oapi"
9+
"workspace-engine/svc/controllers/jobdispatch/verification"
10+
811
"github.com/google/uuid"
912
"go.opentelemetry.io/otel/attribute"
1013
"go.opentelemetry.io/otel/codes"
1114
"go.opentelemetry.io/otel/trace"
12-
"workspace-engine/pkg/oapi"
13-
"workspace-engine/pkg/selector"
14-
"workspace-engine/svc/controllers/jobdispatch/verification"
1515
)
1616

1717
type ReconcileResult struct {
@@ -35,110 +35,13 @@ func getRelease(ctx context.Context, getter Getter, job *oapi.Job) (*oapi.Releas
3535
return release, nil
3636
}
3737

38-
func getDeployment(
39-
ctx context.Context,
40-
getter Getter,
41-
release *oapi.Release,
42-
) (*oapi.Deployment, error) {
43-
ctx, span := tracer.Start(ctx, "jobdispatch.getDeployment")
44-
defer span.End()
45-
46-
deploymentID := uuid.MustParse(release.Version.DeploymentId)
47-
deployment, err := getter.GetDeployment(ctx, deploymentID)
48-
if err != nil {
49-
return nil, recordErr(span, "get deployment", err)
50-
}
51-
return deployment, nil
52-
}
53-
54-
func getJobAgents(
55-
ctx context.Context,
56-
getter Getter,
57-
workspaceID uuid.UUID,
58-
release *oapi.Release,
59-
) ([]oapi.JobAgent, error) {
60-
ctx, span := tracer.Start(ctx, "jobdispatch.getJobAgents")
61-
defer span.End()
62-
63-
deployment, err := getDeployment(ctx, getter, release)
64-
if err != nil {
65-
return nil, err
66-
}
67-
68-
if deployment.JobAgentSelector == "" {
69-
return nil, fmt.Errorf("deployment job agent selector is empty")
70-
}
71-
72-
resourceID, err := uuid.Parse(release.ReleaseTarget.ResourceId)
73-
if err != nil {
74-
return nil, fmt.Errorf("parse resource id: %w", err)
75-
}
76-
77-
resource, err := getter.GetResource(ctx, resourceID)
78-
if err != nil {
79-
return nil, fmt.Errorf("get resource: %w", err)
80-
}
81-
82-
allAgents, err := getter.ListJobAgentsByWorkspaceID(ctx, workspaceID)
83-
if err != nil {
84-
return nil, fmt.Errorf("list job agents: %w", err)
85-
}
86-
87-
matched, err := selector.MatchJobAgentsWithResource(
88-
ctx,
89-
deployment.JobAgentSelector,
90-
allAgents,
91-
resource,
92-
)
93-
if err != nil {
94-
return nil, fmt.Errorf("match job agents: %w", err)
95-
}
96-
97-
return matched, nil
98-
}
99-
100-
func getAgentSpecs(
101-
ctx context.Context,
102-
verifier AgentVerifier,
103-
getter Getter,
104-
workspaceID uuid.UUID,
105-
release *oapi.Release,
106-
) ([]oapi.VerificationMetricSpec, error) {
107-
ctx, span := tracer.Start(ctx, "jobdispatch.getAgentSpecs")
108-
defer span.End()
109-
110-
if verifier == nil {
111-
return nil, nil
112-
}
113-
114-
agents, err := getJobAgents(ctx, getter, workspaceID, release)
115-
if err != nil {
116-
return nil, err
117-
}
118-
119-
specs := make([]oapi.VerificationMetricSpec, 0)
120-
for _, agent := range agents {
121-
agentSpecs, err := verifier.AgentVerifications(agent.Type, agent.Config)
122-
if err != nil {
123-
return nil, recordErr(
124-
span,
125-
fmt.Sprintf("get agent verifications for agent %s", agent.Id),
126-
err,
127-
)
128-
}
129-
specs = append(specs, agentSpecs...)
130-
}
131-
return specs, nil
132-
}
133-
13438
// Reconcile dispatches a job and enqueues verifications for the job.
13539
func Reconcile(
13640
ctx context.Context,
13741
getter Getter,
13842
setter Setter,
13943
verifier AgentVerifier,
14044
dispatcher Dispatcher,
141-
workspaceID uuid.UUID,
14245
job *oapi.Job,
14346
) (*ReconcileResult, error) {
14447
ctx, span := tracer.Start(ctx, "jobdispatch.Reconcile")
@@ -149,29 +52,31 @@ func Reconcile(
14952
return nil, err
15053
}
15154

152-
agents, err := getJobAgents(ctx, getter, workspaceID, release)
153-
if err != nil {
154-
span.RecordError(err)
155-
span.SetStatus(codes.Error, err.Error())
156-
return nil, err
157-
}
158-
if len(agents) == 0 {
159-
span.AddEvent("no job agents matched selector for deployment")
160-
return &ReconcileResult{}, nil
161-
}
162-
16355
releaseTarget := &ReleaseTarget{
16456
DeploymentID: uuid.MustParse(release.ReleaseTarget.DeploymentId),
16557
EnvironmentID: uuid.MustParse(release.ReleaseTarget.EnvironmentId),
16658
ResourceID: uuid.MustParse(release.ReleaseTarget.ResourceId),
16759
}
16860

169-
policySpecs, err := getter.GetVerificationPolicies(ctx, releaseTarget)
61+
agentUUID, err := uuid.Parse(job.JobAgentId)
17062
if err != nil {
17163
return nil, err
17264
}
17365

174-
agentSpecs, err := getAgentSpecs(ctx, verifier, getter, workspaceID, release)
66+
agent, err := getter.GetJobAgent(ctx, agentUUID)
67+
if err != nil {
68+
return nil, err
69+
}
70+
71+
var agentSpecs []oapi.VerificationMetricSpec
72+
if verifier != nil {
73+
agentSpecs, err = verifier.AgentVerifications(agent.Type, agent.Config)
74+
if err != nil {
75+
return nil, err
76+
}
77+
}
78+
79+
policySpecs, err := getter.GetVerificationPolicies(ctx, releaseTarget)
17580
if err != nil {
17681
return nil, err
17782
}

0 commit comments

Comments
 (0)