diff --git a/apps/event-worker/package.json b/apps/event-worker/package.json index 445b38ff5..682bc25ec 100644 --- a/apps/event-worker/package.json +++ b/apps/event-worker/package.json @@ -43,11 +43,9 @@ "cron": "^3.1.7", "dotenv": "^16.4.5", "google-auth-library": "^9.13.0", - "ioredis": "^5.4.1", "js-yaml": "^4.1.0", "lodash": "catalog:", "ms": "^2.1.3", - "redis-semaphore": "^5.6.2", "semver": "catalog:", "ts-is-present": "^1.2.2", "uuid": "^10.0.0", diff --git a/apps/event-worker/src/index.ts b/apps/event-worker/src/index.ts index 3487c9fbf..ca3342432 100644 --- a/apps/event-worker/src/index.ts +++ b/apps/event-worker/src/index.ts @@ -1,7 +1,6 @@ import { logger } from "@ctrlplane/logger"; import { register } from "./instrumentation.js"; -import { redis } from "./redis.js"; import { workers } from "./workers/index.js"; console.log("Registering instrumentation..."); @@ -9,10 +8,9 @@ await register(); const shutdown = () => { logger.warn("Exiting..."); - Promise.all(Object.values(workers).map((w) => w?.close())).then(async () => { - await redis.quit(); - process.exit(0); - }); + Promise.all(Object.values(workers).map((w) => w?.close())).then(() => + process.exit(0), + ); }; process.on("SIGTERM", shutdown); diff --git a/apps/event-worker/src/redis.ts b/apps/event-worker/src/redis.ts deleted file mode 100644 index 1e9c3849d..000000000 --- a/apps/event-worker/src/redis.ts +++ /dev/null @@ -1,7 +0,0 @@ -import IORedis from "ioredis"; - -import { env } from "./config.js"; - -export const redis = new IORedis(env.REDIS_URL, { - maxRetriesPerRequest: null, -}); diff --git a/apps/event-worker/src/workers/releases/deployment-resources.ts b/apps/event-worker/src/workers/deployment-resources.ts similarity index 100% rename from apps/event-worker/src/workers/releases/deployment-resources.ts rename to apps/event-worker/src/workers/deployment-resources.ts diff --git a/apps/event-worker/src/workers/index.ts b/apps/event-worker/src/workers/index.ts index 7edc83344..885efd837 100644 --- a/apps/event-worker/src/workers/index.ts +++ b/apps/event-worker/src/workers/index.ts @@ -4,8 +4,8 @@ import type { Worker } from "bullmq"; import { Channel } from "@ctrlplane/events"; import { dispatchJobWorker } from "./job-dispatch/index.js"; +import { newDeploymentVersionWorker } from "./new-deployment-version.js"; import { newDeploymentWorker } from "./new-deployment.js"; -import { newDeploymentVersionWorker } from "./releases/new-deployment-version.js"; import { resourceScanWorker } from "./resource-scan/index.js"; type Workers = { diff --git a/apps/event-worker/src/workers/releases/new-deployment-version.ts b/apps/event-worker/src/workers/new-deployment-version.ts similarity index 100% rename from apps/event-worker/src/workers/releases/new-deployment-version.ts rename to apps/event-worker/src/workers/new-deployment-version.ts diff --git a/apps/event-worker/src/workers/releases/evaluate/index.ts b/apps/event-worker/src/workers/releases/evaluate/index.ts deleted file mode 100644 index 3e31a9437..000000000 --- a/apps/event-worker/src/workers/releases/evaluate/index.ts +++ /dev/null @@ -1,50 +0,0 @@ -import type { ReleaseEvaluateEvent } from "@ctrlplane/validators/events"; -import { Worker } from "bullmq"; - -import { db } from "@ctrlplane/db/client"; -import { evaluate, getReleasesFromDb } from "@ctrlplane/rule-engine"; -import { createCtx, getApplicablePolicies } from "@ctrlplane/rule-engine/db"; -import { Channel } from "@ctrlplane/validators/events"; - -import { redis } from "../../../redis.js"; -import { ReleaseRepositoryMutex } from "../mutex.js"; - -export const createReleaseEvaluateWorker = () => - new Worker( - Channel.ReleaseEvaluate, - async (job) => { - job.log( - `Evaluating release for deployment ${job.data.deploymentId} and resource ${job.data.resourceId}`, - ); - - const mutex = await ReleaseRepositoryMutex.lock(job.data); - - try { - const ctx = await createCtx(db, job.data); - if (ctx == null) { - job.log( - `Resource ${job.data.resourceId} not found for deployment ${job.data.deploymentId} and environment ${job.data.environmentId}`, - ); - return; - } - - const { workspaceId } = ctx.resource; - const policy = await getApplicablePolicies(db, workspaceId, job.data); - - const result = await evaluate(policy, ctx, getReleasesFromDb(db)); - console.log(result); - } catch (error) { - const message = - error instanceof Error ? error.message : "Unknown error"; - job.log(`Error evaluating release: ${message}`); - } finally { - await mutex.unlock(); - } - }, - { - connection: redis, - removeOnComplete: { age: 1 * 60 * 60, count: 5000 }, - removeOnFail: { age: 12 * 60 * 60, count: 5000 }, - concurrency: 100, - }, - ); diff --git a/apps/event-worker/src/workers/releases/mutex.ts b/apps/event-worker/src/workers/releases/mutex.ts deleted file mode 100644 index 7d236e281..000000000 --- a/apps/event-worker/src/workers/releases/mutex.ts +++ /dev/null @@ -1,30 +0,0 @@ -import type { ReleaseRepository } from "@ctrlplane/rule-engine"; -import type { Mutex as RedisMutex } from "redis-semaphore"; -import { Mutex as RedisSemaphoreMutex } from "redis-semaphore"; - -import { redis } from "../../redis.js"; - -export class ReleaseRepositoryMutex { - static async lock(repo: ReleaseRepository) { - const mutex = new ReleaseRepositoryMutex(repo); - await mutex.lock(); - return mutex; - } - - private mutex: RedisMutex; - - constructor(repo: ReleaseRepository) { - const key = `release-repository-mutex-${repo.deploymentId}-${repo.resourceId}-${repo.environmentId}`; - this.mutex = new RedisSemaphoreMutex(redis, key, {}); - } - - async lock(): Promise { - if (this.mutex.isAcquired) throw new Error("Mutex is already locked"); - await this.mutex.acquire(); - } - - async unlock(): Promise { - if (!this.mutex.isAcquired) throw new Error("Mutex is not locked"); - await this.mutex.release(); - } -} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 277506a15..6b890a48b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -217,9 +217,6 @@ importers: google-auth-library: specifier: ^9.13.0 version: 9.14.2 - ioredis: - specifier: ^5.4.1 - version: 5.4.1 js-yaml: specifier: ^4.1.0 version: 4.1.0 @@ -229,9 +226,6 @@ importers: ms: specifier: ^2.1.3 version: 2.1.3 - redis-semaphore: - specifier: ^5.6.2 - version: 5.6.2(ioredis@5.4.1) semver: specifier: 'catalog:' version: 7.7.1 @@ -10075,15 +10069,6 @@ packages: resolution: {integrity: sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==} engines: {node: '>=4'} - redis-semaphore@5.6.2: - resolution: {integrity: sha512-Oh1zOqNa51VC14mwYcmdOyjHpb+y8N1ieqpGxITjkrqPiO8IoCYiXGrSyKEmXH5+UEsl/7OAnju2e0x1TY5Jhg==} - engines: {node: '>= 14.17.0'} - peerDependencies: - ioredis: ^4.1.0 || ^5 - peerDependenciesMeta: - ioredis: - optional: true - redux-immutable@4.0.0: resolution: {integrity: sha512-SchSn/DWfGb3oAejd+1hhHx01xUoxY+V7TeK0BKqpkLKiQPVFf7DYzEaKmrEVxsWxielKfSK9/Xq66YyxgR1cg==} peerDependencies: @@ -21510,14 +21495,6 @@ snapshots: dependencies: redis-errors: 1.2.0 - redis-semaphore@5.6.2(ioredis@5.4.1): - dependencies: - debug: 4.4.0 - optionalDependencies: - ioredis: 5.4.1 - transitivePeerDependencies: - - supports-color - redux-immutable@4.0.0(immutable@3.8.2): dependencies: immutable: 3.8.2