perf: optimize updated resource queue #585
Conversation
Walkthrough
The changes introduce a new function, `dispatchUpdatedResourceJob`, which skips enqueuing when a job for the same resource id is already waiting in the queue, and adopt it across the callers listed below.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Caller as Any Caller (API, Agent, Handler)
    participant Events as events package
    participant Queue as Job Queue
    Caller->>Events: dispatchUpdatedResourceJob(resource)
    Events->>Queue: get waiting jobs
    alt Job with resource.id exists
        Events-->>Caller: return (do not enqueue)
    else No existing job
        Events->>Queue: add job (resource.id, resource)
        Queue-->>Events: confirmation
        Events-->>Caller: return
    end
```
Actionable comments posted: 3
🧹 Nitpick comments (2)
apps/pty-proxy/src/controller/agent-socket.ts (1)
141-142: Optional: make dispatch fire-and-forget
If the rest of the method doesn't rely on the queue write succeeding synchronously, drop the `await` to avoid stalling the agent handshake on network latency:

```diff
- await dispatchUpdatedResourceJob(res);
+ void dispatchUpdatedResourceJob(res);
```

Purely a latency optimisation; ignore if strong delivery guarantees are required here.
apps/webservice/src/app/api/github/webhook/handlers/pull_request.ts (1)
303-304: You might not need the await here
Queuing a BullMQ job is fire-and-forget for most use-cases, and awaiting it makes the webhook response path a bit slower.
If webhook latency matters, consider returning the promise un-awaited (or `void dispatchUpdatedResourceJob(...)`) unless callers rely on the resolution.
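For illustration, the fire-and-forget pattern described in these two comments can be sketched as follows (a hypothetical handler with a stubbed `dispatchUpdatedResourceJob`; only the calling pattern is the point, not the real queue code):

```typescript
// Hypothetical sketch of fire-and-forget dispatch. The real
// dispatchUpdatedResourceJob lives in packages/events; this stub only
// stands in for an async enqueue.
async function dispatchUpdatedResourceJob(_resource: { id: string }): Promise<void> {
  // stand-in for the real enqueue logic
}

function handleWebhook(resource: { id: string }): string {
  // `void` marks the promise as intentionally un-awaited; the `.catch`
  // keeps an enqueue failure from surfacing as an unhandled rejection.
  void dispatchUpdatedResourceJob(resource).catch((err) =>
    console.error("failed to enqueue updated-resource job", err),
  );
  return "202 Accepted"; // respond before the queue write settles
}
```

The trade-off is delivery guarantees: if the process dies before the enqueue completes, the job is lost, which is why the reviewer hedges with "unless callers rely on the resolution".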
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- apps/pty-proxy/src/controller/agent-socket.ts (2 hunks)
- apps/webservice/src/app/api/github/webhook/handlers/pull_request.ts (2 hunks)
- apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts (2 hunks)
- packages/events/src/index.ts (1 hunks)
- packages/events/src/resource-provider-scan/handle-provider-scan.ts (2 hunks)
- packages/events/src/updated-resource/dispatch-job.ts (1 hunks)
- packages/events/src/updated-resource/index.ts (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
`**/*.{ts,tsx}`: Note on Error Handling:
Avoid strict enforcement of try/catch blocks. Code may use early returns, Promise chains (.then().catch()), or other patterns for error handling. These are acceptable as long as they maintain clarity and predictability.
Applied to:
- packages/events/src/index.ts
- apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts
- packages/events/src/updated-resource/index.ts
- packages/events/src/resource-provider-scan/handle-provider-scan.ts
- apps/pty-proxy/src/controller/agent-socket.ts
- apps/webservice/src/app/api/github/webhook/handlers/pull_request.ts
- packages/events/src/updated-resource/dispatch-job.ts
🧬 Code Graph Analysis (4)
packages/events/src/resource-provider-scan/handle-provider-scan.ts (2)
- packages/db/src/schema/resource.ts (1): resource (58-86)
- packages/events/src/updated-resource/dispatch-job.ts (1): dispatchUpdatedResourceJob (5-11)

apps/pty-proxy/src/controller/agent-socket.ts (1)
- packages/events/src/updated-resource/dispatch-job.ts (1): dispatchUpdatedResourceJob (5-11)

apps/webservice/src/app/api/github/webhook/handlers/pull_request.ts (1)
- packages/events/src/updated-resource/dispatch-job.ts (1): dispatchUpdatedResourceJob (5-11)

packages/events/src/updated-resource/dispatch-job.ts (2)
- packages/db/src/schema/resource.ts (1): resource (58-86)
- packages/events/src/index.ts (1): getQueue (28-34)
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: build (linux/amd64)
- GitHub Check: build (linux/amd64)
- GitHub Check: Typecheck
- GitHub Check: Lint
- GitHub Check: build (linux/amd64)
🔇 Additional comments (5)
packages/events/src/index.ts (1)
39-40: Barrel export looks correct
The additional re-export makes `dispatchUpdatedResourceJob` available at the package root without introducing naming collisions or type-level ambiguities.

packages/events/src/updated-resource/index.ts (1)
1-1: Minimal, purposeful barrel file
Straightforward re-export; no issues spotted.
packages/events/src/resource-provider-scan/handle-provider-scan.ts (1)
9-9: Possible circular-import hotspot
`handle-provider-scan.ts` imports from `"../index.js"`, while `packages/events/src/index.ts` in turn re-exports from `"./resource-provider-scan/handle-provider-scan.js"`.
This creates a circular dependency chain: handle-provider-scan → index → handle-provider-scan.
In Node ESM that often works, but the imported bindings can be `undefined` during module initialisation, leading to intermittent "Cannot read property … of undefined" errors in cold starts or tests.

Consider importing the needed symbols directly to break the cycle, e.g.

```ts
import { getQueue } from "../redis.js";
import { dispatchUpdatedResourceJob } from "../updated-resource/dispatch-job.js";
```

or move shared utilities to a dedicated, cycle-free helper module.
apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts (1)
8-12: Import group is fine
The extended import list is coherent and keeps tree-shaking behaviour unchanged.
apps/webservice/src/app/api/github/webhook/handlers/pull_request.ts (1)
7-7: Nice abstraction – 👍 for switching to the shared dispatch helper
Replacing the direct queue access with `dispatchUpdatedResourceJob` centralises duplicate-protection logic and keeps this handler thin.
```ts
for (const resource of updatedResources)
  await dispatchUpdatedResourceJob(resource);
```
🛠️ Refactor suggestion
Sequential await harms scan-time throughput
Each call hits Redis twice (getWaiting + add), and awaiting inside the for…of turns N resources into 2N sequential round-trips.
A safe, non-duplicating parallel dispatch is trivial because each resource ID is unique within updatedResources.
```diff
- for (const resource of updatedResources)
-   await dispatchUpdatedResourceJob(resource);
+ await Promise.all(updatedResources.map(dispatchUpdatedResourceJob));
```

This preserves the de-dup logic, avoids bulk-add removal regressions, and restores near-original performance.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
await Promise.all(updatedResources.map(dispatchUpdatedResourceJob));
```
🤖 Prompt for AI Agents
In packages/events/src/resource-provider-scan/handle-provider-scan.ts at lines
105-106, the sequential await inside the for...of loop causes multiple Redis
round-trips and slows down throughput. Refactor the code to dispatch all updated
resource jobs in parallel by collecting the promises from
dispatchUpdatedResourceJob calls into an array and then await them all together
using Promise.all. This maintains the de-duplication logic while improving
performance by reducing the number of Redis calls.
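The throughput difference this suggestion describes can be illustrated with stand-in timings (a hypothetical `dispatch` helper simulating one round-trip of latency; not the real queue code):

```typescript
// Hypothetical sketch: sequential awaits vs Promise.all. `dispatch` stands
// in for dispatchUpdatedResourceJob; each call simulates one round-trip.
const delay = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));

async function dispatch(_resource: { id: string }): Promise<void> {
  await delay(20); // stand-in for a Redis round-trip
}

async function sequential(resources: { id: string }[]): Promise<number> {
  const start = Date.now();
  for (const r of resources) await dispatch(r); // round-trips run one after another
  return Date.now() - start;
}

async function parallel(resources: { id: string }[]): Promise<number> {
  const start = Date.now();
  await Promise.all(resources.map(dispatch)); // round-trips overlap
  return Date.now() - start;
}
```

With 10 resources the sequential version pays roughly 10 round-trips of latency while the parallel one pays roughly one, which is the effect the refactor is after.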
```ts
await dispatchUpdatedResourceJob(resource);
```
Incorrect payload: sends pre-update snapshot
dispatchUpdatedResourceJob is invoked with resource (the old record), not res (the freshly upserted one).
Down-stream consumers will miss the latest changes.
```diff
- await dispatchUpdatedResourceJob(resource);
+ await dispatchUpdatedResourceJob(res);
```

This maintains previous semantics (job fired after update) while ensuring the queue carries the current state.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
await dispatchUpdatedResourceJob(res);
```
🤖 Prompt for AI Agents
In apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts around lines
160 to 161, the function dispatchUpdatedResourceJob is called with the old
resource object instead of the updated one. To fix this, replace the argument
from resource to res so that the job receives the latest upserted resource
state, ensuring downstream consumers get the current data after the update.
```ts
export const dispatchUpdatedResourceJob = async (resource: schema.Resource) => {
  const q = getQueue(Channel.UpdatedResource);
  const waiting = await q.getWaiting();
  const isAlreadyQueued = waiting.some((job) => job.data.id === resource.id);
  if (isAlreadyQueued) return;
  await q.add(resource.id, resource);
};
```
🛠️ Refactor suggestion
Inefficient scan & race-condition – leverage BullMQ’s jobId for dedup
Fetching the entire waiting list on every call is O(n) and still misses duplicates in active, delayed, etc. Two concurrent workers can also race between the getWaiting() and add() calls and enqueue a duplicate.
BullMQ already prevents duplicates when jobId is supplied; we can rely on that and avoid the scan altogether:
```diff
- const q = getQueue(Channel.UpdatedResource);
- const waiting = await q.getWaiting();
- const isAlreadyQueued = waiting.some((job) => job.data.id === resource.id);
- if (isAlreadyQueued) return;
- await q.add(resource.id, resource);
+ const q = getQueue(Channel.UpdatedResource);
+
+ // `jobId` guarantees idempotency across ALL job states and is O(1).
+ await q.add(resource.id, resource, {
+   jobId: resource.id, // de-dupe key: a second add with this id is silently skipped
+ });
```

Benefits
• O(1) lookup instead of O(n) scan
• Covers waiting, delayed, active, etc.
• Eliminates race windows between lookup and add
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
export const dispatchUpdatedResourceJob = async (resource: schema.Resource) => {
  const q = getQueue(Channel.UpdatedResource);
  // `jobId` guarantees idempotency across ALL job states and is O(1).
  await q.add(resource.id, resource, {
    jobId: resource.id, // de-dupe key: a second add with this id is silently skipped
  });
};
```
🤖 Prompt for AI Agents
In packages/events/src/updated-resource/dispatch-job.ts around lines 5 to 11,
the current code fetches all waiting jobs and scans them to avoid duplicates,
which is inefficient and prone to race conditions. To fix this, remove the
getWaiting() call and the duplicate check, and instead pass the resource.id as
the jobId parameter when calling q.add(). This leverages BullMQ's built-in
deduplication by jobId, providing O(1) lookup and preventing duplicates across
all job states without race conditions.
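For context, the rule this suggestion leans on (an `add()` with an existing jobId is silently skipped, whatever state that job is in) can be modelled with a small in-memory sketch; `MockQueue` is hypothetical and not a BullMQ API:

```typescript
// Hypothetical in-memory model of jobId-based de-duplication. Not BullMQ:
// just the rule "add() with an existing jobId is a no-op, regardless of
// the job's state (waiting, active, completed, ...)".
type JobState = "waiting" | "active" | "completed";

class MockQueue<T> {
  private jobs = new Map<string, { data: T; state: JobState }>();

  // Returns true if the job was actually enqueued, false if de-duplicated.
  add(jobId: string, data: T): boolean {
    if (this.jobs.has(jobId)) return false; // duplicate jobId: silently skipped
    this.jobs.set(jobId, { data, state: "waiting" });
    return true;
  }

  setState(jobId: string, state: JobState): void {
    const job = this.jobs.get(jobId);
    if (job) job.state = state;
  }

  getState(jobId: string): JobState | undefined {
    return this.jobs.get(jobId)?.state;
  }
}
```

The model also makes the follow-up objection concrete: under this rule a second add for the same resource is skipped even while the first job is already being processed.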
BullMQ's deduplication will skip jobs if the job is actively being processed by a worker, which we don't want. We specifically only want to skip if its waiting, if its actively processing we still want to add it.
@adityachoudhari26 Understood—your requirement is “skip only when the job is still waiting, but enqueue a new one once the previous job is already being processed.”
BullMQ unfortunately ties its jobId de-duplication to all states (waiting, delayed, active, etc.). If a job with the same jobId is active, a second add() call with that jobId is ignored, so the built-in mechanism can’t express your “waiting-only” rule.
A middle-ground that avoids the O(n) full-queue scan but still checks only the waiting list is to ask BullMQ directly for the job by jobId and inspect its state:
```ts
export const dispatchUpdatedResourceJob = async (resource: schema.Resource) => {
  const q = getQueue(Channel.UpdatedResource);

  // Give the waiting job a predictable id so we can query it in O(1)
  const jobId = `waiting:${resource.id}`;

  // Look up any existing job with this id.
  const existing = await q.getJob(jobId);

  // If it exists **and** is still waiting, we're done; otherwise enqueue again.
  if (existing && (await existing.getState()) === "waiting") return;

  await q.add(resource.id, resource, { jobId });
};
```

Why this helps
• queue.getJob(jobId) is a direct Redis hash lookup—O(1), no full list scan
• We only skip when state === “waiting”, satisfying your requirement
• No race window: if two callers run concurrently, only one will win the add() because the other’s getJob() will see the new job in waiting state.
Trade-offs
- The jobId needs a prefix (e.g., `waiting:`) so simultaneously active jobs can still have independent automatically-generated IDs.
- If you ever change queue settings (e.g., LIFO, priorities), make sure the logic still reflects the desired behaviour.
If you prefer to stay with the original scan for now, that’s perfectly valid; just wanted to surface this lighter alternative. Let me know which direction you’d like to take!
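The "skip only while waiting" rule from the snippet above can be modelled the same way (an in-memory sketch, not BullMQ; `dispatchIfNotWaiting` and the `jobs` map are illustrative):

```typescript
// Hypothetical in-memory sketch of the "skip only while waiting" rule:
// re-enqueue is allowed once the previous job has left the waiting state.
type State = "waiting" | "active" | "completed";

const jobs = new Map<string, State>();

// Returns true if a new job was enqueued, false if one is already waiting.
function dispatchIfNotWaiting(resourceId: string): boolean {
  const key = `waiting:${resourceId}`;
  if (jobs.get(key) === "waiting") return false; // still queued: skip
  jobs.set(key, "waiting"); // previous entry (if any) was not waiting
  return true;
}
```

Note how this differs from plain jobId de-duplication: a resource whose job is already active still gets a fresh job, which is exactly the behaviour the author asked for.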
Summary by CodeRabbit
New Features
Refactor