chore: workspace load in go engine#673
Conversation
WalkthroughAdds a pgx-based singleton DB pool, JSON→protobuf helpers, many DB accessors, concurrent workspace loading keyed to Kafka partition assignment, DB initialization at startup, workspace cache population on partition assignment and on-demand workspace loading in event handling, and changes to release/version policy evaluation and job dispatch signatures. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Main as "main"
participant DB as "db.GetPool / pgxpool"
participant Kafka as "Kafka Consumer"
participant Init as "initWorkspaces"
participant Meta as "db.GetWorkspaceIDs"
participant Loader as "db.LoadWorkspace"
participant Reg as "workspace registry"
Main->>DB: GetPool(ctx)
DB-->>Main: pool
Main->>Kafka: RunConsumer(ctx)
Kafka->>Kafka: on AssignedPartitions
Kafka->>Init: populateWorkspaceCache(ctx)
Init->>Meta: GetWorkspaceIDs(ctx)
Meta-->>Init: []ids
Init->>Init: filter ids by partitionOfKey (murmur2)
par parallel loads
Init->>Loader: LoadWorkspace(ctx, id)
Loader-->>Init: workspace / error
Init->>Reg: Set(id, workspace)
end
Init-->>Kafka: done / aggregated error
sequenceDiagram
autonumber
participant EH as "EventHandler"
participant Reg as "workspace registry"
participant Loader as "db.LoadWorkspace"
participant Span as "Tracing"
EH->>Reg: Exists(wsID)?
alt exists
Reg-->>EH: workspace
else missing
EH->>Loader: LoadWorkspace(ctx, wsID)
alt ok
Loader-->>EH: workspace
EH->>Reg: Set(wsID, workspace)
else error
EH->>Span: record error, set status(Error)
EH-->>EH: return error
end
end
EH->>EH: handle event with workspace
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 9
🧹 Nitpick comments (1)
apps/workspace-engine/pkg/workspace/store/store.go (1)
31-36: Avoid double sub-store initialization inNewWithRepository.
NewWithRepositoryfirst callsNew(), which allocates a fresh repository and runs everyNew*constructor. It then overwritesstore.repowith the caller-provided repository and invokesinitSubStoresa second time. Besides the wasted repository allocation, any side effects inside the sub-store constructors (event registrations, goroutines, metrics, etc.) now run twice, which can break invariants. Construct the store around the supplied repository directly so each sub-store is initialized exactly once.Apply this diff to eliminate the redundant initialization:
-func NewWithRepository(repo *repository.Repository) *Store { - store := New() - store.repo = repo - - initSubStores(store) - return store -} +func NewWithRepository(repo *repository.Repository) *Store { + store := &Store{ + repo: repo, + } + + initSubStores(store) + return store +}
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
apps/workspace-engine/go.sumis excluded by!**/*.sum
📒 Files selected for processing (17)
apps/workspace-engine/go.mod(1 hunks)apps/workspace-engine/main.go(2 hunks)apps/workspace-engine/pkg/cmap/concurrent_map.go(1 hunks)apps/workspace-engine/pkg/db/client.go(1 hunks)apps/workspace-engine/pkg/db/common.go(1 hunks)apps/workspace-engine/pkg/db/deployment_variables.go(1 hunks)apps/workspace-engine/pkg/db/deployment_versions.go(1 hunks)apps/workspace-engine/pkg/db/deployments.go(1 hunks)apps/workspace-engine/pkg/db/environments.go(1 hunks)apps/workspace-engine/pkg/db/resources.go(1 hunks)apps/workspace-engine/pkg/events/events.go(2 hunks)apps/workspace-engine/pkg/events/handler/handler.go(2 hunks)apps/workspace-engine/pkg/kafka/kafka.go(2 hunks)apps/workspace-engine/pkg/workspace/db_workspace_store.go(1 hunks)apps/workspace-engine/pkg/workspace/store/repository/repo.go(1 hunks)apps/workspace-engine/pkg/workspace/store/store.go(2 hunks)apps/workspace-engine/pkg/workspace/workspace.go(1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/events/events.goapps/workspace-engine/pkg/workspace/workspace.goapps/workspace-engine/main.goapps/workspace-engine/pkg/kafka/kafka.goapps/workspace-engine/pkg/events/handler/handler.goapps/workspace-engine/pkg/db/deployments.goapps/workspace-engine/pkg/cmap/concurrent_map.goapps/workspace-engine/pkg/db/client.goapps/workspace-engine/pkg/workspace/db_workspace_store.goapps/workspace-engine/pkg/db/deployment_versions.goapps/workspace-engine/pkg/db/deployment_variables.goapps/workspace-engine/pkg/db/common.goapps/workspace-engine/pkg/db/resources.goapps/workspace-engine/pkg/workspace/store/store.goapps/workspace-engine/pkg/workspace/store/repository/repo.goapps/workspace-engine/pkg/db/environments.go
🧠 Learnings (1)
📚 Learning: 2025-08-12T20:49:05.086Z
Learnt from: adityachoudhari26
PR: ctrlplanedev/ctrlplane#655
File: apps/workspace-engine/pkg/engine/workspace/fluent.go:166-171
Timestamp: 2025-08-12T20:49:05.086Z
Learning: The UpdateDeploymentVersions() method with OperationCreate case in apps/workspace-engine/pkg/engine/workspace/fluent.go is specifically designed only for creating new deployment versions, not for handling potential duplicates or existing versions.
Applied to files:
apps/workspace-engine/pkg/db/deployment_versions.go
🧬 Code graph analysis (13)
apps/workspace-engine/pkg/events/events.go (2)
apps/workspace-engine/pkg/workspace/workspace.go (1)
WorkspaceStore(55-58)apps/workspace-engine/pkg/events/handler/handler.go (2)
EventListener(74-77)NewEventListener(80-82)
apps/workspace-engine/main.go (1)
apps/workspace-engine/pkg/db/client.go (2)
GetPool(21-49)Close(65-69)
apps/workspace-engine/pkg/kafka/kafka.go (2)
apps/workspace-engine/pkg/workspace/db_workspace_store.go (1)
NewDBWorkspaceStore(16-18)apps/workspace-engine/pkg/events/events.go (1)
NewEventHandler(48-50)
apps/workspace-engine/pkg/events/handler/handler.go (1)
apps/workspace-engine/pkg/workspace/workspace.go (1)
WorkspaceStore(55-58)
apps/workspace-engine/pkg/db/deployments.go (1)
apps/workspace-engine/pkg/db/client.go (1)
GetDB(52-62)
apps/workspace-engine/pkg/cmap/concurrent_map.go (1)
apps/workspace-engine/pkg/workspace/store/repository/repo.go (1)
Load(21-34)
apps/workspace-engine/pkg/workspace/db_workspace_store.go (9)
apps/workspace-engine/pkg/workspace/workspace.go (2)
WorkspaceStore(55-58)Workspace(8-13)apps/workspace-engine/pkg/cmap/concurrent_map.go (3)
ConcurrentMap(18-21)Load(79-91)New(41-43)apps/workspace-engine/pkg/workspace/store/repository/repo.go (3)
InitialEntities(8-19)Load(21-34)New(36-49)apps/workspace-engine/pkg/db/resources.go (1)
GetResources(40-112)apps/workspace-engine/pkg/db/environments.go (1)
GetEnvironments(22-42)apps/workspace-engine/pkg/db/deployments.go (1)
GetDeployments(24-53)apps/workspace-engine/pkg/db/deployment_versions.go (1)
GetDeploymentVersions(28-47)apps/workspace-engine/pkg/db/deployment_variables.go (1)
GetDeploymentVariables(24-43)apps/workspace-engine/pkg/workspace/store/store.go (2)
NewWithRepository(31-37)New(21-29)
apps/workspace-engine/pkg/db/deployment_versions.go (2)
apps/workspace-engine/pkg/db/client.go (1)
GetDB(52-62)apps/workspace-engine/pkg/pb/workspace.pb.go (5)
DeploymentVersionStatus_DEPLOYMENT_VERSION_STATUS_BUILDING(99-99)DeploymentVersionStatus_DEPLOYMENT_VERSION_STATUS_READY(100-100)DeploymentVersionStatus_DEPLOYMENT_VERSION_STATUS_FAILED(101-101)DeploymentVersionStatus_DEPLOYMENT_VERSION_STATUS_REJECTED(102-102)DeploymentVersionStatus_DEPLOYMENT_VERSION_STATUS_UNSPECIFIED(98-98)
apps/workspace-engine/pkg/db/deployment_variables.go (1)
apps/workspace-engine/pkg/db/client.go (2)
GetDB(52-62)Close(65-69)
apps/workspace-engine/pkg/db/resources.go (1)
apps/workspace-engine/pkg/db/client.go (2)
GetDB(52-62)Close(65-69)
apps/workspace-engine/pkg/workspace/store/store.go (2)
apps/workspace-engine/pkg/cmap/concurrent_map.go (1)
New(41-43)apps/workspace-engine/pkg/workspace/store/repository/repo.go (2)
New(36-49)Repository(51-62)
apps/workspace-engine/pkg/workspace/store/repository/repo.go (2)
apps/workspace-engine/pkg/pb/workspace.pb.go (9)
Deployment(1146-1158)Deployment(1171-1171)Deployment(1186-1188)DeploymentVersion(2078-2091)DeploymentVersion(2104-2104)DeploymentVersion(2119-2121)Release(1684-1693)Release(1706-1706)Release(1721-1723)apps/workspace-engine/pkg/cmap/concurrent_map.go (2)
Load(79-91)LoadString(94-96)
apps/workspace-engine/pkg/db/environments.go (1)
apps/workspace-engine/pkg/db/client.go (2)
GetDB(52-62)Close(65-69)
| func scanDeploymentVariable(rows pgx.Rows) (*pb.DeploymentVariable, error) { | ||
| var deploymentVariable pb.DeploymentVariable | ||
| var defaultValueID *string | ||
|
|
||
| err := rows.Scan( | ||
| &deploymentVariable.Id, | ||
| &deploymentVariable.Key, | ||
| &deploymentVariable.Description, | ||
| &deploymentVariable.DeploymentId, | ||
| &defaultValueID, | ||
| ) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| return &deploymentVariable, nil |
There was a problem hiding this comment.
Populate DefaultValueId before returning.
We scan dv.default_value_id into defaultValueID but never copy it onto the proto, so every loaded variable loses its default value reference. Please assign it (respecting the generated field type) before returning.
err := rows.Scan(
&deploymentVariable.Id,
&deploymentVariable.Key,
&deploymentVariable.Description,
&deploymentVariable.DeploymentId,
&defaultValueID,
)
@@
- return &deploymentVariable, nil
+ if defaultValueID != nil {
+ deploymentVariable.DefaultValueId = *defaultValueID
+ }
+ return &deploymentVariable, nil🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/db/deployment_variables.go around lines 45-60, you
scan dv.default_value_id into defaultValueID but never set it on the proto;
after the rows.Scan and before returning, assign the scanned value to
deploymentVariable.DefaultValueId (e.g., if the generated field is a string set
deploymentVariable.DefaultValueId = *defaultValueID when defaultValueID != nil,
or if the generated field is a *string set deploymentVariable.DefaultValueId =
defaultValueID), so the proto reflects the DB value.
There was a problem hiding this comment.
we are scanning and discarding for now
There was a problem hiding this comment.
@adityachoudhari26, understood! Since this is intentional for the current implementation, would you like me to open a follow-up issue to track populating the DefaultValueId field in the future?
✏️ Learnings added
Learnt from: adityachoudhari26
PR: ctrlplanedev/ctrlplane#673
File: apps/workspace-engine/pkg/db/deployment_variables.go:51-65
Timestamp: 2025-10-07T16:44:54.928Z
Learning: In `apps/workspace-engine/pkg/db/deployment_variables.go`, the `default_value_id` field is intentionally scanned from the database but not assigned to the `DefaultValueId` field in the proto struct. This is a temporary decision and the field assignment is deferred for later implementation.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
apps/workspace-engine/pkg/workspace/store/store.go (1)
36-42: Refactor to avoid wasteful allocation.
NewWithRepositorycallsNew()which creates a repository (line 27), then immediately overwrites it (line 38). Additionally,initSubStoresis called twice—once insideNew()(line 32) and again explicitly (line 40).Apply this diff to avoid the unnecessary allocation and duplicate initialization:
func NewWithRepository(repo *repository.Repository) *Store { - store := New() - store.repo = repo - + store := &Store{ + repo: repo, + } initSubStores(store) return store }
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
apps/workspace-engine/pkg/cmap/concurrent_map.go(1 hunks)apps/workspace-engine/pkg/events/events.go(2 hunks)apps/workspace-engine/pkg/events/handler/handler.go(2 hunks)apps/workspace-engine/pkg/workspace/store/repository/repo.go(1 hunks)apps/workspace-engine/pkg/workspace/store/store.go(2 hunks)apps/workspace-engine/pkg/workspace/workspace.go(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- apps/workspace-engine/pkg/events/handler/handler.go
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/events/events.goapps/workspace-engine/pkg/workspace/workspace.goapps/workspace-engine/pkg/cmap/concurrent_map.goapps/workspace-engine/pkg/workspace/store/store.goapps/workspace-engine/pkg/workspace/store/repository/repo.go
🧬 Code graph analysis (4)
apps/workspace-engine/pkg/events/events.go (2)
apps/workspace-engine/pkg/workspace/workspace.go (1)
WorkspaceStore(91-94)apps/workspace-engine/pkg/events/handler/handler.go (2)
EventListener(78-81)NewEventListener(84-86)
apps/workspace-engine/pkg/cmap/concurrent_map.go (1)
apps/workspace-engine/pkg/workspace/store/repository/repo.go (1)
Load(21-34)
apps/workspace-engine/pkg/workspace/store/store.go (2)
apps/workspace-engine/pkg/cmap/concurrent_map.go (1)
New(46-48)apps/workspace-engine/pkg/workspace/store/repository/repo.go (2)
New(36-50)Repository(52-64)
apps/workspace-engine/pkg/workspace/store/repository/repo.go (2)
apps/workspace-engine/pkg/pb/workspace.pb.go (9)
Deployment(1159-1171)Deployment(1184-1184)Deployment(1199-1201)DeploymentVersion(2091-2104)DeploymentVersion(2117-2117)DeploymentVersion(2132-2134)Release(1697-1706)Release(1719-1719)Release(1734-1736)apps/workspace-engine/pkg/cmap/concurrent_map.go (2)
Load(127-139)LoadString(142-144)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Typecheck
- GitHub Check: Lint
- GitHub Check: build (linux/amd64)
🔇 Additional comments (4)
apps/workspace-engine/pkg/workspace/workspace.go (1)
91-94: LGTM! Clean interface design.The
WorkspaceStoreinterface provides a clean abstraction for workspace retrieval and initialization, enabling dependency injection and testability.apps/workspace-engine/pkg/events/events.go (1)
53-55: LGTM! Proper dependency injection.The change correctly wires the
WorkspaceStoredependency into the event handler, replacing direct workspace access with the new store abstraction.apps/workspace-engine/pkg/cmap/concurrent_map.go (1)
126-149: LGTM! Well-designed bulk loading API.The three-function design provides a clean bulk-loading interface: a generic
Loadwith explicit sharding, plus convenient type-specific wrappersLoadStringandLoadStringer. Pre-allocating the map withlen(data)and usingMSetfor bulk insertion is efficient.apps/workspace-engine/pkg/workspace/store/repository/repo.go (1)
30-30: Release.ID() usage is correct Release defines a custom ID() extension and has no GetId() accessor, so no change is required.
| } | ||
|
|
||
|
|
||
| var workspaces = cmap.New[*Workspace]() |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
🧩 Analysis chain
Verify the global workspaces variable is still needed.
The PR summary indicates that the global registry pattern was removed in favor of the new WorkspaceStore interface, but this global variable remains. If it's no longer used, consider removing it to avoid confusion.
Run the following script to check for usages of this global variable:
🏁 Script executed:
#!/bin/bash
# Description: Search for usages of the global workspaces variable
# Search for references to the workspaces variable, excluding this declaration
rg -nP --type=go -C3 '\bworkspaces\b' -g '!workspace.go'Length of output: 148
#!/bin/bash
# Search for all references to the global workspaces variable in Go files
rg -nP '\bworkspaces\b' -tgo -n🏁 Script executed:
#!/bin/bash
# Search for references to the global 'workspaces' variable in all Go files
rg -nP '\bworkspaces\b' -tgo .Length of output: 420
Remove unused global workspaces variable
The global registry declared at apps/workspace-engine/pkg/workspace/workspace.go:89 is never referenced after introducing the WorkspaceStore; delete it to avoid confusion.
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/workspace.go around line 89, remove the
unused global variable declaration `var workspaces = cmap.New[*Workspace]()`
because the new WorkspaceStore replaces its purpose; delete this line and any
related unused imports (e.g., cmap) to keep the file clean and avoid
unused-global confusion.
There was a problem hiding this comment.
removed the workspace store
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check for any WorkspaceStore interface or type definition
rg -nP '\bWorkspaceStore\b' --type=go
# Verify the global workspaces variable is actually used
echo "=== Usage of global workspaces variable ==="
rg -nP '\bworkspaces\.(Get|Set)\b' --type=go -C2
Length of output: 1272
@adityachoudhari26, thank you for the clarification! I can see that the global workspaces variable at line 89 is still present and actively used by the Exists(), Set(), and GetWorkspace() functions. The original comment suggesting it was unused was incorrect—the global registry pattern is actually the implementation being used in this PR.
The code is working as intended with the global workspaces registry. No changes are needed here! 👍
🧬 Code Graph Analysis Results
[files_relevant_snippets]
-
File: apps/workspace-engine/pkg/workspace/store/store.go
- Snippet 1 (lines 12-31)
func New() *Store { repo := repository.New() store := &Store{ repo: repo, } store.Deployments = NewDeployments(store) store.Environments = NewEnvironments(store) store.Resources = NewResources(store) store.Policies = NewPolicies(store) store.ReleaseTargets = NewReleaseTargets(store) store.DeploymentVersions = NewDeploymentVersions(store) store.Systems = NewSystems(store) store.DeploymentVariables = NewDeploymentVariables(store) store.Releases = NewReleases(store) store.Jobs = NewJobs(store) store.JobAgents = NewJobAgents(store) return store } - Snippet 2 (lines 33-48)
Store struct { repo *repository.Repository Policies *Policies Resources *Resources Deployments *Deployments DeploymentVersions *DeploymentVersions DeploymentVariables *DeploymentVariables Environments *Environments ReleaseTargets *ReleaseTargets Systems *Systems Releases *Releases Jobs *Jobs JobAgents *JobAgents UserApprovalRecords *UserApprovalRecords }
- Snippet 1 (lines 12-31)
-
File: apps/workspace-engine/pkg/workspace/store/repository/repo.go
- Snippet (lines 8-22)
func New() *Repository { return &Repository{ Resources: cmap.New[*pb.Resource](), Deployments: cmap.New[*pb.Deployment](), DeploymentVersions: cmap.New[*pb.DeploymentVersion](), DeploymentVariables: cmap.New[*pb.DeploymentVariable](), Environments: cmap.New[*pb.Environment](), Policies: cmap.New[*pb.Policy](), Systems: cmap.New[*pb.System](), Releases: cmap.New[*pb.Release](), Jobs: cmap.New[*pb.Job](), JobAgents: cmap.New[*pb.JobAgent](), UserApprovalRecords: cmap.New[*pb.UserApprovalRecord](), } }
- Snippet (lines 8-22)
-
File: apps/workspace-engine/pkg/workspace/store/jobs.go
- Snippet (lines 17-19)
Jobs struct { repo *repository.Repository }
- Snippet (lines 17-19)
-
File: apps/workspace-engine/pkg/workspace/releasemanager/manager.go
- Snippet (lines 26-33)
func New(store *store.Store) *Manager { return &Manager{ store: store, currentTargets: make(map[string]*pb.ReleaseTarget, 5000), versionManager: versionmanager.New(store), variableManager: variablemanager.New(store), } }
- Snippet (lines 26-33)
-
File: apps/workspace-engine/pkg/workspace/releasemanager/versionmanager/manager.go
- Snippet (lines 19-24)
func New(store *store.Store) *Manager { return &Manager{ store: store, policyManager: policymanager.New(store), } }
- Snippet (lines 19-24)
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (1)
apps/workspace-engine/pkg/workspace/store/repository/repo.go (1)
22-113: Consider consolidating loader functions with generics.All ten loader functions follow an identical pattern: create a map, iterate the input slice, and set each entity by its ID. This repetition creates a maintenance burden.
You can eliminate the duplication using Go generics:
+func loadEntities[T any](entities []T, keyFunc func(T) string) cmap.ConcurrentMap[string, T] { + m := cmap.New[T]() + for _, entity := range entities { + m.Set(keyFunc(entity), entity) + } + return m +} + -func loadResources(initialResources []*pb.Resource) cmap.ConcurrentMap[string, *pb.Resource] { - resources := cmap.New[*pb.Resource]() - for _, resource := range initialResources { - resources.Set(resource.GetId(), resource) - } - return resources -} - -func loadDeployments(initialDeployments []*pb.Deployment) cmap.ConcurrentMap[string, *pb.Deployment] { - - deployments := cmap.New[*pb.Deployment]() - for _, deployment := range initialDeployments { - deployments.Set(deployment.GetId(), deployment) - } - return deployments -} - -func loadDeploymentVersions(initialDeploymentVersions []*pb.DeploymentVersion) cmap.ConcurrentMap[string, *pb.DeploymentVersion] { - - deploymentVersions := cmap.New[*pb.DeploymentVersion]() - for _, deploymentVersion := range initialDeploymentVersions { - deploymentVersions.Set(deploymentVersion.GetId(), deploymentVersion) - } - return deploymentVersions -} - -func loadDeploymentVariables(initialDeploymentVariables []*pb.DeploymentVariable) cmap.ConcurrentMap[string, *pb.DeploymentVariable] { - - deploymentVariables := cmap.New[*pb.DeploymentVariable]() - for _, deploymentVariable := range initialDeploymentVariables { - deploymentVariables.Set(deploymentVariable.GetId(), deploymentVariable) - } - return deploymentVariables -} - -func loadEnvironments(initialEnvironments []*pb.Environment) cmap.ConcurrentMap[string, *pb.Environment] { - - environments := cmap.New[*pb.Environment]() - for _, environment := range initialEnvironments { - environments.Set(environment.GetId(), environment) - } - return environments -} - -func loadPolicies(initialPolicies []*pb.Policy) cmap.ConcurrentMap[string, *pb.Policy] { - - policies := cmap.New[*pb.Policy]() - for _, policy := range initialPolicies { - policies.Set(policy.GetId(), policy) - } - return policies -} - -func loadSystems(initialSystems []*pb.System) cmap.ConcurrentMap[string, *pb.System] { - systems := cmap.New[*pb.System]() - for _, system := range initialSystems { - systems.Set(system.GetId(), system) - } - return systems -} - -func loadReleases(initialReleases []*pb.Release) cmap.ConcurrentMap[string, *pb.Release] { - releases := cmap.New[*pb.Release]() - for _, release := range initialReleases { - releases.Set(release.ID(), release) - } - return releases -} - -func loadJobs(initialJobs []*pb.Job) cmap.ConcurrentMap[string, *pb.Job] { - jobs := cmap.New[*pb.Job]() - for _, job := range initialJobs { - jobs.Set(job.GetId(), job) - } - return jobs -} - -func loadJobAgents(initialJobAgents []*pb.JobAgent) cmap.ConcurrentMap[string, *pb.JobAgent] { - jobAgents := cmap.New[*pb.JobAgent]() - for _, jobAgent := range initialJobAgents { - jobAgents.Set(jobAgent.GetId(), jobAgent) - } - return jobAgents -} - -func loadUserApprovalRecords(initialUserApprovalRecords []*pb.UserApprovalRecord) cmap.ConcurrentMap[string, *pb.UserApprovalRecord] { - userApprovalRecords := cmap.New[*pb.UserApprovalRecord]() - for _, userApprovalRecord := range initialUserApprovalRecords { - userApprovalRecords.Set(userApprovalRecord.Key(), userApprovalRecord) - } - return userApprovalRecords -}Then update
Load()to use the generic function:func Load(initialEntities *InitialEntities) *Repository { return &Repository{ - Resources: loadResources(initialEntities.Resources), - Deployments: loadDeployments(initialEntities.Deployments), - DeploymentVersions: loadDeploymentVersions(initialEntities.DeploymentVersions), - DeploymentVariables: loadDeploymentVariables(initialEntities.DeploymentVariables), - Environments: loadEnvironments(initialEntities.Environments), - Policies: loadPolicies(initialEntities.Policies), - Systems: loadSystems(initialEntities.Systems), - Releases: loadReleases(initialEntities.Releases), - Jobs: loadJobs(initialEntities.Jobs), - JobAgents: loadJobAgents(initialEntities.JobAgents), - UserApprovalRecords: loadUserApprovalRecords(initialEntities.UserApprovalRecords), + Resources: loadEntities(initialEntities.Resources, func(r *pb.Resource) string { return r.GetId() }), + Deployments: loadEntities(initialEntities.Deployments, func(d *pb.Deployment) string { return d.GetId() }), + DeploymentVersions: loadEntities(initialEntities.DeploymentVersions, func(dv *pb.DeploymentVersion) string { return dv.GetId() }), + DeploymentVariables: loadEntities(initialEntities.DeploymentVariables, func(dv *pb.DeploymentVariable) string { return dv.GetId() }), + Environments: loadEntities(initialEntities.Environments, func(e *pb.Environment) string { return e.GetId() }), + Policies: loadEntities(initialEntities.Policies, func(p *pb.Policy) string { return p.GetId() }), + Systems: loadEntities(initialEntities.Systems, func(s *pb.System) string { return s.GetId() }), + Releases: loadEntities(initialEntities.Releases, func(r *pb.Release) string { return r.ID() }), + Jobs: loadEntities(initialEntities.Jobs, func(j *pb.Job) string { return j.GetId() }), + JobAgents: loadEntities(initialEntities.JobAgents, func(ja *pb.JobAgent) string { return ja.GetId() }), + UserApprovalRecords: loadEntities(initialEntities.UserApprovalRecords, func(u *pb.UserApprovalRecord) string { return u.Key() }), } }
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
apps/workspace-engine/pkg/workspace/store/repository/repo.go(1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/workspace/store/repository/repo.go
🧬 Code graph analysis (1)
apps/workspace-engine/pkg/workspace/store/repository/repo.go (3)
apps/workspace-engine/pkg/pb/workspace.pb.go (15)
Resource(934-952)Resource(965-965)Resource(980-982)Deployment(1159-1171)Deployment(1184-1184)Deployment(1199-1201)DeploymentVersion(2091-2104)DeploymentVersion(2117-2117)DeploymentVersion(2132-2134)Release(1697-1706)Release(1719-1719)Release(1734-1736)UserApprovalRecord(449-458)UserApprovalRecord(471-471)UserApprovalRecord(486-488)apps/workspace-engine/pkg/cmap/concurrent_map.go (2)
ConcurrentMap(23-26)New(46-48)apps/workspace-engine/pkg/workspace/store/store.go (1)
New(26-34)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Lint
- GitHub Check: Typecheck
🔇 Additional comments (2)
apps/workspace-engine/pkg/workspace/store/repository/repo.go (2)
8-20: LGTM! InitialEntities addresses previous feedback.The struct now includes
UserApprovalRecords(line 19), resolving the concern from the previous review.
86-86: No change needed: custom key methods are intentional
Release.ID()andUserApprovalRecord.Key()implement composite/deterministic keys, so their naming differs from other entities’GetId().
There was a problem hiding this comment.
Actionable comments posted: 5
🧹 Nitpick comments (1)
apps/workspace-engine/pkg/kafka/init_workspaces.go (1)
74-85: Consider limiting concurrent DB operations.If there are many workspaces, unbounded goroutine spawning (line 76) could create excessive concurrent database connections, potentially exhausting the connection pool or degrading performance.
Consider using a semaphore or worker pool pattern to limit concurrent operations to a reasonable number (e.g., 10-20).
Example using a semaphore:
func loadFullWorkspaces(ctx context.Context, workspaceIDs []string) error { var wg sync.WaitGroup var loadErrs []error var mu sync.Mutex // Limit concurrent operations sem := make(chan struct{}, 10) for _, workspaceID := range workspaceIDs { wg.Add(1) go func(workspaceID string) { defer wg.Done() sem <- struct{}{} // Acquire defer func() { <-sem }() // Release fullWorkspace, err := db.LoadWorkspace(ctx, workspaceID) if err != nil { mu.Lock() loadErrs = append(loadErrs, err) mu.Unlock() return } workspace.Set(workspaceID, fullWorkspace) }(workspaceID) } wg.Wait() if len(loadErrs) > 0 { return fmt.Errorf("failed to load full workspaces: %w", errors.Join(loadErrs...)) } return nil }
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (13)
apps/workspace-engine/go.mod(2 hunks)apps/workspace-engine/pkg/db/client.go(1 hunks)apps/workspace-engine/pkg/db/deployment_variables.go(1 hunks)apps/workspace-engine/pkg/db/deployment_versions.go(1 hunks)apps/workspace-engine/pkg/db/deployments.go(1 hunks)apps/workspace-engine/pkg/db/environments.go(1 hunks)apps/workspace-engine/pkg/db/resources.go(1 hunks)apps/workspace-engine/pkg/db/workspace_load.go(1 hunks)apps/workspace-engine/pkg/db/workspaces.go(1 hunks)apps/workspace-engine/pkg/events/handler/handler.go(2 hunks)apps/workspace-engine/pkg/kafka/init_workspaces.go(1 hunks)apps/workspace-engine/pkg/kafka/kafka.go(1 hunks)apps/workspace-engine/pkg/workspace/workspace.go(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
- apps/workspace-engine/pkg/kafka/kafka.go
- apps/workspace-engine/pkg/db/environments.go
- apps/workspace-engine/go.mod
- apps/workspace-engine/pkg/db/resources.go
- apps/workspace-engine/pkg/workspace/workspace.go
- apps/workspace-engine/pkg/db/deployments.go
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/db/workspaces.goapps/workspace-engine/pkg/db/workspace_load.goapps/workspace-engine/pkg/db/deployment_variables.goapps/workspace-engine/pkg/db/client.goapps/workspace-engine/pkg/kafka/init_workspaces.goapps/workspace-engine/pkg/events/handler/handler.goapps/workspace-engine/pkg/db/deployment_versions.go
🧠 Learnings (1)
📚 Learning: 2025-08-12T20:49:05.086Z
Learnt from: adityachoudhari26
PR: ctrlplanedev/ctrlplane#655
File: apps/workspace-engine/pkg/engine/workspace/fluent.go:166-171
Timestamp: 2025-08-12T20:49:05.086Z
Learning: The UpdateDeploymentVersions() method with OperationCreate case in apps/workspace-engine/pkg/engine/workspace/fluent.go is specifically designed only for creating new deployment versions, not for handling potential duplicates or existing versions.
Applied to files:
apps/workspace-engine/pkg/db/deployment_versions.go
🧬 Code graph analysis (6)
apps/workspace-engine/pkg/db/workspaces.go (1)
apps/workspace-engine/pkg/db/client.go (2)
GetDB(51-61)Close(64-68)
apps/workspace-engine/pkg/db/workspace_load.go (12)
apps/workspace-engine/pkg/workspace/workspace.go (2)
Workspace(24-29)New(13-22)apps/workspace-engine/pkg/db/client.go (1)
GetDB(51-61)apps/workspace-engine/pkg/workspace/store/repository/repo.go (1)
New(8-22)apps/workspace-engine/pkg/db/resources.go (1)
GetResources(40-118)apps/workspace-engine/pkg/workspace/store/resources.go (1)
Resources(19-22)apps/workspace-engine/pkg/db/deployments.go (1)
GetDeployments(24-59)apps/workspace-engine/pkg/workspace/store/deployments.go (1)
Deployments(26-32)apps/workspace-engine/pkg/db/deployment_versions.go (1)
GetDeploymentVersions(28-53)apps/workspace-engine/pkg/workspace/store/deployment_versions.go (1)
DeploymentVersions(16-21)apps/workspace-engine/pkg/db/deployment_variables.go (1)
GetDeploymentVariables(24-49)apps/workspace-engine/pkg/db/environments.go (1)
GetEnvironments(22-48)apps/workspace-engine/pkg/workspace/store/environments.go (1)
Environments(23-28)
apps/workspace-engine/pkg/db/deployment_variables.go (1)
apps/workspace-engine/pkg/db/client.go (2)
GetDB(51-61)Close(64-68)
apps/workspace-engine/pkg/kafka/init_workspaces.go (4)
apps/workspace-engine/pkg/kafka/kafka.go (1)
Topic(14-14)apps/workspace-engine/pkg/db/workspace_load.go (1)
LoadWorkspace(11-99)apps/workspace-engine/pkg/workspace/workspace.go (1)
Set(107-109)apps/workspace-engine/pkg/db/workspaces.go (1)
GetWorkspaceIDs(9-35)
apps/workspace-engine/pkg/events/handler/handler.go (2)
apps/workspace-engine/pkg/workspace/workspace.go (3)
Workspace(24-29)Exists(102-105)GetWorkspace(111-118)apps/workspace-engine/pkg/db/workspace_load.go (1)
LoadWorkspace(11-99)
apps/workspace-engine/pkg/db/deployment_versions.go (2)
apps/workspace-engine/pkg/db/client.go (1)
GetDB(51-61)apps/workspace-engine/pkg/pb/workspace.pb.go (5)
DeploymentVersionStatus_DEPLOYMENT_VERSION_STATUS_BUILDING(148-148)DeploymentVersionStatus_DEPLOYMENT_VERSION_STATUS_READY(149-149)DeploymentVersionStatus_DEPLOYMENT_VERSION_STATUS_FAILED(150-150)DeploymentVersionStatus_DEPLOYMENT_VERSION_STATUS_REJECTED(151-151)DeploymentVersionStatus_DEPLOYMENT_VERSION_STATUS_UNSPECIFIED(147-147)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Typecheck
- GitHub Check: Lint
- GitHub Check: build (linux/amd64)
🔇 Additional comments (7)
apps/workspace-engine/pkg/kafka/init_workspaces.go (3)
15-48: LGTM!The murmur2 implementation and partition calculation logic are correct and follow standard Kafka partitioning conventions.
50-60: LGTM!The function correctly retrieves assigned partitions and builds a set for efficient lookup.
94-118: LGTM!The orchestration logic correctly filters workspaces by partition assignment before loading them. The control flow is clear and error handling is appropriate.
apps/workspace-engine/pkg/db/deployment_versions.go (4)
11-26: LGTM!The SQL query correctly joins through the table hierarchy to filter deployment versions by workspace ID. Column selection matches the scanning logic.
28-53: LGTM!The function follows the standard DB query pattern with proper error handling and resource cleanup.
55-89: LGTM!The scanning logic correctly handles all field types including JSON fields and nullable columns. The separation of concerns with helper functions is appropriate.
91-104: LGTM!The status conversion function correctly maps all known status strings to their corresponding enum values with a safe default fallback.
| err := rows.Scan( | ||
| &deploymentVariable.Id, | ||
| &deploymentVariable.Key, | ||
| &deploymentVariable.Description, | ||
| &deploymentVariable.DeploymentId, | ||
| ) |
There was a problem hiding this comment.
Fix the Scan argument mismatch and populate DefaultValueId.
The SELECT pulls five columns but rows.Scan only has four destinations, so this path will fail at runtime. We also drop the scanned default_value_id, so deployment variables lose their default association. Capture the fifth column and copy it onto the proto (handling NULL properly).
@@
-import (
- "context"
-
- "workspace-engine/pkg/pb"
-
- "github.com/jackc/pgx/v5"
-)
+import (
+ "context"
+ "database/sql"
+
+ "workspace-engine/pkg/pb"
+
+ "github.com/jackc/pgx/v5"
+)
@@
- err := rows.Scan(
- &deploymentVariable.Id,
- &deploymentVariable.Key,
- &deploymentVariable.Description,
- &deploymentVariable.DeploymentId,
- )
+ var defaultValueID sql.NullString
+ err := rows.Scan(
+ &deploymentVariable.Id,
+ &deploymentVariable.Key,
+ &deploymentVariable.Description,
+ &deploymentVariable.DeploymentId,
+ &defaultValueID,
+ )
if err != nil {
return nil, err
}
- return &deploymentVariable, nil
+ if defaultValueID.Valid {
+ deploymentVariable.DefaultValueId = defaultValueID.String
+ }
+ return &deploymentVariable, nil🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/db/deployment_variables.go around lines 54 to 59,
the rows.Scan call only reads four columns while the SELECT returns five, and
the default_value_id column is dropped; update the Scan to include a fifth
destination variable (use an appropriate nullable type such as sql.NullInt64 or
sql.NullString matching the DB column type), then map that nullable value into
deploymentVariable.DefaultValueId on the proto only when valid (handle NULL by
leaving the proto field unset or zeroed as appropriate).
There was a problem hiding this comment.
just going to keep it nil for now
There was a problem hiding this comment.
@adityachoudhari26, understood! However, you'll still need to scan the fifth column to avoid a runtime error—pgx requires all selected columns to be read. You can scan it into a throwaway variable:
func scanDeploymentVariable(rows pgx.Rows) (*pb.DeploymentVariable, error) {
var deploymentVariable pb.DeploymentVariable
var _ string // discard default_value_id
err := rows.Scan(
&deploymentVariable.Id,
&deploymentVariable.Key,
&deploymentVariable.Description,
&deploymentVariable.DeploymentId,
&_, // scan but discard
)
if err != nil {
return nil, err
}
return &deploymentVariable, nil
}Alternatively, if you don't need it at all, you could remove dv.default_value_id from the SELECT query.
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
apps/workspace-engine/pkg/kafka/init_workspaces.go (1)
48-75: Critical: Data race on shared slice.The
loadErrsslice at line 63 is accessed concurrently from multiple goroutines without synchronization, causing a data race. This can lead to crashes, lost errors, or memory corruption.Apply this diff to protect the slice with a mutex:
func loadFullWorkspaces(ctx context.Context, workspaceIDs []string) error { var wg sync.WaitGroup var loadErrs []error + var mu sync.Mutex for _, workspaceID := range workspaceIDs { wg.Add(1) go func(workspaceID string) { defer wg.Done() if workspace.Exists(workspaceID) { return } fullWorkspace, err := db.LoadWorkspace(ctx, workspaceID) if err != nil { + mu.Lock() loadErrs = append(loadErrs, err) + mu.Unlock() return } workspace.Set(workspaceID, fullWorkspace) }(workspaceID) } wg.Wait() if len(loadErrs) > 0 { return fmt.Errorf("failed to load full workspaces: %w", errors.Join(loadErrs...)) } return nil }
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
apps/workspace-engine/pkg/kafka/init_workspaces.go(1 hunks)apps/workspace-engine/pkg/kafka/kafka.go(2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/kafka/init_workspaces.goapps/workspace-engine/pkg/kafka/kafka.go
🧬 Code graph analysis (1)
apps/workspace-engine/pkg/kafka/init_workspaces.go (3)
apps/workspace-engine/pkg/workspace/workspace.go (2)
Exists(102-105)Set(107-109)apps/workspace-engine/pkg/db/workspace_load.go (1)
LoadWorkspace(11-99)apps/workspace-engine/pkg/db/workspaces.go (1)
GetWorkspaceIDs(9-35)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Typecheck
- GitHub Check: Lint
🔇 Additional comments (6)
apps/workspace-engine/pkg/kafka/kafka.go (3)
27-37: LGTM!The function correctly retrieves assigned partitions and builds a set representation using a map. Error handling is appropriate.
47-60: LGTM!The function correctly orchestrates workspace cache population by retrieving partition assignments and delegating to
initWorkspaces. Error handling and propagation are appropriate.
77-101: Approve cache initialization. Verified that theworkspace.Existscheck ininit_workspaces.goprevents duplicate loading whenpopulateWorkspaceCacheruns both immediately after subscription and on partition assignment.apps/workspace-engine/pkg/kafka/init_workspaces.go (3)
13-42: LGTM!The MurmurHash2 implementation is correct and follows the standard algorithm. The masking at line 41 ensures non-negative hash values for partition calculation.
44-46: LGTM!The partition assignment logic correctly uses MurmurHash2 for consistent key-to-partition mapping.
77-92: LGTM!The function correctly filters workspace IDs based on partition assignment and delegates loading to
loadFullWorkspaces. The partition-based filtering logic ensures each consumer loads only its assigned workspaces.
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
apps/workspace-engine/pkg/db/workspace_load.go (1)
12-17: Drop the unusedGetDBacquire.Line 12 grabs a pool connection, defers
Release, but never uses the handle—each loader (GetResources, etc.) acquires its own connection. That ties up one extra connection per load and shrinks pool capacity under heavy workspace loads. Remove the upfrontGetDBcall (or replace it with a cheap pool-initialization check) so only the goroutines hold connections while they’re actively querying.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
apps/workspace-engine/pkg/db/workspace_load.go(1 hunks)apps/workspace-engine/pkg/events/handler/handler.go(2 hunks)apps/workspace-engine/pkg/kafka/init_workspaces.go(1 hunks)apps/workspace-engine/pkg/kafka/kafka.go(3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/db/workspace_load.goapps/workspace-engine/pkg/events/handler/handler.goapps/workspace-engine/pkg/kafka/kafka.goapps/workspace-engine/pkg/kafka/init_workspaces.go
🧬 Code graph analysis (3)
apps/workspace-engine/pkg/db/workspace_load.go (12)
apps/workspace-engine/pkg/workspace/workspace.go (2)
Workspace(24-29)New(13-22)apps/workspace-engine/pkg/db/client.go (1)
GetDB(51-61)apps/workspace-engine/pkg/workspace/store/repository/repo.go (1)
New(8-22)apps/workspace-engine/pkg/db/resources.go (1)
GetResources(40-118)apps/workspace-engine/pkg/workspace/store/resources.go (1)
Resources(19-22)apps/workspace-engine/pkg/db/deployments.go (1)
GetDeployments(24-59)apps/workspace-engine/pkg/workspace/store/deployments.go (1)
Deployments(26-32)apps/workspace-engine/pkg/db/deployment_versions.go (1)
GetDeploymentVersions(28-53)apps/workspace-engine/pkg/workspace/store/deployment_versions.go (1)
DeploymentVersions(16-21)apps/workspace-engine/pkg/db/deployment_variables.go (1)
GetDeploymentVariables(24-49)apps/workspace-engine/pkg/db/environments.go (1)
GetEnvironments(22-48)apps/workspace-engine/pkg/workspace/store/environments.go (1)
Environments(23-28)
apps/workspace-engine/pkg/events/handler/handler.go (2)
apps/workspace-engine/pkg/workspace/workspace.go (4)
Workspace(24-29)Exists(102-105)GetWorkspace(111-118)Set(107-109)apps/workspace-engine/pkg/db/workspace_load.go (1)
LoadWorkspace(11-86)
apps/workspace-engine/pkg/kafka/init_workspaces.go (3)
apps/workspace-engine/pkg/workspace/workspace.go (2)
Exists(102-105)Set(107-109)apps/workspace-engine/pkg/db/workspace_load.go (1)
LoadWorkspace(11-86)apps/workspace-engine/pkg/db/workspaces.go (1)
GetWorkspaceIDs(9-35)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Typecheck
- GitHub Check: Lint
| err = c.SubscribeTopics([]string{Topic}, func(c *kafka.Consumer, e kafka.Event) error { | ||
| switch e.(type) { | ||
| case *kafka.AssignedPartitions: | ||
| if err := populateWorkspaceCache(ctx, c); err != nil { | ||
| log.Error("Failed to populate workspace cache", "error", err) | ||
| return err | ||
| } | ||
| default: | ||
| return nil | ||
| } | ||
|
|
||
| return nil | ||
| }) |
There was a problem hiding this comment.
Assign partitions inside the rebalance callback.
Line 85: once you install a rebalance callback, confluent-kafka stops auto-assigning. Without calling c.Assign the consumer stays unassigned, so ReadMessage never yields data. Handle the event explicitly before populating the cache:
- err = c.SubscribeTopics([]string{Topic}, func(c *kafka.Consumer, e kafka.Event) error {
- switch e.(type) {
- case *kafka.AssignedPartitions:
- if err := populateWorkspaceCache(ctx, c); err != nil {
+ err = c.SubscribeTopics([]string{Topic}, func(c *kafka.Consumer, e kafka.Event) error {
+ switch ev := e.(type) {
+ case *kafka.AssignedPartitions:
+ if err := c.Assign(ev.Partitions); err != nil {
+ log.Error("Failed to assign partitions", "error", err)
+ return err
+ }
+ if err := populateWorkspaceCache(ctx, c); err != nil {
log.Error("Failed to populate workspace cache", "error", err)
return err
}
+ case *kafka.RevokedPartitions:
+ if err := c.Unassign(); err != nil {
+ log.Error("Failed to unassign partitions", "error", err)
+ return err
+ }
default:
return nil
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| err = c.SubscribeTopics([]string{Topic}, func(c *kafka.Consumer, e kafka.Event) error { | |
| switch e.(type) { | |
| case *kafka.AssignedPartitions: | |
| if err := populateWorkspaceCache(ctx, c); err != nil { | |
| log.Error("Failed to populate workspace cache", "error", err) | |
| return err | |
| } | |
| default: | |
| return nil | |
| } | |
| return nil | |
| }) | |
| err = c.SubscribeTopics([]string{Topic}, func(c *kafka.Consumer, e kafka.Event) error { | |
| switch ev := e.(type) { | |
| case *kafka.AssignedPartitions: | |
| if err := c.Assign(ev.Partitions); err != nil { | |
| log.Error("Failed to assign partitions", "error", err) | |
| return err | |
| } | |
| if err := populateWorkspaceCache(ctx, c); err != nil { | |
| log.Error("Failed to populate workspace cache", "error", err) | |
| return err | |
| } | |
| case *kafka.RevokedPartitions: | |
| if err := c.Unassign(); err != nil { | |
| log.Error("Failed to unassign partitions", "error", err) | |
| return err | |
| } | |
| default: | |
| return nil | |
| } | |
| return nil | |
| }) |
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/kafka/kafka.go around lines 82 to 94, the rebalance
callback currently only reacts to AssignedPartitions by populating the cache but
does not actually assign the partitions, so the consumer remains unassigned and
ReadMessage will never return data; update the callback to handle the
*kafka.AssignedPartitions event by extracting the partition list and calling
c.Assign(partitions.Partitions) (and also handle *kafka.RevokedPartitions by
calling c.Unassign()) before calling populateWorkspaceCache, returning any
assign/unassign errors appropriately.
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (2)
apps/workspace-engine/pkg/db/deployment_variables.go (2)
24-49: Add doc comment to exported function.Per Go conventions and the coding guidelines, exported functions should be documented. Please add a comment describing what
GetDeploymentVariablesdoes, its parameters, and return values.Example:
+// GetDeploymentVariables retrieves all deployment variables for a given workspace. +// Returns a slice of deployment variables or an error if the query fails. func GetDeploymentVariables(ctx context.Context, workspaceID string) ([]*pb.DeploymentVariable, error) {Based on coding guidelines
59-59: Consider using the blank identifier for discarding.While
new(interface{})works correctly, the more idiomatic Go pattern for discarding a scanned value is:+ var _ interface{} err := rows.Scan( &deploymentVariable.Id, &deploymentVariable.Key, &deploymentVariable.Description, &deploymentVariable.DeploymentId, - new(interface{}), // scan but discard + &_, // scan but discard )
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
apps/workspace-engine/pkg/db/deployment_variables.go(1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/db/deployment_variables.go
🧬 Code graph analysis (1)
apps/workspace-engine/pkg/db/deployment_variables.go (1)
apps/workspace-engine/pkg/db/client.go (2)
GetDB(51-61)Close(64-68)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Typecheck
- GitHub Check: Lint
🔇 Additional comments (1)
apps/workspace-engine/pkg/db/deployment_variables.go (1)
11-22: LGTM!The SQL query is syntactically correct and efficiently retrieves deployment variables filtered by workspace.
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (2)
apps/workspace-engine/pkg/kafka/kafka.go (2)
45-49: Add error and empty partition checks to prevent divide-by-zero.The past review comment correctly identified that returning
0when the topic has metadata errors or no partitions can cause a divide-by-zero panic inpartitionOfKey. The current code checks topic existence but doesn't validate the metadata itself.Apply this diff to add the missing validation:
topicMeta, ok := md.Topics[Topic] if !ok { return 0, fmt.Errorf("topic %s not found", Topic) } + if topicMeta.Error.Code() != kafka.ErrNoError { + return 0, fmt.Errorf("metadata error for topic %s: %w", Topic, topicMeta.Error) + } + if len(topicMeta.Partitions) == 0 { + return 0, fmt.Errorf("topic %s has no partitions", Topic) + } return len(topicMeta.Partitions), nil
82-98: Handle partition revocation to prevent stale workspace cache.The past review correctly identified that
RevokedPartitionsshould be handled. Without this, when partitions are reassigned to other consumers during rebalancing, the workspace cache can retain stale data for revoked partitions.Apply this diff to handle revocation:
case *kafka.AssignedPartitions: if err := c.Assign(ev.Partitions); err != nil { log.Error("Failed to assign partitions", "error", err) return err } if err := populateWorkspaceCache(ctx, c); err != nil { log.Error("Failed to populate workspace cache", "error", err) return err } + case *kafka.RevokedPartitions: + if err := c.Unassign(); err != nil { + log.Error("Failed to unassign partitions", "error", err) + return err + } default: return nil }
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
apps/workspace-engine/pkg/kafka/kafka.go(3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/kafka/kafka.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Lint
- GitHub Check: Typecheck
🔇 Additional comments (2)
apps/workspace-engine/pkg/kafka/kafka.go (2)
28-38: LGTM!The conversion to a set-based representation is appropriate for efficient partition membership checks downstream.
52-65: LGTM!The orchestration logic correctly gathers partition assignment and count before delegating to
initWorkspaces.
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
apps/workspace-engine/pkg/db/deployment_variables.go (1)
11-21: Ensure proper indexes for query performance.The query joins three tables and filters by
workspace_id. For optimal performance, verify that indexes exist on:
deployment.system_id(join key)deployment_variable.deployment_id(join key)system.workspace_id(filter column)These indexes should typically be created during schema migrations.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
apps/workspace-engine/pkg/db/deployment_variables.go(1 hunks)apps/workspace-engine/pkg/kafka/kafka.go(3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/kafka/kafka.goapps/workspace-engine/pkg/db/deployment_variables.go
🧠 Learnings (1)
📚 Learning: 2025-10-07T16:44:54.928Z
Learnt from: adityachoudhari26
PR: ctrlplanedev/ctrlplane#673
File: apps/workspace-engine/pkg/db/deployment_variables.go:51-65
Timestamp: 2025-10-07T16:44:54.928Z
Learning: In `apps/workspace-engine/pkg/db/deployment_variables.go`, the `default_value_id` field is intentionally scanned from the database but not assigned to the `DefaultValueId` field in the proto struct. This is a temporary decision and the field assignment is deferred for later implementation.
Applied to files:
apps/workspace-engine/pkg/db/deployment_variables.go
🧬 Code graph analysis (1)
apps/workspace-engine/pkg/db/deployment_variables.go (1)
apps/workspace-engine/pkg/db/client.go (2)
GetDB(51-61)Close(64-68)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Typecheck
- GitHub Check: Lint
🔇 Additional comments (6)
apps/workspace-engine/pkg/kafka/kafka.go (4)
5-5: LGTM!The
fmtimport is correctly added to support error formatting in the new helper functions.
88-109: LGTM! Critical rebalance handling issues resolved.The rebalance callback correctly implements the partition assignment protocol:
- AssignedPartitions: Properly calls
c.Assign()before populating the workspace cache (addresses the past critical issue where the consumer would remain unassigned)- RevokedPartitions: Correctly calls
c.Unassign()to release partitions- Error handling is comprehensive with appropriate logging and error propagation
The redundant initial
populateWorkspaceCachecall has been removed as recommended.
137-140: Verify fallback workspace loading
No evidence of a fallback to load missing workspaces in theEventHandlerimplementation. Please confirm thatapps/workspace-engine/pkg/events/EventHandler(or the equivalent handler) includes logic to load workspaces not in cache when processing events.
28-71: LGTM! Past review comments have been addressed and no further action needed
getAssignedPartitions: cleanly converts partition assignments to a setgetTopicPartitionCount: includes bounds checking, metadata error validation, and empty partition detectionpopulateWorkspaceCache: orchestrates helpers with correct error propagationinitWorkspaces+loadFullWorkspaces: gracefully handle emptyassignedPartitions(no-op on empty slice)apps/workspace-engine/pkg/db/deployment_variables.go (2)
11-21: LGTM!The query is well-structured with proper parameterization and correct syntax. The column count matches the scanning logic, and the trailing comma issue from previous reviews has been resolved.
50-64: LGTM!The helper function correctly scans all four columns from the query result. The column count matches the SELECT statement, and error handling is appropriate. The decision to defer populating
DefaultValueIdis tracked per previous discussions.Based on learnings.
| func GetDeploymentVariables(ctx context.Context, workspaceID string) ([]*pb.DeploymentVariable, error) { | ||
| db, err := GetDB(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| defer db.Release() | ||
|
|
||
| rows, err := db.Query(ctx, DEPLOYMENT_VARIABLE_SELECT_QUERY, workspaceID) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| defer rows.Close() | ||
|
|
||
| deploymentVariables := make([]*pb.DeploymentVariable, 0) | ||
| for rows.Next() { | ||
| deploymentVariable, err := scanDeploymentVariable(rows) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| deploymentVariables = append(deploymentVariables, deploymentVariable) | ||
| } | ||
| if err := rows.Err(); err != nil { | ||
| return nil, err | ||
| } | ||
| return deploymentVariables, nil | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Add godoc comment for exported function.
The exported function GetDeploymentVariables lacks documentation. Per the coding guidelines, exported functions should be documented to support generated documentation and improve developer ergonomics.
Apply this diff to add the documentation:
+// GetDeploymentVariables retrieves all deployment variables associated with
+// the specified workspace by joining through deployment and system tables.
+// Returns an empty slice if no variables are found for the workspace.
func GetDeploymentVariables(ctx context.Context, workspaceID string) ([]*pb.DeploymentVariable, error) {As per coding guidelines.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func GetDeploymentVariables(ctx context.Context, workspaceID string) ([]*pb.DeploymentVariable, error) { | |
| db, err := GetDB(ctx) | |
| if err != nil { | |
| return nil, err | |
| } | |
| defer db.Release() | |
| rows, err := db.Query(ctx, DEPLOYMENT_VARIABLE_SELECT_QUERY, workspaceID) | |
| if err != nil { | |
| return nil, err | |
| } | |
| defer rows.Close() | |
| deploymentVariables := make([]*pb.DeploymentVariable, 0) | |
| for rows.Next() { | |
| deploymentVariable, err := scanDeploymentVariable(rows) | |
| if err != nil { | |
| return nil, err | |
| } | |
| deploymentVariables = append(deploymentVariables, deploymentVariable) | |
| } | |
| if err := rows.Err(); err != nil { | |
| return nil, err | |
| } | |
| return deploymentVariables, nil | |
| } | |
| // GetDeploymentVariables retrieves all deployment variables associated with | |
| // the specified workspace by joining through deployment and system tables. | |
| // Returns an empty slice if no variables are found for the workspace. | |
| func GetDeploymentVariables(ctx context.Context, workspaceID string) ([]*pb.DeploymentVariable, error) { | |
| db, err := GetDB(ctx) | |
| if err != nil { | |
| return nil, err | |
| } | |
| defer db.Release() | |
| rows, err := db.Query(ctx, DEPLOYMENT_VARIABLE_SELECT_QUERY, workspaceID) | |
| if err != nil { | |
| return nil, err | |
| } | |
| defer rows.Close() | |
| deploymentVariables := make([]*pb.DeploymentVariable, 0) | |
| for rows.Next() { | |
| deploymentVariable, err := scanDeploymentVariable(rows) | |
| if err != nil { | |
| return nil, err | |
| } | |
| deploymentVariables = append(deploymentVariables, deploymentVariable) | |
| } | |
| if err := rows.Err(); err != nil { | |
| return nil, err | |
| } | |
| return deploymentVariables, nil | |
| } |
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/db/deployment_variables.go around lines 23 to 48,
add a godoc comment immediately above the exported GetDeploymentVariables
function that briefly describes what the function does, documents its parameters
(ctx and workspaceID) and its return values (slice of *pb.DeploymentVariable and
error), and any important behavior (e.g., it queries the DB for deployment
variables for the given workspace and returns an empty slice when none found).
Keep the comment concise, start with the function name ("GetDeploymentVariables
..."), and follow standard Go doc conventions.
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (2)
apps/workspace-engine/pkg/db/client.go (2)
27-27: Consider returning errors instead of calling log.Fatal.
log.Fatalimmediately exits the process, which prevents graceful shutdown and makes testing difficult. While this is acceptable for startup code where the database is a hard dependency, returning an error would allow the caller (likelymain()) to decide how to handle the failure and perform any necessary cleanup.Consider refactoring
GetPoolto return an error:-func GetPool(ctx context.Context) *pgxpool.Pool { +func GetPool(ctx context.Context) (*pgxpool.Pool, error) { + var initErr error once.Do(func() { postgresURL := viper.GetString("POSTGRES_URL") config, err := pgxpool.ParseConfig(postgresURL) if err != nil { - log.Fatal("Failed to parse database config:", err) + initErr = fmt.Errorf("parse database config: %w", err) + return } // ... (configuration code) ... pool, err = pgxpool.NewWithConfig(ctx, config) if err != nil { - log.Fatal("Failed to create database pool:", err) + initErr = fmt.Errorf("create database pool: %w", err) + return } }) + if initErr != nil { + return nil, initErr + } - return pool + return pool, nil }Then update
main.goto handle the error:pool, err := db.GetPool(ctx) if err != nil { log.Fatal("Failed to initialize database:", err) } defer db.Close()This centralizes the exit decision in
main()while making the code more testable.Also applies to: 46-46
54-56: Lazy initialization can hide startup failures.The lazy initialization pattern in
GetDBmeans that ifGetPoolwas never called explicitly and pool creation fails, the error won't surface until the firstGetDBcall. This delays failure detection and makes debugging harder.Consider removing the lazy initialization and requiring explicit
GetPool()calls during startup:func GetDB(ctx context.Context) (*pgxpool.Conn, error) { - if pool == nil { - GetPool(ctx) - } - conn, err := pool.Acquire(ctx) if err != nil { return nil, err } return conn, nil }This makes the initialization contract explicit and ensures failures happen during startup where they're easier to detect and handle.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
apps/workspace-engine/.air.toml(1 hunks)apps/workspace-engine/pkg/db/client.go(1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/db/client.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Typecheck
- GitHub Check: build (linux/amd64)
- GitHub Check: Lint
🔇 Additional comments (3)
apps/workspace-engine/.air.toml (1)
15-15: LGTM!The shell wrapper correctly loads environment variables from the repository root before starting the binary. The use of
set -a/+aandexecfollows shell best practices.apps/workspace-engine/pkg/db/client.go (2)
65-70: LGTM!The cleanup function correctly checks for nil before closing the pool.
38-38: Drop TracerProvider configuration warning The global TracerProvider is set in apps/workspace-engine/main.go (lines 89–96) before any call to GetPool, sootelpgx.NewTracer()will use the properly configured provider.Likely an incorrect or invalid review comment.
| // GetPool returns the singleton database connection pool | ||
| func GetPool(ctx context.Context) *pgxpool.Pool { | ||
| once.Do(func() { | ||
| postgresURL := viper.GetString("POSTGRES_URL") |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Unify configuration source across all database settings.
The code mixes viper.GetString (line 24) with os.Getenv (lines 30-34, 40-42) for database configuration. This inconsistency can lead to confusion about the expected configuration source and makes the code harder to maintain.
Apply this diff to use Viper consistently:
- if maxConns := os.Getenv("POSTGRES_MAX_POOL_SIZE"); maxConns != "" {
+ if maxConns := viper.GetString("POSTGRES_MAX_POOL_SIZE"); maxConns != "" {
if max, err := strconv.Atoi(maxConns); err == nil {
config.MaxConns = int32(max)
}
}
- if appName := os.Getenv("POSTGRES_APPLICATION_NAME"); appName != "" {
+ if appName := viper.GetString("POSTGRES_APPLICATION_NAME"); appName != "" {
config.ConnConfig.RuntimeParams["application_name"] = appName
}Also applies to: 30-34, 40-42
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/db/client.go around lines 24, 30-34 and 40-42, the
DB configuration currently mixes viper.GetString and os.Getenv; replace all
os.Getenv usages with the corresponding viper.GetString calls (or
viper.GetBool/int as appropriate) so all DB settings are read consistently from
Viper, and remove direct os.Getenv imports/usages; ensure default values or
required checks remain unchanged and update any error messages or logging to
reference the Viper-derived variables.
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (3)
apps/workspace-engine/pkg/events/handler/handler.go (1)
129-145: LGTM with an optional optimization suggestion.The dual-path workspace loading implementation is correct:
- Checks cache first for performance
- Falls back to DB load when workspace is not cached
- Caches the loaded workspace for subsequent events (addressing the previous review comment)
- Handles errors appropriately with tracing and logging
The implementation is safe and functionally correct.
Optional optimization: Concurrent events for the same uncached workspace will trigger duplicate DB loads. While not a correctness issue, you could optimize this with
sync.Map+sync.Oncepattern or a similar deduplication mechanism if DB load performance becomes a concern.Example optimization (if needed):
// Add a loading map to track in-flight loads type loadingEntry struct { once sync.Once ws *workspace.Workspace err error } var loadingWorkspaces sync.Map func getOrLoadWorkspace(ctx context.Context, workspaceID string) (*workspace.Workspace, error) { if workspace.Exists(workspaceID) { return workspace.GetWorkspace(workspaceID), nil } entry, _ := loadingWorkspaces.LoadOrStore(workspaceID, &loadingEntry{}) le := entry.(*loadingEntry) le.once.Do(func() { le.ws, le.err = db.LoadWorkspace(ctx, workspaceID) if le.err == nil { workspace.Set(workspaceID, le.ws) } // Clean up after load completes loadingWorkspaces.Delete(workspaceID) }) return le.ws, le.err }apps/workspace-engine/pkg/db/resources.go (1)
29-32: Redundant COALESCE for rm.key.The
COALESCE(rm.key, '')is unnecessary since theFILTER (WHERE rm.key IS NOT NULL)clause already ensuresrm.keycannot be NULL at this point. TheCOALESCE(rm.value, '')on line 32 is correct and should remain.Apply this diff to remove the redundant COALESCE:
COALESCE( json_object_agg( - COALESCE(rm.key, ''), + rm.key, COALESCE(rm.value, '') ) FILTER (WHERE rm.key IS NOT NULL), '{}'::json ) as metadataapps/workspace-engine/pkg/db/environments.go (1)
10-21: Consider adding ORDER BY for deterministic results.The query lacks an ORDER BY clause, so the order of returned environments is non-deterministic and may vary across executions or database versions.
Apply this diff to add ordering by creation time and ID:
const ENVIRONMENT_SELECT_QUERY = ` SELECT e.id, e.name, e.system_id, e.created_at, e.description, e.resource_selector FROM environment e INNER JOIN system s ON s.id = e.system_id WHERE s.workspace_id = $1 + ORDER BY e.created_at DESC, e.id ASC `
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (7)
apps/workspace-engine/pkg/db/deployment_variables.go(1 hunks)apps/workspace-engine/pkg/db/deployment_versions.go(1 hunks)apps/workspace-engine/pkg/db/environments.go(1 hunks)apps/workspace-engine/pkg/db/resources.go(1 hunks)apps/workspace-engine/pkg/events/handler/handler.go(3 hunks)apps/workspace-engine/pkg/workspace/releasemanager/evaluate.go(3 hunks)apps/workspace-engine/pkg/workspace/releasemanager/versionmanager/manager.go(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- apps/workspace-engine/pkg/db/deployment_versions.go
- apps/workspace-engine/pkg/db/deployment_variables.go
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/db/resources.goapps/workspace-engine/pkg/db/environments.goapps/workspace-engine/pkg/workspace/releasemanager/versionmanager/manager.goapps/workspace-engine/pkg/events/handler/handler.goapps/workspace-engine/pkg/workspace/releasemanager/evaluate.go
🧬 Code graph analysis (5)
apps/workspace-engine/pkg/db/resources.go (1)
apps/workspace-engine/pkg/db/client.go (2)
GetDB(53-63)Close(66-70)
apps/workspace-engine/pkg/db/environments.go (1)
apps/workspace-engine/pkg/db/client.go (2)
GetDB(53-63)Close(66-70)
apps/workspace-engine/pkg/workspace/releasemanager/versionmanager/manager.go (6)
apps/workspace-engine/pkg/workspace/store/store.go (2)
Store(33-48)New(12-31)apps/workspace-engine/pkg/workspace/releasemanager/manager.go (2)
Manager(15-23)New(26-33)apps/workspace-engine/pkg/workspace/releasemanager/versionmanager/policymanager/policymanager.go (2)
Manager(23-27)New(30-35)apps/workspace-engine/pkg/pb/workspace.pb.go (6)
ReleaseTarget(1475-1484)ReleaseTarget(1497-1497)ReleaseTarget(1512-1514)DeploymentVersion(2091-2104)DeploymentVersion(2117-2117)DeploymentVersion(2132-2134)apps/workspace-engine/pkg/workspace/releasemanager/versionmanager/policymanager/decision.go (1)
DeployDecision(10-18)apps/workspace-engine/pkg/workspace/store/deployment_versions.go (1)
DeploymentVersions(16-21)
apps/workspace-engine/pkg/events/handler/handler.go (2)
apps/workspace-engine/pkg/workspace/workspace.go (4)
Workspace(24-29)Exists(102-105)GetWorkspace(111-118)Set(107-109)apps/workspace-engine/pkg/db/workspace_load.go (1)
LoadWorkspace(11-86)
apps/workspace-engine/pkg/workspace/releasemanager/evaluate.go (8)
apps/workspace-engine/pkg/workspace/releasemanager/versionmanager/manager.go (2)
New(19-24)Manager(13-17)apps/workspace-engine/pkg/workspace/releasemanager/manager.go (2)
New(26-33)Manager(15-23)apps/workspace-engine/pkg/workspace/store/repository/repo.go (1)
New(8-22)apps/workspace-engine/pkg/workspace/store/store.go (1)
New(12-31)apps/workspace-engine/pkg/workspace/releasemanager/variablemanager/variablemanager.go (2)
New(19-21)Manager(15-17)apps/workspace-engine/pkg/pb/workspace.pb.go (12)
Job(1335-1352)Job(1365-1365)Job(1380-1382)ReleaseTarget(1475-1484)ReleaseTarget(1497-1497)ReleaseTarget(1512-1514)Release(1697-1706)Release(1719-1719)Release(1734-1736)VariableValue(1551-1564)VariableValue(1577-1577)VariableValue(1592-1594)apps/workspace-engine/pkg/workspace/store/releases.go (1)
Releases(15-17)apps/workspace-engine/pkg/workspace/store/jobs.go (1)
Jobs(17-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Typecheck
- GitHub Check: Lint
🔇 Additional comments (6)
apps/workspace-engine/pkg/events/handler/handler.go (2)
8-8: LGTM!The import of the
dbpackage is necessary for the newLoadWorkspacecall and is correctly placed.
25-57: Verify event producers emit the new event types.The initial search for
resource.create/.update/.deleteand similar patterns returned no matches due to filtering—please confirm manually or re-run without exclusions:rg -nP --type=go -C3 '\b(resource|deployment|environment|system|job-agent|job|policy|user-approval-record)\.(create|update|delete)\b' rg -nP --type=go -C3 'kafka\.Producer|Produce\('apps/workspace-engine/pkg/db/resources.go (2)
43-68: LGTM! Error handling and resource management are correct.The function properly:
- Manages connection lifecycle with defer
- Uses parameterized queries to prevent SQL injection
- Handles iteration errors via
rows.Err()check (addressing the previous review comment)- Propagates all errors appropriately
71-113: LGTM! Row scanning and nullable field handling are correct.The function properly handles nullable database columns using
sql.NullStringforprovider_idand*time.Timefor optional timestamps, and delegates JSON parsing to focused helper functions.apps/workspace-engine/pkg/db/environments.go (2)
23-58: LGTM: Solid data accessor implementation.The function correctly manages the connection lifecycle with deferred Release(), uses parameterized queries to prevent SQL injection, properly closes rows, and checks rows.Err() for iteration errors (as addressed in the previous review).
40-46: No changes needed for null handling. Thedescriptionfield is a plain string (NULLs scan to""), andresource_selectoris a protobufStructpointer (NULLs scan tonil).
| GROUP BY r.id, r.version, r.name, r.kind, r.identifier, r.provider_id, r.workspace_id, r.config, r.created_at, r.locked_at, r.updated_at, r.deleted_at | ||
| ` | ||
|
|
||
| func GetResources(ctx context.Context, workspaceID string) ([]*pb.Resource, error) { |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Add documentation for exported function.
The exported function GetResources lacks a godoc comment, which is required by Go conventions.
Based on coding guidelines.
Add a godoc comment explaining the function's purpose and parameters:
+// GetResources retrieves all resources for the specified workspace, including
+// their aggregated metadata from the resource_metadata table.
func GetResources(ctx context.Context, workspaceID string) ([]*pb.Resource, error) {📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func GetResources(ctx context.Context, workspaceID string) ([]*pb.Resource, error) { | |
| // GetResources retrieves all resources for the specified workspace, including | |
| // their aggregated metadata from the resource_metadata table. | |
| func GetResources(ctx context.Context, workspaceID string) ([]*pb.Resource, error) { |
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/db/resources.go around line 42, the exported
function GetResources lacks a Go doc comment; add a godoc comment immediately
above the function that briefly describes what GetResources does, its parameters
(ctx context.Context, workspaceID string), and the return values
([]*pb.Resource, error), following Go conventions (start with "GetResources
...") and keeping it concise and sentence-cased.
| // Clone variables to avoid parent changes affecting this release | ||
| clonedVariables := make(map[string]*pb.VariableValue, len(variables)) | ||
| for k, v := range variables { | ||
| // Deep copy VariableValue (shallow copy is sufficient if VariableValue is immutable) | ||
| if v != nil { | ||
| // Assuming VariableValue is a proto.Message, use proto.Clone if available | ||
| clonedVariables[k] = v.ProtoReflect().Interface().(*pb.VariableValue) | ||
| } else { |
There was a problem hiding this comment.
Fix variable cloning to avoid shared pointers.
Calling ProtoReflect().Interface() on Line 106 just hands back the same *pb.VariableValue, so later mutations to the source map will still bleed into the stored release. Use proto.Clone to produce an actual deep copy.
@@
-import (
- "context"
- "fmt"
- "sync"
- "time"
- "workspace-engine/pkg/cmap"
- "workspace-engine/pkg/pb"
-
- "github.com/charmbracelet/log"
- "github.com/google/uuid"
- "go.opentelemetry.io/otel"
- "go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/trace"
- "google.golang.org/protobuf/types/known/structpb"
-)
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+ "workspace-engine/pkg/cmap"
+ "workspace-engine/pkg/pb"
+
+ "github.com/charmbracelet/log"
+ "github.com/google/uuid"
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/trace"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/structpb"
+)
@@
- if v != nil {
- // Assuming VariableValue is a proto.Message, use proto.Clone if available
- clonedVariables[k] = v.ProtoReflect().Interface().(*pb.VariableValue)
- } else {
- clonedVariables[k] = nil
- }
+ if v != nil {
+ clonedVariables[k] = proto.Clone(v).(*pb.VariableValue)
+ continue
+ }
+ clonedVariables[k] = nil📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Clone variables to avoid parent changes affecting this release | |
| clonedVariables := make(map[string]*pb.VariableValue, len(variables)) | |
| for k, v := range variables { | |
| // Deep copy VariableValue (shallow copy is sufficient if VariableValue is immutable) | |
| if v != nil { | |
| // Assuming VariableValue is a proto.Message, use proto.Clone if available | |
| clonedVariables[k] = v.ProtoReflect().Interface().(*pb.VariableValue) | |
| } else { | |
| // --- import block in apps/workspace-engine/pkg/workspace/releasemanager/evaluate.go --- | |
| import ( | |
| "context" | |
| "fmt" | |
| "sync" | |
| "time" | |
| "workspace-engine/pkg/cmap" | |
| "workspace-engine/pkg/pb" | |
| "github.com/charmbracelet/log" | |
| "github.com/google/uuid" | |
| "go.opentelemetry.io/otel" | |
| "go.opentelemetry.io/otel/attribute" | |
| "go.opentelemetry.io/otel/trace" | |
| "google.golang.org/protobuf/proto" | |
| "google.golang.org/protobuf/types/known/structpb" | |
| ) | |
| // --- around line 101 in Evaluate() --- | |
| // Clone variables to avoid parent changes affecting this release | |
| clonedVariables := make(map[string]*pb.VariableValue, len(variables)) | |
| for k, v := range variables { | |
| if v != nil { | |
| clonedVariables[k] = proto.Clone(v).(*pb.VariableValue) | |
| continue | |
| } | |
| clonedVariables[k] = nil | |
| } |
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/releasemanager/evaluate.go around lines
101 to 108, the current clone uses v.ProtoReflect().Interface() which returns
the same pointer and does not deep-copy VariableValue; replace that with
proto.Clone(v).(*pb.VariableValue) (after importing
"google.golang.org/protobuf/proto"), check for nil before cloning, and assign
the cloned pointer into clonedVariables so subsequent mutations to the original
map no longer affect the stored release.
| jobAgentConfig := jobAgent.GetConfig().AsMap() | ||
| jobAgentDeploymentConfig := deployment.GetJobAgentConfig().AsMap() | ||
|
|
||
| return &pb.Job{ | ||
| config := make(map[string]any) | ||
| DeepMerge(config, jobAgentDeploymentConfig) | ||
| DeepMerge(config, jobAgentConfig) | ||
|
|
There was a problem hiding this comment.
Protect against nil configs and let deployment overrides win.
deployment.GetJobAgentConfig() and jobAgent.GetConfig() can be nil; invoking .AsMap() on Line 185/186 will panic. Additionally, you currently merge deployment overrides first, then overwrite them with the agent defaults, so the more specific deployment config never takes effect.
- jobAgentConfig := jobAgent.GetConfig().AsMap()
- jobAgentDeploymentConfig := deployment.GetJobAgentConfig().AsMap()
-
- config := make(map[string]any)
- DeepMerge(config, jobAgentDeploymentConfig)
- DeepMerge(config, jobAgentConfig)
+ config := make(map[string]any)
+
+ if agentCfg := jobAgent.GetConfig(); agentCfg != nil {
+ DeepMerge(config, agentCfg.AsMap())
+ }
+
+ if deploymentCfg := deployment.GetJobAgentConfig(); deploymentCfg != nil {
+ DeepMerge(config, deploymentCfg.AsMap())
+ }🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/releasemanager/evaluate.go around lines
184 to 190, calling .AsMap() on deployment.GetJobAgentConfig() and
jobAgent.GetConfig() can panic if either is nil, and the current merge order
lets agent defaults overwrite deployment overrides; fix by first defensively
checking each config for nil before calling .AsMap() (treat nil as an empty
map), then merge agent defaults into config first and then merge the deployment
config so deployment overrides win; ensure DeepMerge is called with the maps in
that order and that no nil dereference occurs.
81aca67 to
91446b7
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
apps/workspace-engine/pkg/db/policies.go(1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/db/policies.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Lint
🔇 Additional comments (1)
apps/workspace-engine/pkg/db/policies.go (1)
41-48: LGTM - scaffolding follows the intended pattern.The stub implementation with commented DB access code appropriately establishes the function signature and demonstrates the intended integration pattern with the DB singleton.
| const POLICY_SELECT_QUERY = ` | ||
| SELECT | ||
| p.id, | ||
| p.name, | ||
| p.description, | ||
| p.created_at, | ||
| p.workspace_id, | ||
|
|
||
| COALESCE( | ||
| json_agg( | ||
| json_build_object( | ||
| 'id', pt.id, | ||
| 'type', pt.type, | ||
| 'value', pt.value | ||
| ) | ||
| ) FILTER (WHERE pt.id IS NOT NULL), | ||
| '[]' | ||
| ) AS targets | ||
|
|
||
| , COALESCE( | ||
| json_build_object( | ||
| 'id', pra.id, | ||
| 'requiredApprovalsCount', pra.required_approvals_count | ||
| ), | ||
| '{}' | ||
| ) AS any_approval | ||
|
|
||
| FROM policy p | ||
| LEFT JOIN policy_target pt ON pt.policy_id = p.id | ||
| LEFT JOIN policy_rule_any_approval pra ON pra.policy_id = p.id | ||
| WHERE p.workspace_id = $1 | ||
| ` |
There was a problem hiding this comment.
Missing GROUP BY clause will produce incorrect results.
The query uses json_agg to aggregate policy targets but lacks a GROUP BY clause. When multiple policy_target rows exist for a single policy, the query will return multiple result rows (one per target) instead of one row per policy with an aggregated targets array.
Apply this diff to add the required GROUP BY clause:
LEFT JOIN policy_target pt ON pt.policy_id = p.id
LEFT JOIN policy_rule_any_approval pra ON pra.policy_id = p.id
WHERE p.workspace_id = $1
+ GROUP BY p.id, pra.id
`If pra.id can have multiple rows per policy, the any_approval object should also use aggregation instead of json_build_object.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const POLICY_SELECT_QUERY = ` | |
| SELECT | |
| p.id, | |
| p.name, | |
| p.description, | |
| p.created_at, | |
| p.workspace_id, | |
| COALESCE( | |
| json_agg( | |
| json_build_object( | |
| 'id', pt.id, | |
| 'type', pt.type, | |
| 'value', pt.value | |
| ) | |
| ) FILTER (WHERE pt.id IS NOT NULL), | |
| '[]' | |
| ) AS targets | |
| , COALESCE( | |
| json_build_object( | |
| 'id', pra.id, | |
| 'requiredApprovalsCount', pra.required_approvals_count | |
| ), | |
| '{}' | |
| ) AS any_approval | |
| FROM policy p | |
| LEFT JOIN policy_target pt ON pt.policy_id = p.id | |
| LEFT JOIN policy_rule_any_approval pra ON pra.policy_id = p.id | |
| WHERE p.workspace_id = $1 | |
| ` | |
| const POLICY_SELECT_QUERY = ` | |
| SELECT | |
| p.id, | |
| p.name, | |
| p.description, | |
| p.created_at, | |
| p.workspace_id, | |
| COALESCE( | |
| json_agg( | |
| json_build_object( | |
| 'id', pt.id, | |
| 'type', pt.type, | |
| 'value', pt.value | |
| ) | |
| ) FILTER (WHERE pt.id IS NOT NULL), | |
| '[]' | |
| ) AS targets | |
| , COALESCE( | |
| json_build_object( | |
| 'id', pra.id, | |
| 'requiredApprovalsCount', pra.required_approvals_count | |
| ), | |
| '{}' | |
| ) AS any_approval | |
| FROM policy p | |
| LEFT JOIN policy_target pt ON pt.policy_id = p.id | |
| LEFT JOIN policy_rule_any_approval pra ON pra.policy_id = p.id | |
| WHERE p.workspace_id = $1 | |
| GROUP BY p.id, pra.id | |
| ` |
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/db/policies.go around lines 8 to 39, the SELECT
aggregates policy_target into json_agg but lacks a GROUP BY, causing one row per
target instead of one row per policy; add a GROUP BY on all non-aggregated p.*
columns (p.id, p.name, p.description, p.created_at, p.workspace_id) and any
non-aggregated pra columns you keep, and if policy_rule_any_approval (pra) can
have multiple rows per policy, change its construction to an aggregation (e.g.,
json_agg or jsonb_agg with FILTER or use MAX/MIN inside json_build_object) so
any_approval is a single JSON value per policy; ensure LEFT JOINs remain and
adjust grouping to include pra columns or replace pra fields with an aggregate
expression.
91446b7 to
a0b8730
Compare
31ffa07 to
91446b7
Compare
Summary by CodeRabbit
New Features
Refactor
Chores