chore: add dedicated queue to compute workspace policy targets #591
adityachoudhari26 merged 1 commit into main
Conversation
Walkthrough

This change refactors the release target computation workflow by moving workspace policy target computation onto a dedicated queue: the system-level worker computes created/deleted release targets, then dispatches a separate workspace-level job that processes policy targets and triggers downstream evaluation.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant API/Trigger
    participant SystemReleaseWorker
    participant WorkspacePolicyWorker
    participant DB
    participant DownstreamJobs
    API/Trigger->>SystemReleaseWorker: Dispatch ComputeSystemsReleaseTargets (system id)
    SystemReleaseWorker->>DB: Lock and compute created/deleted release targets
    SystemReleaseWorker->>DownstreamJobs: Enqueue deletion events for deleted targets
    SystemReleaseWorker->>WorkspacePolicyWorker: Dispatch ComputeWorkspacePolicyTargets (workspace id, created targets)
    WorkspacePolicyWorker->>DB: Fetch unprocessed policy targets
    alt Policy targets exist
        WorkspacePolicyWorker->>DB: Compute each policy target (handle lock errors)
        WorkspacePolicyWorker->>DownstreamJobs: Optionally dispatch evaluation for created targets
    else No policy targets
        WorkspacePolicyWorker->>DownstreamJobs: Dispatch evaluation for created targets
    end
```
Actionable comments posted: 2
🧹 Nitpick comments (7)
packages/events/src/types.ts (1)
81-86: Optional: default `processedPolicyTargetIds` to an empty array

Downstream code always does `processedPolicyTargetIds ?? []`. Consider making the property non-optional (default `[]`) to remove repeated null-coalescing and avoid feeding `undefined` into SQL helpers.

apps/event-worker/src/workers/compute-workspace-policy-targets.ts (3)
37-44: Early exit still calls downstream for an empty list

If `releaseTargetsToEvaluate` is `[]`, the worker still enqueues an evaluation job that performs no work. A tiny guard can avoid superfluous queue traffic:

```ts
if (policyTargets.length === 0) {
  if (releaseTargetsToEvaluate?.length)
    await dispatchQueueJob().toEvaluate().releaseTargets(releaseTargetsToEvaluate);
  return;
}
```
47-66: Potential hot loop on row-lock contention

On every `55P03` the job immediately re-queues itself without delay. Consider adding back-off (e.g., BullMQ `delay` or a retry strategy) to avoid thrashing when another long-running transaction is holding the locks.
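One way to apply the suggested back-off is a small capped-exponential-delay helper; the names and constants below are illustrative sketches, not code from the PR:

```typescript
// Hypothetical back-off helper for re-queueing after a 55P03
// (lock_not_available) error; BASE/MAX values are assumptions.
const BASE_DELAY_MS = 500;
const MAX_DELAY_MS = 30_000;

// Delay doubles per attempt and is capped at MAX_DELAY_MS.
const computeBackoffMs = (attempt: number): number =>
  Math.min(BASE_DELAY_MS * 2 ** attempt, MAX_DELAY_MS);

// Sketch of re-queueing with a delay instead of immediately:
//   await queue.add(job.name, { ...job.data, attempt: attempt + 1 },
//                   { delay: computeBackoffMs(attempt) });
```

The cap keeps a long-held lock from pushing the retry interval out indefinitely while still backing off quickly under contention.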
69-73: Same superfluous-enqueue note applies here

Skip the dispatch when `releaseTargetsToEvaluate` is an empty array.

apps/event-worker/src/workers/compute-systems-release-targets.ts (1)
188-195: Empty `created` array still spawns a follow-up job

You always enqueue a workspace job even when `created` is empty, which leads to no-op processing. Add a quick length check to avoid unnecessary queue churn.

packages/events/src/dispatch-jobs.ts (2)
88-91: Minor: use deduplication opts instead of a manual waiting scan

BullMQ supports `opts.jobId` or `opts.deduplication` to avoid duplicates without an explicit `getWaiting` scan. The manual check works but costs an extra round-trip; using built-in deduping would simplify the code.
130-141: Fluent API: consider returning the inner helpers

`toCompute().workspace(id).policyTargets()` currently returns nothing, making it hard to await the enqueue in calling code. Returning the underlying promise would let callers await/job-chain naturally.
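A minimal sketch of the promise-returning shape this comment suggests; the fluent names mirror those in the review, but the implementation is an assumption, not taken from the PR:

```typescript
// Hypothetical fluent builder whose leaf method returns the enqueue
// promise so callers can await or chain it.
type Enqueue = (payload: unknown) => Promise<string>; // resolves to a job id

const toCompute = (enqueue: Enqueue) => ({
  workspace: (id: string) => ({
    // Returning the promise (instead of void) lets callers await it.
    policyTargets: (): Promise<string> => enqueue({ workspaceId: id }),
  }),
});

// Usage sketch:
//   const jobId = await toCompute(enqueue).workspace("ws-1").policyTargets();
```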
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- apps/event-worker/src/workers/compute-systems-release-targets.ts (4 hunks)
- apps/event-worker/src/workers/compute-workspace-policy-targets.ts (1 hunk)
- apps/event-worker/src/workers/index.ts (2 hunks)
- e2e/tests/api/release.spec.ts (0 hunks)
- packages/events/src/dispatch-jobs.ts (2 hunks)
- packages/events/src/types.ts (3 hunks)
💤 Files with no reviewable changes (1)
- e2e/tests/api/release.spec.ts
🧰 Additional context used
📓 Path-based instructions (1)
`**/*.{ts,tsx}`: Note on Error Handling:
Avoid strict enforcement of try/catch blocks. Code may use early returns, Promise chains (.then().catch()), or other patterns for error handling. These are acceptable as long as they maintain clarity and predictability.

Files:
- apps/event-worker/src/workers/index.ts
- packages/events/src/types.ts
- apps/event-worker/src/workers/compute-workspace-policy-targets.ts
- apps/event-worker/src/workers/compute-systems-release-targets.ts
- packages/events/src/dispatch-jobs.ts
🧬 Code Graph Analysis (3)
apps/event-worker/src/workers/index.ts (1)
- apps/event-worker/src/workers/compute-workspace-policy-targets.ts (1)
  - computeWorkspacePolicyTargetsWorker (27-74)

packages/events/src/types.ts (1)
- packages/rule-engine/src/types.ts (1)
  - ReleaseTargetIdentifier (76-80)

packages/events/src/dispatch-jobs.ts (2)
- packages/rule-engine/src/types.ts (1)
  - ReleaseTargetIdentifier (76-80)
- packages/events/src/index.ts (1)
  - getQueue (28-34)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: build (linux/amd64)
- GitHub Check: Lint
- GitHub Check: Typecheck
- GitHub Check: build (linux/amd64)
🔇 Additional comments (2)
apps/event-worker/src/workers/index.ts (1)
10-10: Worker correctly registered – no issues spotted

Import and registry entry look good; naming and channel match the new file.
Also applies to: 63-64
apps/event-worker/src/workers/compute-systems-release-targets.ts (1)
91-118: Lock acquisition order – confirm global consistency

The worker obtains locks on `releaseTarget`, `computedEnvironmentResource`, then `computedDeploymentResource`. If any other code acquires the same tables in a different order, Postgres can deadlock. Please verify that the chosen order is used everywhere or codify it in a shared helper.
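A shared helper could make the ordering explicit; this sketch is an assumption about how such a helper might look, not code from the PR:

```typescript
// Hypothetical canonical lock order, shared so every caller acquires
// table locks in the same sequence and Postgres cannot deadlock.
const LOCK_ORDER = [
  "releaseTarget",
  "computedEnvironmentResource",
  "computedDeploymentResource",
] as const;

type LockTable = (typeof LOCK_ORDER)[number];

// Sort any requested subset of tables into the canonical order.
const orderLocks = (tables: LockTable[]): LockTable[] =>
  [...tables].sort((a, b) => LOCK_ORDER.indexOf(a) - LOCK_ORDER.indexOf(b));

// Usage sketch: iterate orderLocks([...]) inside the transaction and
// acquire each table lock in sequence.
```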
```ts
const getPolicyTargets = async (
  workspaceId: string,
  processedPolicyTargetIds: string[],
) =>
  db
    .select()
    .from(schema.policyTarget)
    .innerJoin(
      schema.policy,
      eq(schema.policyTarget.policyId, schema.policy.id),
    )
    .where(
      and(
        eq(schema.policy.workspaceId, workspaceId),
        processedPolicyTargetIds.length > 0
          ? notInArray(schema.policyTarget.id, processedPolicyTargetIds)
          : undefined,
      ),
    );
```
🛠️ Refactor suggestion
Guard against `undefined` in SQL `and()`

`and(cond1, undefined)` relies on the query builder silently discarding `undefined`; should that behaviour ever change, the query will break. Prefer building the condition list explicitly:
```diff
-  .where(
-    and(
-      eq(schema.policy.workspaceId, workspaceId),
-      processedPolicyTargetIds.length > 0
-        ? notInArray(schema.policyTarget.id, processedPolicyTargetIds)
-        : undefined,
-    ),
-  );
+  .where(
+    and(
+      eq(schema.policy.workspaceId, workspaceId),
+      ...(processedPolicyTargetIds.length
+        ? [notInArray(schema.policyTarget.id, processedPolicyTargetIds)]
+        : []),
+    ),
+  );
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
const getPolicyTargets = async (
  workspaceId: string,
  processedPolicyTargetIds: string[],
) =>
  db
    .select()
    .from(schema.policyTarget)
    .innerJoin(
      schema.policy,
      eq(schema.policyTarget.policyId, schema.policy.id),
    )
    .where(
      and(
        eq(schema.policy.workspaceId, workspaceId),
        ...(processedPolicyTargetIds.length
          ? [notInArray(schema.policyTarget.id, processedPolicyTargetIds)]
          : []),
      ),
    );
```
🤖 Prompt for AI Agents
In apps/event-worker/src/workers/compute-workspace-policy-targets.ts around
lines 7 to 25, the SQL query uses and() with a condition that can be undefined,
relying on the query builder to ignore undefined values. To fix this, explicitly
build an array of conditions, adding the notInArray condition only if
processedPolicyTargetIds is non-empty, then pass this array to and() to avoid
any undefined arguments.
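The same pattern can be captured in a small framework-agnostic helper; this is an illustrative sketch, not part of the PR:

```typescript
// Hypothetical helper: keep only the conditions that apply, so a
// query builder's and() never receives undefined.
const presentConditions = <T>(...conds: (T | undefined)[]): T[] =>
  conds.filter((c): c is T => c !== undefined);

// Usage sketch with a query builder's combinators:
//   .where(and(...presentConditions(cond1, maybeCond2)))
```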
```ts
const dispatchComputeWorkspacePolicyTargetsJobs = async (
  workspaceId: string,
  processedPolicyTargetIds?: string[],
  releaseTargetsToEvaluate?: ReleaseTargetIdentifier[],
) => {
  const q = getQueue(Channel.ComputeWorkspacePolicyTargets);
  await q.add(workspaceId, {
    workspaceId,
    processedPolicyTargetIds,
    releaseTargetsToEvaluate,
  });
};
```
Duplicate-job check missing
Unlike the system-level helper above, workspace dispatch does not guard against the same workspace being queued multiple times.
Consider mirroring the waiting-queue check or use BullMQ job deduplication to prevent redundant jobs.
🤖 Prompt for AI Agents
In packages/events/src/dispatch-jobs.ts around lines 93 to 104, the
dispatchComputeWorkspacePolicyTargetsJobs function lacks a check to prevent
queuing duplicate jobs for the same workspace. To fix this, implement a check
before adding a new job to the queue that verifies if a job with the same
workspaceId is already waiting or active in the queue. You can achieve this by
querying the queue for existing jobs with the same identifier or by using
BullMQ's built-in job deduplication features to avoid redundant job entries.
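One low-cost option is a deterministic job id, which lets BullMQ itself drop duplicate adds while a job with that id is still pending; the id format here is an assumption, not taken from the PR:

```typescript
// Hypothetical deterministic job id for workspace policy-target jobs.
// While a job with this id is waiting/delayed/active, BullMQ ignores
// further adds that reuse the same id.
const workspacePolicyJobId = (workspaceId: string): string =>
  `compute-workspace-policy-targets:${workspaceId}`;

// Sketch of the dispatch with deduplication:
//   await q.add(
//     workspaceId,
//     { workspaceId, processedPolicyTargetIds, releaseTargetsToEvaluate },
//     { jobId: workspacePolicyJobId(workspaceId) },
//   );
```

This mirrors the `opts.jobId` approach suggested for the system-level helper, avoiding an explicit `getWaiting` scan.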