Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion apps/event-worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
"ioredis": "^5.4.1",
"lodash": "^4.17.21",
"ms": "^2.1.3",
"p-retry": "^6.2.0",
"semver": "^7.6.2",
"zod": "catalog:"
},
Expand Down
10 changes: 0 additions & 10 deletions apps/event-worker/src/github-utils.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,8 @@
import { createAppAuth } from "@octokit/auth-app";
import { Octokit } from "@octokit/rest";

import { JobStatus as JStatus } from "@ctrlplane/validators/jobs";

import { env } from "./config.js";

export const convertStatus = (status: string): JStatus => {
if (status === "success" || status === "neutral") return JStatus.Completed;
if (status === "queued" || status === "requested" || status === "waiting")
return JStatus.InProgress;
if (status === "timed_out" || status === "stale") return JStatus.Failure;
return status as JStatus;
};

export const getInstallationOctokit = (installationId: number) =>
env.GITHUB_BOT_APP_ID &&
env.GITHUB_BOT_PRIVATE_KEY &&
Expand Down
53 changes: 1 addition & 52 deletions apps/event-worker/src/job-dispatch/github.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { Job } from "@ctrlplane/db/schema";
import pRetry from "p-retry";

import { and, eq, takeFirstOrNull } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
Expand All @@ -15,7 +14,7 @@ import { logger } from "@ctrlplane/logger";
import { configSchema } from "@ctrlplane/validators/github";
import { JobStatus } from "@ctrlplane/validators/jobs";

import { convertStatus, getInstallationOctokit } from "../github-utils.js";
import { getInstallationOctokit } from "../github-utils.js";

export const dispatchGithubJob = async (je: Job) => {
logger.info(`Dispatching github job ${je.id}...`);
Expand Down Expand Up @@ -100,54 +99,4 @@ export const dispatchGithubJob = async (je: Job) => {
authorization: `Bearer ${installationToken.token}`,
},
});

let runId: number | null = null;
let status: string | null = null;

try {
const { runId: runId_, status: status_ } = await pRetry(
async () => {
const runs = await octokit.actions.listWorkflowRuns({
owner: parsed.data.owner,
repo: parsed.data.repo,
workflow_id: parsed.data.workflowId,
branch: ghOrg.branch,
});

const run = runs.data.workflow_runs.find((run) =>
run.name?.includes(je.id),
);

if (run == null) throw new Error("Run not found");

logger.info(`Run found for job ${je.id}`, { run });

return { runId: run.id, status: run.status, url: run.html_url };
},
{ retries: 15, minTimeout: 1000 },
);

runId = runId_;
status = status_;
} catch (error) {
logger.error(`Job ${je.id} dispatch to GitHub failed`, { error });
await db.update(job).set({
status: JobStatus.ExternalRunNotFound,
message: `Run ID not found for job ${je.id}`,
});
return;
}

logger.info(`Job ${je.id} dispatched to GitHub`, {
runId,
status,
});

await db
.update(job)
.set({
externalId: runId.toString(),
status: convertStatus(status ?? JobStatus.InProgress),
})
.where(eq(job.id, je.id));
};
53 changes: 0 additions & 53 deletions apps/event-worker/src/job-sync/github.ts

This file was deleted.

45 changes: 34 additions & 11 deletions apps/webservice/src/app/api/github/webhook/workflow/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,54 @@ const convertStatus = (
): schema.JobStatus =>
status === JobStatus.Completed ? JobStatus.Completed : JobStatus.InProgress;

const extractUuid = (str: string) => {
const uuidRegex =
/\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b/;
const match = uuidRegex.exec(str);
return match ? match[0] : null;
};

const getJob = async (externalId: number, name: string) => {
const jobFromExternalId = await db
.select()
.from(schema.job)
.where(eq(schema.job.externalId, externalId.toString()))
.then(takeFirstOrNull);

if (jobFromExternalId != null) return jobFromExternalId;

const uuid = extractUuid(name);
if (uuid == null) return null;

return db
.select()
.from(schema.job)
.where(eq(schema.job.id, uuid))
.then(takeFirstOrNull);
};
Comment on lines +31 to +48
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for database operations.

The job retrieval logic is well-structured with a good fallback strategy. However, database operations could throw errors that should be handled gracefully.

 const getJob = async (externalId: number, name: string) => {
+  try {
     const jobFromExternalId = await db
       .select()
       .from(schema.job)
       .where(eq(schema.job.externalId, externalId.toString()))
       .then(takeFirstOrNull);

     if (jobFromExternalId != null) return jobFromExternalId;

     const uuid = extractUuid(name);
     if (uuid == null) return null;

     return db
       .select()
       .from(schema.job)
       .where(eq(schema.job.id, uuid))
       .then(takeFirstOrNull);
+  } catch (error) {
+    console.error('Failed to retrieve job:', error);
+    return null;
+  }
 };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const getJob = async (externalId: number, name: string) => {
const jobFromExternalId = await db
.select()
.from(schema.job)
.where(eq(schema.job.externalId, externalId.toString()))
.then(takeFirstOrNull);
if (jobFromExternalId != null) return jobFromExternalId;
const uuid = extractUuid(name);
if (uuid == null) return null;
return db
.select()
.from(schema.job)
.where(eq(schema.job.id, uuid))
.then(takeFirstOrNull);
};
const getJob = async (externalId: number, name: string) => {
try {
const jobFromExternalId = await db
.select()
.from(schema.job)
.where(eq(schema.job.externalId, externalId.toString()))
.then(takeFirstOrNull);
if (jobFromExternalId != null) return jobFromExternalId;
const uuid = extractUuid(name);
if (uuid == null) return null;
return db
.select()
.from(schema.job)
.where(eq(schema.job.id, uuid))
.then(takeFirstOrNull);
} catch (error) {
console.error('Failed to retrieve job:', error);
return null;
}
};


export const handleWorkflowWebhookEvent = async (event: WorkflowRunEvent) => {
const {
id,
status: externalStatus,
conclusion,
repository,
name,
} = event.workflow_run;

const job = await getJob(id, name);
if (job == null) return;

const status =
conclusion != null
? convertConclusion(conclusion)
: convertStatus(externalStatus);

const job = await db
const externalId = id.toString();
await db
.update(schema.job)
.set({ status })
.where(eq(schema.job.externalId, id.toString()))
.returning()
.then(takeFirstOrNull);

// Addressing a race condition: When the job is created externally on GitHub,
// it triggers a webhook event. However, our system hasn't updated the job with
// the externalRunId yet, as it depends on the job's instantiation. Therefore,
// the first event lacks the run ID, so we skip it and wait for the next event.
if (job == null) return;
.set({ status, externalId })
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can also generate the url at this point? I think

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are generating the URL below

.where(eq(schema.job.id, job.id));
Comment on lines +67 to +71
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider using a transaction for atomic updates.

The status update and subsequent metadata update should be atomic to prevent potential race conditions.

-  await db
-    .update(schema.job)
-    .set({ status, externalId })
-    .where(eq(schema.job.id, job.id));
+  await db.transaction(async (tx) => {
+    await tx
+      .update(schema.job)
+      .set({ status, externalId })
+      .where(eq(schema.job.id, job.id));
+
+    const existingUrlMetadata = await tx
+      .select()
+      .from(schema.jobMetadata)
+      // ... rest of the metadata query ...
+
+    // ... metadata update logic ...
+    await tx
+      .insert(schema.jobMetadata)
+      // ... rest of the metadata insert ...
+  });

Committable suggestion was skipped due to low confidence.


const existingUrlMetadata = await db
.select()
Expand Down
3 changes: 0 additions & 3 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.