chore: in memory deployment resource selector#667
Conversation
WalkthroughIntroduces an InMemoryDeploymentResourceSelector that precomputes and persists deployment↔resource matches, updates the pipeline to upsert/remove deployments and resources via repository before updating selectors, and refactors Workspace to accept an injected SelectorManager built with the new in-memory selector. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant App
participant WorkspaceLoader as Workspace.load()
participant InMemSel as InMemoryDeploymentResourceSelector
participant DB
App->>WorkspaceLoader: load(workspaceId)
WorkspaceLoader->>DB: fetch deployments, resources (+metadata)
DB-->>WorkspaceLoader: datasets
WorkspaceLoader->>InMemSel: InMemoryDeploymentResourceSelector.create(id)
InMemSel->>InMemSel: build maps & precompute matches
InMemSel->>DB: insert initial computedDeploymentResource rows
DB-->>InMemSel: ack
WorkspaceLoader->>WorkspaceLoader: assemble SelectorManager (in-memory + DB selectors)
WorkspaceLoader-->>App: new Workspace({ id, selectorManager })
note over InMemSel,DB: persisted rows reflect precomputed matches
sequenceDiagram
autonumber
participant Pipeline as OperationPipeline
participant Repo as Repository
participant SelMgr as SelectorManager
participant InMemSel as InMemoryDeploymentResourceSelector
participant DB as DB
rect rgb(235,245,255)
note right of Pipeline: Upsert Deployment
Pipeline->>Repo: getById / create or update(deployment)
Repo-->>Pipeline: upserted
Pipeline->>SelMgr: upsertDeployment(deployment)
SelMgr->>InMemSel: upsertSelector(deployment)
InMemSel->>DB: sync computedDeploymentResource (+/-)
DB-->>InMemSel: ack
end
rect rgb(255,240,235)
note right of Pipeline: Remove Deployment
Pipeline->>Repo: delete(deployment.id)
Repo-->>Pipeline: deleted
Pipeline->>SelMgr: removeDeployment(deployment)
SelMgr->>InMemSel: removeSelector(deployment)
InMemSel->>DB: delete matching computedDeploymentResource rows
DB-->>InMemSel: ack
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests
📜 Recent review detailsConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Disabled knowledge base sources:
📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/event-queue/src/workspace/workspace.ts (1)
110-118: Bug: getOrLoad may return undefined after load.You set and then return the pre-existing
workspacevariable, not the newly loaded instance.static async getOrLoad(id: string) { - const workspace = WorkspaceManager.get(id); - if (!workspace) { - const ws = await Workspace.load(id); - WorkspaceManager.set(id, ws); - } - - return workspace; + const existing = WorkspaceManager.get(id); + if (existing) return existing; + const ws = await Workspace.load(id); + WorkspaceManager.set(id, ws); + return ws; }
🧹 Nitpick comments (8)
apps/event-queue/src/selector/in-memory/deployment-resource.ts (6)
30-56: Startup complexity is O(#resources × #deployments). Consider large-workspace safeguards.For very large workspaces, this nested precompute may be heavy. Consider chunked initialization, pre-grouping selectors by simple predicates (e.g., kind/provider), or gating with metrics.
58-61: Expose a read-only view (or keep internal) to prevent accidental mutation.Returning the internal Map by reference allows external mutation. Make the getter private or return a defensive copy.
- get selectorMatches() { - return this.matches; - } + private get selectorMatches(): ReadonlyMap<string, ReadonlySet<string>> { + return new Map( + Array.from(this.matches.entries()).map(([k, v]) => [k, new Set(v)]), + ); + }
94-103: Filter out soft-deleted deployments/systems in initial selector load.Without filtering, deleted deployments may be re-materialized in computed pairs.
- const allSelectors = await dbClient - .select() - .from(schema.deployment) - .innerJoin( - schema.system, - eq(schema.deployment.systemId, schema.system.id), - ) - .where(eq(schema.system.workspaceId, workspaceId)) - .then((results) => results.map((result) => result.deployment)); + const allSelectors = await dbClient + .select() + .from(schema.deployment) + .innerJoin( + schema.system, + eq(schema.deployment.systemId, schema.system.id), + ) + .where( + and( + eq(schema.system.workspaceId, workspaceId), + isNull(schema.deployment.deletedAt), + isNull(schema.system.deletedAt), + ), + ) + .then((results) => results.map((result) => result.deployment));
126-185: Make entity upsert atomic to avoid divergence on partial failures.Wrap delete/insert into a transaction and use the txn handle.
- async upsertEntity(entity: FullResource): Promise<void> { + async upsertEntity(entity: FullResource): Promise<void> { if (this.matches.get(entity.id) == null) this.matches.set(entity.id, new Set()); this.entities.set(entity.id, entity); @@ - if (unmatchedDeployments.length > 0) - await dbClient + await dbClient.transaction(async (tx) => { + if (unmatchedDeployments.length > 0) + await tx .delete(schema.computedDeploymentResource) .where( and( eq(schema.computedDeploymentResource.resourceId, entity.id), inArray( schema.computedDeploymentResource.deploymentId, unmatchedDeployments, ), ), ); - - await Promise.all( - newlyMatchedDeployments.map(async (deploymentId) => { - try { - await dbClient - .insert(schema.computedDeploymentResource) - .values({ resourceId: entity.id, deploymentId }) - .onConflictDoNothing(); - } catch (e) { - log.error("Error inserting computed deployment resource for entity", { - error: e instanceof Error ? e.message : String(e), - resourceId: entity.id, - deploymentId, - }); - } - }), - ); + await Promise.all( + newlyMatchedDeployments.map(async (deploymentId) => { + try { + await tx + .insert(schema.computedDeploymentResource) + .values({ resourceId: entity.id, deploymentId }) + .onConflictDoNothing(); + } catch (e) { + log.error( + "Error inserting computed deployment resource for entity", + { + error: e instanceof Error ? e.message : String(e), + resourceId: entity.id, + deploymentId, + }, + ); + } + }), + ); + });
194-252: Apply the same transaction pattern for selector upsert.Mirror the transactional update used in entity upsert to keep DB and in-memory views aligned.
62-124: Memory/load operational note.Loading all resources+metadata and all deployments at once may spike memory/CPU on very large workspaces. Add telemetry (counts, timings) and consider paging for metadata fetch.
apps/event-queue/src/workspace/workspace.ts (2)
55-58: Minor: unnecessary Promise.resolve in async function.
return wsis sufficient inside an async method.- return Promise.resolve(ws); + return ws;
38-55: Add basic startup telemetry.Log counts/timings for selector initialization to aid capacity planning and regressions after deploys.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
apps/event-queue/src/selector/in-memory/deployment-resource.ts(1 hunks)apps/event-queue/src/workspace/pipeline.ts(4 hunks)apps/event-queue/src/workspace/workspace.ts(2 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.{ts,tsx}: Use TypeScript with explicit types (prefer interfaces for public APIs)
Import styles: Use named imports, group imports by source (std lib > external > internal)
Consistent type imports:import type { Type } from "module"
Prefer async/await over raw promises
Handle errors explicitly (use try/catch and typed error responses)
Files:
apps/event-queue/src/workspace/pipeline.tsapps/event-queue/src/selector/in-memory/deployment-resource.tsapps/event-queue/src/workspace/workspace.ts
⚙️ CodeRabbit configuration file
**/*.{ts,tsx}: Note on Error Handling:
Avoid strict enforcement of try/catch blocks. Code may use early returns, Promise chains (.then().catch()), or other patterns for error handling. These are acceptable as long as they maintain clarity and predictability.
Files:
apps/event-queue/src/workspace/pipeline.tsapps/event-queue/src/selector/in-memory/deployment-resource.tsapps/event-queue/src/workspace/workspace.ts
**/*.{js,jsx,ts,tsx,json,md,yml,yaml}
📄 CodeRabbit inference engine (CLAUDE.md)
Formatting: Prettier is used with
@ctrlplane/prettier-config
Files:
apps/event-queue/src/workspace/pipeline.tsapps/event-queue/src/selector/in-memory/deployment-resource.tsapps/event-queue/src/workspace/workspace.ts
🧠 Learnings (1)
📚 Learning: 2024-10-30T23:10:58.869Z
Learnt from: adityachoudhari26
PR: ctrlplanedev/ctrlplane#187
File: apps/jobs/src/ephemeral-env-checker/index.ts:57-0
Timestamp: 2024-10-30T23:10:58.869Z
Learning: In the codebase, deployments are decoupled from environments. When deleting environments (e.g., in `apps/jobs/src/ephemeral-env-checker/index.ts`), associated deployments should not be deleted.
Applied to files:
apps/event-queue/src/workspace/pipeline.ts
🧬 Code graph analysis (3)
apps/event-queue/src/workspace/pipeline.ts (1)
packages/events/src/kafka/events.ts (1)
FullResource(62-64)
apps/event-queue/src/selector/in-memory/deployment-resource.ts (4)
packages/logger/src/index.ts (1)
logger(48-48)packages/events/src/kafka/events.ts (1)
FullResource(62-64)apps/event-queue/src/selector/in-memory/resource-match.ts (1)
resourceMatchesSelector(32-85)apps/event-queue/src/selector/selector.ts (1)
Selector(11-25)
apps/event-queue/src/workspace/workspace.ts (2)
apps/event-queue/src/selector/selector.ts (2)
SelectorManager(37-113)deploymentResourceSelector(52-54)apps/event-queue/src/selector/in-memory/deployment-resource.ts (1)
InMemoryDeploymentResourceSelector(30-294)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: build (linux/amd64)
- GitHub Check: Typecheck
- GitHub Check: Lint
🔇 Additional comments (10)
apps/event-queue/src/workspace/pipeline.ts (6)
342-347: Deletion order is correct; maintains env↔deployment decoupling.Delete-from-repo then remove-from-selector aligns with prior guidance that environment deletion must not cascade to deployments. No change requested.
475-475: Switch to upsertDeployment: good.This ensures repository state and selector state stay in sync on updates.
501-501: Switch to removeDeployment: good.Consistent with the new helpers and selector sync.
350-356: ResourceRepository persists metadata — no change requiredDbResourceRepository.create/update explicitly insert/upsert and delete resource metadata (apps/event-queue/src/repository/db-resource-repository.ts), so the pipeline's get → create/update flow will persist FullResource.metadata.
379-380: Resource delete: FK cascade confirmed — duplicate deletes are benign.
Migrations show computed_* tables (e.g. computed_deployment_resource, deployment_selector_computed_resource, environment_selector_computed_resource) have FOREIGN KEY(resource_id) REFERENCES public.resource(id) ON DELETE cascade (see packages/db/drizzle/0090_organic_sinister_six.sql and 0089_robust_diamondback.sql). selectorManager.removeResource also deletes computed rows, so both paths may touch the same rows but are idempotent — no change required unless you want to avoid redundant work.
325-340: Confirm soft-delete semantics and update idempotency for deployment upsert
- Verify the implementation behind opts.workspace.repository.deploymentRepository.get/create/update (not found in this run). Confirm whether update revives soft-deleted deployments or if revive requires create, and that update is idempotent for no-op changes (no unintended side effects or timestamp updates).
- Check call sites: apps/event-queue/src/workspace/pipeline.ts (upsertDeployment, ~lines 325–337) and apps/event-queue/src/job-manager/job-manager.ts (get usage).
apps/event-queue/src/selector/in-memory/deployment-resource.ts (1)
22-28: Match-all behavior is correct.Null
resourceSelector⇒ match-all is consistent with prior semantics.apps/event-queue/src/workspace/workspace.ts (3)
38-55: In-memory selector integration looks good.Factory-based creation isolates the initial sync and simplifies DI into SelectorManager.
67-67: Constructor uses injected selectorManager as intended.Good DI; reduces hidden side effects during construction.
31-34: Constructor API change: ensure all call sites pass selectorManager.This is a breaking change. Please verify no external
new Workspace({ id })calls remain.
| const matches = inMemoryDeploymentResourceSelector.selectorMatches; | ||
|
|
||
| const computed: { resourceId: string; deploymentId: string }[] = []; | ||
| for (const [resourceId, deploymentIds] of matches) | ||
| for (const deploymentId of deploymentIds) | ||
| computed.push({ resourceId, deploymentId }); | ||
|
|
||
| if (computed.length > 0) | ||
| await dbClient | ||
| .insert(schema.computedDeploymentResource) | ||
| .values(computed) | ||
| .onConflictDoNothing(); | ||
|
|
||
| return inMemoryDeploymentResourceSelector; | ||
| } |
There was a problem hiding this comment.
Initial load inserts but never prunes stale computed pairs; add cleanup to avoid drift.
If the table already contains pairs from a prior run, they persist indefinitely. Prune rows not in the newly computed set for this workspace.
const matches = inMemoryDeploymentResourceSelector.selectorMatches;
@@
if (computed.length > 0)
await dbClient
.insert(schema.computedDeploymentResource)
.values(computed)
.onConflictDoNothing();
+
+ // Prune stale pairs for this workspace
+ const existing = await dbClient
+ .select({
+ resourceId: schema.computedDeploymentResource.resourceId,
+ deploymentId: schema.computedDeploymentResource.deploymentId,
+ })
+ .from(schema.computedDeploymentResource)
+ .innerJoin(
+ schema.resource,
+ eq(schema.computedDeploymentResource.resourceId, schema.resource.id),
+ )
+ .innerJoin(
+ schema.deployment,
+ eq(
+ schema.computedDeploymentResource.deploymentId,
+ schema.deployment.id,
+ ),
+ )
+ .innerJoin(
+ schema.system,
+ eq(schema.deployment.systemId, schema.system.id),
+ )
+ .where(
+ and(
+ eq(schema.resource.workspaceId, workspaceId),
+ eq(schema.system.workspaceId, workspaceId),
+ isNull(schema.resource.deletedAt),
+ isNull(schema.deployment.deletedAt),
+ isNull(schema.system.deletedAt),
+ ),
+ );
+
+ const desired = new Set(
+ computed.map((p) => `${p.resourceId}:${p.deploymentId}`),
+ );
+ const stale = existing.filter(
+ (p) => !desired.has(`${p.resourceId}:${p.deploymentId}`),
+ );
+ if (stale.length > 0) {
+ await Promise.all(
+ stale.map((p) =>
+ dbClient
+ .delete(schema.computedDeploymentResource)
+ .where(
+ and(
+ eq(schema.computedDeploymentResource.resourceId, p.resourceId),
+ eq(
+ schema.computedDeploymentResource.deploymentId,
+ p.deploymentId,
+ ),
+ ),
+ ),
+ ),
+ );
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const matches = inMemoryDeploymentResourceSelector.selectorMatches; | |
| const computed: { resourceId: string; deploymentId: string }[] = []; | |
| for (const [resourceId, deploymentIds] of matches) | |
| for (const deploymentId of deploymentIds) | |
| computed.push({ resourceId, deploymentId }); | |
| if (computed.length > 0) | |
| await dbClient | |
| .insert(schema.computedDeploymentResource) | |
| .values(computed) | |
| .onConflictDoNothing(); | |
| return inMemoryDeploymentResourceSelector; | |
| } | |
| const matches = inMemoryDeploymentResourceSelector.selectorMatches; | |
| const computed: { resourceId: string; deploymentId: string }[] = []; | |
| for (const [resourceId, deploymentIds] of matches) | |
| for (const deploymentId of deploymentIds) | |
| computed.push({ resourceId, deploymentId }); | |
| if (computed.length > 0) | |
| await dbClient | |
| .insert(schema.computedDeploymentResource) | |
| .values(computed) | |
| .onConflictDoNothing(); | |
| // Prune stale pairs for this workspace | |
| const existing = await dbClient | |
| .select({ | |
| resourceId: schema.computedDeploymentResource.resourceId, | |
| deploymentId: schema.computedDeploymentResource.deploymentId, | |
| }) | |
| .from(schema.computedDeploymentResource) | |
| .innerJoin( | |
| schema.resource, | |
| eq(schema.computedDeploymentResource.resourceId, schema.resource.id), | |
| ) | |
| .innerJoin( | |
| schema.deployment, | |
| eq( | |
| schema.computedDeploymentResource.deploymentId, | |
| schema.deployment.id, | |
| ), | |
| ) | |
| .innerJoin( | |
| schema.system, | |
| eq(schema.deployment.systemId, schema.system.id), | |
| ) | |
| .where( | |
| and( | |
| eq(schema.resource.workspaceId, workspaceId), | |
| eq(schema.system.workspaceId, workspaceId), | |
| isNull(schema.resource.deletedAt), | |
| isNull(schema.deployment.deletedAt), | |
| isNull(schema.system.deletedAt), | |
| ), | |
| ); | |
| const desired = new Set( | |
| computed.map((p) => `${p.resourceId}:${p.deploymentId}`), | |
| ); | |
| const stale = existing.filter( | |
| (p) => !desired.has(`${p.resourceId}:${p.deploymentId}`), | |
| ); | |
| if (stale.length > 0) { | |
| await Promise.all( | |
| stale.map((p) => | |
| dbClient | |
| .delete(schema.computedDeploymentResource) | |
| .where( | |
| and( | |
| eq(schema.computedDeploymentResource.resourceId, p.resourceId), | |
| eq( | |
| schema.computedDeploymentResource.deploymentId, | |
| p.deploymentId, | |
| ), | |
| ), | |
| ), | |
| ), | |
| ); | |
| } | |
| return inMemoryDeploymentResourceSelector; | |
| } |
Summary by CodeRabbit
Refactor
Bug Fixes