refactor: move dispatch hooks into worker #535
Walkthrough

This set of changes refactors the deletion and event-handling workflows for deployments, environments, and resources to use asynchronous job queues rather than immediate, inline logic. New workers are introduced to handle deletion events for deployments, environments, and release targets, with dedicated channels and deduplication. The code removes direct event dispatching and complex selector-difference logic from API routes and worker functions, delegating these responsibilities to the new queue-driven workers. The event types and channel mappings are updated to support these changes, and related triggers and resource deletion utilities are removed or refactored to align with the new architecture.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant API
    participant Queue
    participant Worker
    participant DB
    participant EventHandler
    API->>Queue: Enqueue DeleteDeployment/DeleteEnvironment/DeleteResource job
    Queue->>Worker: Deliver job (with deduplication)
    Worker->>DB: Begin transaction, lock rows, delete entity
    Worker->>DB: Fetch associated ReleaseTargets
    Worker->>DB: Commit transaction
    Worker->>Queue: Enqueue DeletedReleaseTarget jobs for each deleted target
    Queue->>Worker: Deliver DeletedReleaseTarget job
    Worker->>DB: Fetch deployment and resource by IDs
    Worker->>EventHandler: Dispatch DeploymentResourceRemoved event
```
📜 Recent review details

- Configuration used: .coderabbit.yaml
- 📒 Files selected for processing (8)
- ✅ Files skipped from review due to trivial changes (1)
- 🚧 Files skipped from review as they are similar to previous changes (2)
- 🧰 Additional context used — 📓 Path-based instructions (1): `**/*.{ts,tsx}`: **Note on Error Handling:** Avoid strict enforcement of try/catch blocks. Code may use early returns, Promise chains (.then().catch()), or other patterns for error...
- 🧬 Code Graph Analysis (1): apps/webservice/src/app/api/v1/environments/[environmentId]/route.ts (1)
- ⏰ Context from checks skipped due to timeout of 90000ms (4)
- 🔇 Additional comments (16)
- ✨ Finishing Touches
Actionable comments posted: 5
🔭 Outside diff range comments (1)
packages/api/src/router/environment.ts (1)
279-282: 🛠️ Refactor suggestion

**Queue write should be awaited for consistency with the surrounding code**

In `create` you `await getQueue(...).add(...)`, but here the promise is fire-and-forget. If the Redis write fails, the API will still report success and the update event will never be processed.

```diff
- getQueue(Channel.UpdateEnvironment).add(input.id, {
+ await getQueue(Channel.UpdateEnvironment).add(input.id, {
    ...updatedEnv,
    oldSelector: oldEnv.environment.resourceSelector,
  });
```

This keeps behaviour consistent with the `create` mutation and surfaces enqueue errors to the client.
🧹 Nitpick comments (6)
packages/events/src/types.ts (1)

30-31: Consider extracting a shared `IdPayload` type for DRYness

`DeleteDeployment` and `DeleteEnvironment` now share the exact `{ id: string }` payload shape. Extracting a small alias reduces duplication and makes future refactors (e.g., switching to ULIDs) one-liner changes.

```diff
+type IdPayload = { id: string };
 …
-  [Channel.DeleteDeployment]: { id: string };
-  [Channel.DeleteEnvironment]: { id: string };
+  [Channel.DeleteDeployment]: IdPayload;
+  [Channel.DeleteEnvironment]: IdPayload;
```

apps/event-worker/src/workers/delete-deployment.ts (2)
21-24: Variable shadowing slightly harms readability

`const releaseTargets = …` inside the transaction shadows the outer variable of the same name. Although technically correct, it makes the control flow harder to scan.

```diff
- const releaseTargets = await tx.query.releaseTarget.findMany({
+ const targets = await tx.query.releaseTarget.findMany({
  …
- return releaseTargets;
+ return targets;
```
32-35: Bulk-enqueue for better throughput

`for … getQueue().add()` issues one Redis round-trip per release target. BullMQ supports `addBulk`, which can reduce latency and connection pressure:

```diff
- for (const rt of releaseTargets)
-   getQueue(Channel.DeletedReleaseTarget).add(rt.id, rt, {
-     deduplication: { id: rt.id },
-   });
+ await getQueue(Channel.DeletedReleaseTarget).addBulk(
+   releaseTargets.map((rt) => ({
+     name: rt.id,
+     data: rt,
+     opts: { deduplication: { id: rt.id } },
+   })),
+ );
```

apps/event-worker/src/workers/delete-environment.ts (2)
21-24: Avoid inner/outer `releaseTargets` shadowing

Rename the inner constant to keep scopes clear (same rationale as in `delete-deployment.ts`).
32-35: Switch to `addBulk` for efficiency

Identical to the deployment worker: batching improves Redis throughput.
packages/api/src/router/environment.ts (1)
295-300: Consider explicit `await` or a simple ACK response for delete mutation

`mutation(({ input }) => getQueue(...).add(...))` returns the raw BullMQ `Job` promise. If the client only needs confirmation that the request was accepted, returning `{ ok: true }` (after `await`) or awaiting and discarding the job object may be clearer and avoids leaking internal queue details.

No change is strictly required, but aligning this with the `create`/`update` patterns would improve consistency.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (20)
- apps/event-worker/src/workers/compute-systems-release-targets.ts (3 hunks)
- apps/event-worker/src/workers/delete-deployment.ts (1 hunks)
- apps/event-worker/src/workers/delete-environment.ts (1 hunks)
- apps/event-worker/src/workers/delete-resource.ts (2 hunks)
- apps/event-worker/src/workers/deleted-release-target.ts (1 hunks)
- apps/event-worker/src/workers/index.ts (2 hunks)
- apps/event-worker/src/workers/update-deployment.ts (1 hunks)
- apps/event-worker/src/workers/update-environment.ts (0 hunks)
- apps/event-worker/src/workers/updated-resources/index.ts (0 hunks)
- apps/pty-proxy/src/controller/sockets.ts (2 hunks)
- packages/api/src/router/deployment.ts (2 hunks)
- packages/api/src/router/environment.ts (3 hunks)
- packages/events/src/types.ts (2 hunks)
- packages/job-dispatch/src/deployment-update.ts (0 hunks)
- packages/job-dispatch/src/events/handlers/resource-removed.ts (2 hunks)
- packages/job-dispatch/src/events/triggers/deployment-removed.ts (2 hunks)
- packages/job-dispatch/src/events/triggers/environment-deleted.ts (0 hunks)
- packages/job-dispatch/src/events/triggers/index.ts (0 hunks)
- packages/job-dispatch/src/events/triggers/resource-deleted.ts (0 hunks)
- packages/job-dispatch/src/resource/delete.ts (0 hunks)
💤 Files with no reviewable changes (7)
- apps/event-worker/src/workers/updated-resources/index.ts
- packages/job-dispatch/src/events/triggers/index.ts
- apps/event-worker/src/workers/update-environment.ts
- packages/job-dispatch/src/events/triggers/resource-deleted.ts
- packages/job-dispatch/src/deployment-update.ts
- packages/job-dispatch/src/resource/delete.ts
- packages/job-dispatch/src/events/triggers/environment-deleted.ts
🧰 Additional context used
📓 Path-based instructions (1)
`**/*.{ts,tsx}`: **Note on Error Handling:**
Avoid strict enforcement of try/catch blocks. Code may use early returns, Promise chains (.then().catch()), or other patterns for error handling. These are acceptable as long as they maintain clarity and predictability.

Applies to:
- packages/job-dispatch/src/events/triggers/deployment-removed.ts
- packages/job-dispatch/src/events/handlers/resource-removed.ts
- packages/api/src/router/deployment.ts
- apps/pty-proxy/src/controller/sockets.ts
- apps/event-worker/src/workers/index.ts
- apps/event-worker/src/workers/compute-systems-release-targets.ts
- packages/events/src/types.ts
- apps/event-worker/src/workers/update-deployment.ts
- apps/event-worker/src/workers/delete-environment.ts
- packages/api/src/router/environment.ts
- apps/event-worker/src/workers/delete-resource.ts
- apps/event-worker/src/workers/deleted-release-target.ts
- apps/event-worker/src/workers/delete-deployment.ts
🧬 Code Graph Analysis (7)

packages/api/src/router/deployment.ts (1)
- packages/events/src/index.ts (1): `getQueue` (28-34)

apps/event-worker/src/workers/index.ts (3)
- apps/event-worker/src/workers/delete-deployment.ts (1): `deleteDeploymentWorker` (6-46)
- apps/event-worker/src/workers/delete-environment.ts (1): `deleteEnvironmentWorker` (6-46)
- apps/event-worker/src/workers/deleted-release-target.ts (1): `deletedReleaseTargetWorker` (16-46)

apps/event-worker/src/workers/compute-systems-release-targets.ts (3)
- packages/db/src/client.ts (1): `db` (15-15)
- packages/events/src/index.ts (1): `getQueue` (28-34)
- apps/event-worker/src/utils/dispatch-evaluate-jobs.ts (1): `dispatchEvaluateJobs` (5-11)

packages/api/src/router/environment.ts (1)
- packages/events/src/index.ts (1): `getQueue` (28-34)

apps/event-worker/src/workers/delete-resource.ts (1)
- packages/events/src/index.ts (1): `getQueue` (28-34)

apps/event-worker/src/workers/deleted-release-target.ts (7)
- packages/logger/src/index.ts (1): `logger` (48-48)
- packages/events/src/index.ts (1): `createWorker` (10-25)
- packages/db/src/schema/release.ts (1): `releaseTarget` (20-42)
- packages/db/src/schema/resource.ts (1): `resource` (59-87)
- packages/db/src/client.ts (1): `db` (15-15)
- packages/db/src/common.ts (1): `takeFirst` (9-13)
- packages/job-dispatch/src/events/index.ts (1): `handleEvent` (8-9)

apps/event-worker/src/workers/delete-deployment.ts (2)
- packages/events/src/index.ts (2): `createWorker` (10-25), `getQueue` (28-34)
- packages/db/src/client.ts (1): `db` (15-15)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Typecheck
- GitHub Check: Lint
- GitHub Check: build (linux/amd64)
- GitHub Check: build (linux/amd64)
🔇 Additional comments (19)
packages/job-dispatch/src/events/triggers/deployment-removed.ts (2)
9-9: Good addition of HookAction import for standardization.

Adding the import for the `HookAction` enum from `@ctrlplane/validators/events` is a good practice to standardize event action names.

42-42: Improved type safety using enum instead of string literal.

Replacing the string literal with the `HookAction.DeploymentResourceRemoved` enum value enhances type safety and maintainability. This change is consistent with the broader refactoring to centralize event action handling.

packages/job-dispatch/src/events/handlers/resource-removed.ts (2)
6-6: Good addition of HookAction import for standardization.

Adding the import for the `HookAction` enum from `@ctrlplane/validators/events` aligns with best practices for type safety.

16-16: Improved type safety using enum instead of string literal.

Replacing the string literal with the `HookAction.DeploymentResourceRemoved` enum value enhances type safety and maintainability. This change is consistent with the broader refactoring effort to centralize event handling.

apps/pty-proxy/src/controller/sockets.ts (2)
6-6: Good import addition for queue-based operation.

Adding imports for `Channel` and `getQueue` from `@ctrlplane/events` supports the transition to queue-based asynchronous processing.

33-39: Well-implemented transition to queue-based resource deletion.

The code effectively transitions from direct resource deletion to an asynchronous queue-based approach, which aligns with the broader architectural refactoring. The use of the resource ID as a deduplication key is a good practice to prevent duplicate deletion jobs.
This change maintains the same functionality while improving the system's architecture and scalability.
apps/event-worker/src/workers/index.ts (2)
10-13: Good addition of new worker imports.

Adding imports for the new worker modules (`deleteDeploymentWorker`, `deleteEnvironmentWorker`, and `deletedReleaseTargetWorker`) is consistent with the refactoring to move deletion logic into dedicated workers.

52-54: Well-structured registration of new workers.

The workers are properly registered with their corresponding channels in the exported `workers` object. This enables the system to process the new event types and maintains consistency with the existing pattern. Based on the provided snippets of the worker implementations, they appear to handle database transactions appropriately and include proper error handling.
packages/api/src/router/deployment.ts (2)
17-17: Simplified imports align with worker refactoring.

The import statement has been appropriately reduced to only include the `updateDeployment` function, removing event-related imports that are no longer needed in this file.
322-327: Excellent refactoring of deletion logic to use job queue.

The deletion mutation has been effectively simplified to enqueue a job instead of performing the deletion directly. This change offers several benefits:
- Improves API responsiveness by making deletion asynchronous
- Centralizes deletion logic in a dedicated worker
- Adds automatic deduplication to prevent duplicate deletion attempts
- Better separates concerns between API and data manipulation logic
This architectural change aligns perfectly with the PR objective of moving dispatch hooks into workers.
apps/event-worker/src/workers/update-deployment.ts (2)
3-3: Simplified imports reflect streamlined worker responsibility.

The removal of imports like `and`, `not`, `selector`, and `handleEvent` reflects the architectural improvement of removing resource exit handling logic from this worker. This change properly focuses the worker on its core responsibility of handling deployment updates.

13-53: Well-structured worker with clear separation of concerns.

The worker now properly focuses on handling job agent changes and resource selector changes without direct event dispatching. This refactoring aligns with moving event handling to dedicated workers, making the codebase more maintainable and modular.
apps/event-worker/src/workers/compute-systems-release-targets.ts (4)
83-83: Good restructuring of transaction return value.

Changing the transaction to return both `matched` and `deleted` release targets improves the design by explicitly tracking both sets of data, enabling proper handling of each collection outside the transaction.

157-164: Properly structured return data for downstream processing.

The addition of explicitly querying and returning both matched and deleted release targets provides the downstream code with all necessary information to process both sets appropriately. This supports the new architecture where deleted targets need special handling.

167-173: Well-implemented event-driven deletion handling.

The added code correctly enqueues each deleted release target to the appropriate channel with proper deduplication. The early exit check when no matched targets exist is a good optimization. This implementation cleanly separates deletion handling from other processing.

194-194: Correctly updated to use matched targets from new structure.

The `dispatchEvaluateJobs` call has been properly updated to use the matched release targets from the new return structure, maintaining the existing functionality while supporting the new architecture.
apps/event-worker/src/workers/deleted-release-target.ts (2)
1-9: Well-organized imports and proper logger setup.

The imports are appropriately structured to include all necessary dependencies for the worker's functionality. Creating a scoped logger with the worker name is a good practice for easier log filtering and debugging.

11-46: Excellent implementation of the dedicated worker for release target deletion events.

This worker is well-designed with:
- Clear documentation explaining its purpose and role in the process
- Efficient concurrent fetching of deployment and resource data
- Proper event construction using standardized action types
- Integration with the existing event handling infrastructure
- Comprehensive error handling with logging
The implementation perfectly addresses the PR objective of moving dispatch hooks into dedicated workers, centralizing the event handling logic that was previously scattered across multiple components. This will make the codebase more maintainable and easier to debug.
packages/api/src/router/environment.ts (1)
270-276: Confirm that policy updates are intentionally disallowed

You deliberately overwrite any `policyId` supplied in `input.data` with the existing `policyId` (`oldEnv.environment.policyId`). If callers are expected to modify the policy via this endpoint, their change will be silently ignored.

Please verify that this behaviour is desired; otherwise consider merging the provided `policyId` (with proper ACL checks) or throwing a validation error.
```ts
const isRowLocked = e.code === "55P03";
if (isRowLocked) {
  await getQueue(Channel.DeleteDeployment).add(job.name, job.data);
  return;
}
```
🛠️ Refactor suggestion
Unbounded immediate retries may cause a hot loop
When a row is locked (55P03) the job is re-queued with no delay or back-off, potentially hammering the queue every millisecond until the lock is released.
Consider exponential backoff or a small fixed delay:
```diff
- await getQueue(Channel.DeleteDeployment).add(job.name, job.data);
+ await getQueue(Channel.DeleteDeployment).add(job.name, job.data, {
+   delay: 1000, // 1-second cool-off
+   attempts: 5, // optional: limit retries
+ });
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
const isRowLocked = e.code === "55P03";
if (isRowLocked) {
  await getQueue(Channel.DeleteDeployment).add(job.name, job.data, {
    delay: 1000, // 1-second cool-off
    attempts: 5, // optional: limit retries
  });
  return;
}
```
```ts
const isRowLocked = e.code === "55P03";
if (isRowLocked) {
  await getQueue(Channel.DeleteEnvironment).add(job.name, job.data);
  return;
}
```
🛠️ Refactor suggestion
Introduce retry back-off to prevent tight loops
Re-queuing immediately after a lock conflict can create a hammering loop under heavy load.
Add a short delay or back-off strategy (see deployment worker suggestion).
```ts
for (const rt of rts)
  getQueue(Channel.DeletedReleaseTarget).add(rt.id, rt, {
    deduplication: { id: rt.id },
  });
```
🛠️ Refactor suggestion
Await the asynchronous queue writes to avoid silent job-creation failures
getQueue(...).add() returns a Promise. Because these promises are not awaited (or collected with Promise.all), the worker acknowledges success and the DB transaction commits even if a Redis/network error prevents the jobs from being enqueued.
That can leave the system with soft-deleted release targets but no follow-up DeletedReleaseTarget events, breaking downstream invariants.
Suggested fix (keeps dedup & scales with many RTs):
```diff
- for (const rt of rts)
-   getQueue(Channel.DeletedReleaseTarget).add(rt.id, rt, {
-     deduplication: { id: rt.id },
-   });
+ await Promise.all(
+   rts.map((rt) =>
+     getQueue(Channel.DeletedReleaseTarget).add(rt.id, rt, {
+       deduplication: { id: rt.id },
+     }),
+   ),
+ );
```

(If the list can be large, `queue.addBulk()` would be even more efficient.)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
await Promise.all(
  rts.map((rt) =>
    getQueue(Channel.DeletedReleaseTarget).add(rt.id, rt, {
      deduplication: { id: rt.id },
    }),
  ),
);
```
Summary by CodeRabbit
New Features
Refactor
Bug Fixes
Chores