diff --git a/apps/event-worker/src/utils/dispatch-evaluate-jobs.ts b/apps/event-worker/src/utils/dispatch-evaluate-jobs.ts new file mode 100644 index 000000000..e494ec53f --- /dev/null +++ b/apps/event-worker/src/utils/dispatch-evaluate-jobs.ts @@ -0,0 +1,11 @@ +import type * as schema from "@ctrlplane/db/schema"; + +import { Channel, getQueue } from "@ctrlplane/events"; + +export const dispatchEvaluateJobs = async (rts: schema.ReleaseTarget[]) => { + const jobs = rts.map((rt) => ({ + name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`, + data: rt, + })); + await getQueue(Channel.EvaluateReleaseTarget).addBulk(jobs); +}; diff --git a/apps/event-worker/src/utils/replace-release-targets.ts b/apps/event-worker/src/utils/replace-release-targets.ts deleted file mode 100644 index 305ec949b..000000000 --- a/apps/event-worker/src/utils/replace-release-targets.ts +++ /dev/null @@ -1,68 +0,0 @@ -import type { Tx } from "@ctrlplane/db"; - -import { and, eq } from "@ctrlplane/db"; -import * as SCHEMA from "@ctrlplane/db/schema"; -import { makeWithSpan, trace } from "@ctrlplane/logger"; - -const tracer = trace.getTracer("upsert-release-targets"); -const withSpan = makeWithSpan(tracer); - -export const replaceReleaseTargets = withSpan( - "replaceReleaseTargets", - async (span, db: Tx, resource: SCHEMA.Resource) => { - span.setAttribute("resource.id", resource.id); - span.setAttribute("resource.name", resource.name); - span.setAttribute("workspace.id", resource.workspaceId); - - return db.transaction(async (db) => { - await db - .delete(SCHEMA.releaseTarget) - .where(eq(SCHEMA.releaseTarget.resourceId, resource.id)); - - const rows = await db - .select() - .from(SCHEMA.computedEnvironmentResource) - .innerJoin( - SCHEMA.environment, - eq( - SCHEMA.computedEnvironmentResource.environmentId, - SCHEMA.environment.id, - ), - ) - .innerJoin( - SCHEMA.deployment, - eq(SCHEMA.deployment.systemId, SCHEMA.environment.systemId), - ) - .leftJoin( - SCHEMA.computedDeploymentResource, - and( - eq( - SCHEMA.computedDeploymentResource.deploymentId, - SCHEMA.deployment.id, - ), - eq(SCHEMA.computedDeploymentResource.resourceId, resource.id), - ), - ) - .where(eq(SCHEMA.computedEnvironmentResource.resourceId, resource.id)); - - const targets = rows - .filter( - (r) => - r.deployment.resourceSelector == null || - r.computed_deployment_resource != null, - ) - .map((r) => ({ - environmentId: r.environment.id, - deploymentId: r.deployment.id, - resourceId: resource.id, - })); - - if (targets.length === 0) return []; - return db - .insert(SCHEMA.releaseTarget) - .values(targets) - .onConflictDoNothing() - .returning(); - }); - }, -); diff --git a/apps/event-worker/src/workers/new-deployment-version.ts b/apps/event-worker/src/workers/new-deployment-version.ts index 2c894fb4b..545257773 100644 --- a/apps/event-worker/src/workers/new-deployment-version.ts +++ b/apps/event-worker/src/workers/new-deployment-version.ts @@ -1,7 +1,9 @@ import { eq } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; -import { Channel, createWorker, getQueue } from "@ctrlplane/events"; +import { Channel, createWorker } from "@ctrlplane/events"; + +import { dispatchEvaluateJobs } from "../utils/dispatch-evaluate-jobs.js"; /** * Worker that processes new deployment version events. @@ -18,12 +20,6 @@ export const newDeploymentVersionWorker = createWorker( const releaseTargets = await db.query.releaseTarget.findMany({ where: eq(schema.releaseTarget.deploymentId, version.deploymentId), }); - - await getQueue(Channel.EvaluateReleaseTarget).addBulk( - releaseTargets.map((rt) => ({ - name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`, - data: rt, - })), - ); + await dispatchEvaluateJobs(releaseTargets); }, ); diff --git a/apps/event-worker/src/workers/new-deployment.ts b/apps/event-worker/src/workers/new-deployment.ts index 41a9bfecf..212ee178d 100644 --- a/apps/event-worker/src/workers/new-deployment.ts +++ b/apps/event-worker/src/workers/new-deployment.ts @@ -1,13 +1,22 @@ -import { eq, selector, takeFirst } from "@ctrlplane/db"; +import { eq, selector } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; -import { Channel, createWorker, getQueue } from "@ctrlplane/events"; +import { Channel, createWorker } from "@ctrlplane/events"; import { logger } from "@ctrlplane/logger"; -import { replaceReleaseTargets } from "../utils/replace-release-targets.js"; +import { dispatchEvaluateJobs } from "../utils/dispatch-evaluate-jobs.js"; const log = logger.child({ module: "new-deployment" }); +const recomputeReleaseTargets = async ( + deployment: schema.Deployment, + resources: schema.Resource[], +) => { + const computeBuilder = selector().compute(); + await computeBuilder.deployments([deployment]).resourceSelectors(); + return computeBuilder.resources(resources).releaseTargets(); +}; + /** * Worker that processes new deployment events. * @@ -24,43 +33,17 @@ export const newDeploymentWorker = createWorker( Channel.NewDeployment, async (job) => { try { - await selector() - .compute() - .deployments([job.data.id]) - .resourceSelectors() - .replace(); - - const system = await db - .select() - .from(schema.system) - .where(eq(schema.system.id, job.data.systemId)) - .then(takeFirst); - const { workspaceId } = system; + const environments = await db.query.environment.findMany({ + where: eq(schema.environment.systemId, job.data.systemId), + with: { computedResources: { with: { resource: true } } }, + }); - const computedDeploymentResources = await db - .select() - .from(schema.computedDeploymentResource) - .innerJoin( - schema.resource, - eq(schema.computedDeploymentResource.resourceId, schema.resource.id), - ) - .where(eq(schema.computedDeploymentResource.deploymentId, job.data.id)); - const resources = computedDeploymentResources.map((r) => r.resource); - - const releaseTargetPromises = resources.map(async (r) => - replaceReleaseTargets(db, r), + const resources = environments.flatMap((e) => + e.computedResources.map((r) => r.resource), ); - const fulfilled = await Promise.all(releaseTargetPromises); - const rts = fulfilled.flat(); - - await selector() - .compute() - .allPolicies(workspaceId) - .releaseTargetSelectors() - .replace(); - const evaluateJobs = rts.map((rt) => ({ name: rt.id, data: rt })); - await getQueue(Channel.EvaluateReleaseTarget).addBulk(evaluateJobs); + const releaseTargets = await recomputeReleaseTargets(job.data, resources); + await dispatchEvaluateJobs(releaseTargets); } catch (error) { log.error("Error upserting release targets", { error }); throw error; diff --git a/apps/event-worker/src/workers/new-resource.ts b/apps/event-worker/src/workers/new-resource.ts index e5c925a2f..a8d5e2e00 100644 --- a/apps/event-worker/src/workers/new-resource.ts +++ b/apps/event-worker/src/workers/new-resource.ts @@ -1,10 +1,16 @@ +import type * as schema from "@ctrlplane/db/schema"; + import { selector } from "@ctrlplane/db"; -import { db } from "@ctrlplane/db/client"; -import { Channel, createWorker, getQueue } from "@ctrlplane/events"; +import { Channel, createWorker } from "@ctrlplane/events"; -import { replaceReleaseTargets } from "../utils/replace-release-targets.js"; +import { dispatchEvaluateJobs } from "../utils/dispatch-evaluate-jobs.js"; -const queue = getQueue(Channel.EvaluateReleaseTarget); +const recomputeReleaseTargets = async (resource: schema.Resource) => { + const computeBuilder = selector().compute(); + const { workspaceId } = resource; + await computeBuilder.allResourceSelectors(workspaceId); + return computeBuilder.resources([resource]).releaseTargets(); +}; /** * Worker that processes new resource events. @@ -21,23 +27,7 @@ const queue = getQueue(Channel.EvaluateReleaseTarget); export const newResourceWorker = createWorker( Channel.NewResource, async ({ data: resource }) => { - const cb = selector().compute(); - - await Promise.all([ - cb.allEnvironments(resource.workspaceId).resourceSelectors().replace(), - cb.allDeployments(resource.workspaceId).resourceSelectors().replace(), - ]); - - const rts = await replaceReleaseTargets(db, resource); - await cb - .allPolicies(resource.workspaceId) - .releaseTargetSelectors() - .replace(); - - const jobs = rts.map((rt) => ({ - name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`, - data: rt, - })); - await queue.addBulk(jobs); + const releaseTargets = await recomputeReleaseTargets(resource); + await dispatchEvaluateJobs(releaseTargets); }, ); diff --git a/apps/event-worker/src/workers/update-deployment-variable.ts b/apps/event-worker/src/workers/update-deployment-variable.ts index f1747023f..ed15eff81 100644 --- a/apps/event-worker/src/workers/update-deployment-variable.ts +++ b/apps/event-worker/src/workers/update-deployment-variable.ts @@ -1,7 +1,9 @@ import { eq } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; -import { Channel, createWorker, getQueue } from "@ctrlplane/events"; +import { Channel, createWorker } from "@ctrlplane/events"; + +import { dispatchEvaluateJobs } from "../utils/dispatch-evaluate-jobs.js"; /** * Worker that handles deployment variable changes @@ -27,11 +29,6 @@ export const updateDeploymentVariableWorker = createWorker( where: eq(schema.releaseTarget.deploymentId, variable.deploymentId), }); - await getQueue(Channel.EvaluateReleaseTarget).addBulk( - releaseTargets.map((rt) => ({ - name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`, - data: rt, - })), - ); + await dispatchEvaluateJobs(releaseTargets); }, ); diff --git a/apps/event-worker/src/workers/update-deployment.ts b/apps/event-worker/src/workers/update-deployment.ts index 9c92b6ef0..47cdeb2ad 100644 --- a/apps/event-worker/src/workers/update-deployment.ts +++ b/apps/event-worker/src/workers/update-deployment.ts @@ -1,19 +1,17 @@ -import type { Tx } from "@ctrlplane/db"; import _ from "lodash"; -import { eq, inArray, selector, takeFirst } from "@ctrlplane/db"; +import { eq, selector } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; -import { Channel, createWorker, getQueue } from "@ctrlplane/events"; +import { Channel, createWorker } from "@ctrlplane/events"; import { handleEvent } from "@ctrlplane/job-dispatch"; import { logger } from "@ctrlplane/logger"; -import { replaceReleaseTargets } from "../utils/replace-release-targets.js"; +import { dispatchEvaluateJobs } from "../utils/dispatch-evaluate-jobs.js"; const log = logger.child({ module: "update-deployment" }); const dispatchExitHooks = async ( - db: Tx, deployment: schema.Deployment, exitedResources: schema.Resource[], ) => { @@ -26,107 +24,20 @@ const dispatchExitHooks = async ( await Promise.allSettled(handleEventPromises); }; -/** - * Extracted into its own function to solve for the following edge case - - * if are are setting a resource selector on a deployment to null - * then beacuse we do not store computed resources for deployments with no - * resource selector, we need to compute the resources based on the environments - * in the system that the deployment is in. - * - * Otherwise, just use the computed resources for the deployment if it is - * not null. - * - * @param {Tx} db - The database transaction - * @param {Deployment} deployment - The deployment to get the computed resources for - * @returns {Promise} A promise that resolves to the computed resources - */ -const getNewDeploymentComputedResources = async ( - db: Tx, - deployment: schema.Deployment, -) => { - if (deployment.resourceSelector != null) - return db - .select() - .from(schema.computedDeploymentResource) - .innerJoin( - schema.resource, - eq(schema.computedDeploymentResource.resourceId, schema.resource.id), - ) - .where(eq(schema.computedDeploymentResource.deploymentId, deployment.id)) - .then((rows) => rows.map((r) => r.resource)); - - const system = await db.query.system.findFirst({ - where: eq(schema.system.id, deployment.systemId), - with: { environments: true }, - }); - if (system == null) throw new Error("System not found"); - - const releaseTargets = await db.query.releaseTarget.findMany({ - where: inArray( - schema.releaseTarget.environmentId, - system.environments.map((e) => e.id), - ), - with: { resource: true }, - }); - - return releaseTargets.map((rt) => rt.resource); -}; - -/** - * Recomputes the resources for a deployment and returns the difference between - * the current and new resources. - * - * @param {Tx} db - The database transaction - * @param {Deployment} deployment - The deployment to recompute the resources for - * @returns {Promise<{ newResources: Resource[], exitedResources: Resource[] }>} A promise that resolves to the new and exited resources - */ -const recomputeResourcesAndReturnDiff = async ( - db: Tx, - deployment: schema.Deployment, +const recomputeReleaseTargets = async ( + deployment: schema.Deployment & { system: schema.System }, ) => { - /* - we use the release targest instead of the computed resources - because a deployment with no resource selector technically matches all - deployments but won't have any computed entries. Hence if you add a - resource selector after the fact, if you used the previous computed resources - to calculate the diff it would not be picked up - */ - const currentComputedResources = await db - .selectDistinctOn([schema.releaseTarget.resourceId]) - .from(schema.releaseTarget) - .innerJoin( - schema.resource, - eq(schema.releaseTarget.resourceId, schema.resource.id), - ) - .where(eq(schema.releaseTarget.deploymentId, deployment.id)); - const currentResources = currentComputedResources.map((r) => r.resource); - - await selector() - .compute() - .deployments([deployment.id]) - .resourceSelectors() - .replace(); - - const newResources = await getNewDeploymentComputedResources(db, deployment); - - const exitedResources = currentResources.filter( - (r) => !newResources.some((nr) => nr.id === r.id), - ); - - return { newResources, exitedResources }; + const computeBuilder = selector().compute(); + await computeBuilder.deployments([deployment]).resourceSelectors(); + const { system } = deployment; + const { workspaceId } = system; + return computeBuilder.allResources(workspaceId).releaseTargets(); }; /** - * Worker that updates a deployment - * - * When a deployment is updated and the resource selector is changed, perform the following steps: - * 1. Recompute the resources for the deployment and return which resources - * have been added and which have been removed - * 2. For all affected resources, replace the release targets based on new computations - * 3. Recompute all policy targets' computed release targets based on the new release targets - * 4. Add all replaced release targets to the evaluation queue - * 5. Dispatch exit hooks for the exited resources - * + * Worker that does the post-processing after a deployment is updated + * 1. Grab the current release targets for the deployment, with resources + * 2. For the current resources, recompute the release targets * * @param {Job} job - The deployment data * @returns {Promise} A promise that resolves when processing is complete @@ -138,31 +49,29 @@ export const updateDeploymentWorker = createWorker( const { oldSelector, resourceSelector } = data; if (_.isEqual(oldSelector, resourceSelector)) return; - const { newResources, exitedResources } = - await recomputeResourcesAndReturnDiff(db, data); - - const system = await db - .select() - .from(schema.system) - .where(eq(schema.system.id, data.systemId)) - .then(takeFirst); - const { workspaceId } = system; - const allResources = [...newResources, ...exitedResources]; - const releaseTargetPromises = allResources.map(async (r) => - replaceReleaseTargets(db, r), - ); - const fulfilled = await Promise.all(releaseTargetPromises); - const rts = fulfilled.flat(); - await selector() - .compute() - .allPolicies(workspaceId) - .releaseTargetSelectors() - .replace(); - - const evaluateJobs = rts.map((rt) => ({ name: rt.id, data: rt })); - await getQueue(Channel.EvaluateReleaseTarget).addBulk(evaluateJobs); - - await dispatchExitHooks(db, data, exitedResources); + const deployment = await db.query.deployment.findFirst({ + where: eq(schema.deployment.id, data.id), + with: { system: true, releaseTargets: { with: { resource: true } } }, + }); + if (deployment == null) + throw new Error(`Deployment not found: ${data.id}`); + + const { releaseTargets } = deployment; + const currentResources = releaseTargets.map((rt) => rt.resource); + + const rts = await recomputeReleaseTargets(deployment); + await dispatchEvaluateJobs(rts); + + const exitedResources = _.chain(currentResources) + .filter( + (r) => + !rts.some( + (rt) => rt.resourceId === r.id && rt.deploymentId === data.id, + ), + ) + .uniqBy((r) => r.id) + .value(); + await dispatchExitHooks(data, exitedResources); } catch (error) { log.error("Error updating deployment", { error }); throw error; diff --git a/apps/event-worker/src/workers/update-environment.ts b/apps/event-worker/src/workers/update-environment.ts index 2e324a6d7..605719043 100644 --- a/apps/event-worker/src/workers/update-environment.ts +++ b/apps/event-worker/src/workers/update-environment.ts @@ -1,14 +1,14 @@ import type { Tx } from "@ctrlplane/db"; import _ from "lodash"; -import { eq, selector, takeFirst } from "@ctrlplane/db"; +import { eq, selector } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; -import { Channel, createWorker, getQueue } from "@ctrlplane/events"; +import { Channel, createWorker } from "@ctrlplane/events"; import { handleEvent } from "@ctrlplane/job-dispatch"; import { logger } from "@ctrlplane/logger"; -import { replaceReleaseTargets } from "../utils/replace-release-targets.js"; +import { dispatchEvaluateJobs } from "../utils/dispatch-evaluate-jobs.js"; const log = logger.child({ module: "env-selector-update", @@ -36,42 +36,14 @@ const dispatchExitHooks = async ( await Promise.allSettled(handleEventPromises); }; -const recomputeResourcesAndReturnDiff = async ( - db: Tx, - environmentId: string, +const recomputeReleaseTargets = async ( + environment: schema.Environment & { system: schema.System }, ) => { - const currentComputedResources = await db - .select() - .from(schema.computedEnvironmentResource) - .innerJoin( - schema.resource, - eq(schema.computedEnvironmentResource.resourceId, schema.resource.id), - ) - .where(eq(schema.computedEnvironmentResource.environmentId, environmentId)); - const currentResources = currentComputedResources.map((r) => r.resource); - - await selector() - .compute() - .environments([environmentId]) - .resourceSelectors() - .replace(); - - const newComputedResources = await db - .select() - .from(schema.computedEnvironmentResource) - .innerJoin( - schema.resource, - eq(schema.computedEnvironmentResource.resourceId, schema.resource.id), - ) - .where(eq(schema.computedEnvironmentResource.environmentId, environmentId)); - - const newResources = newComputedResources.map((r) => r.resource); - - const exitedResources = currentResources.filter( - (r) => !newResources.some((nr) => nr.id === r.id), - ); - - return { newResources, exitedResources }; + const computeBuilder = selector().compute(); + await computeBuilder.environments([environment]).resourceSelectors(); + const { system } = environment; + const { workspaceId } = system; + return computeBuilder.allResources(workspaceId).releaseTargets(); }; /** @@ -93,34 +65,35 @@ export const updateEnvironmentWorker = createWorker( Channel.UpdateEnvironment, async (job) => { try { - const { oldSelector, ...environment } = job.data; - if (_.isEqual(oldSelector, environment.resourceSelector)) return; - - const { newResources, exitedResources } = - await recomputeResourcesAndReturnDiff(db, environment.id); - const allResources = [...newResources, ...exitedResources]; - const releaseTargetPromises = allResources.map(async (r) => - replaceReleaseTargets(db, r), - ); - const fulfilled = await Promise.all(releaseTargetPromises); - const rts = fulfilled.flat(); - - const system = await db - .select() - .from(schema.system) - .where(eq(schema.system.id, environment.systemId)) - .then(takeFirst); - const { workspaceId } = system; - - await selector() - .compute() - .allPolicies(workspaceId) - .releaseTargetSelectors() - .replace(); - - const evaluateJobs = rts.map((rt) => ({ name: rt.id, data: rt })); - await getQueue(Channel.EvaluateReleaseTarget).addBulk(evaluateJobs); - + const { oldSelector, resourceSelector } = job.data; + if (_.isEqual(oldSelector, resourceSelector)) return; + + const environment = await db.query.environment.findFirst({ + where: eq(schema.environment.id, job.data.id), + with: { system: true, releaseTargets: { with: { resource: true } } }, + }); + if (environment == null) + throw new Error(`Environment not found: ${job.data.id}`); + + const { releaseTargets } = environment; + const currentResources = _.chain(releaseTargets) + .map((rt) => rt.resource) + .uniqBy((r) => r.id) + .value(); + + const rts = await recomputeReleaseTargets(environment); + await dispatchEvaluateJobs(rts); + + const exitedResources = _.chain(currentResources) + .filter( + (r) => + !rts.some( + (rt) => + rt.resourceId === r.id && rt.environmentId === environment.id, + ), + ) + .uniqBy((r) => r.id) + .value(); await dispatchExitHooks(db, environment.systemId, exitedResources); } catch (error) { log.error("Error updating environment", { error }); diff --git a/apps/event-worker/src/workers/update-resource-variable.ts b/apps/event-worker/src/workers/update-resource-variable.ts index 79f3384c0..030a6f505 100644 --- a/apps/event-worker/src/workers/update-resource-variable.ts +++ b/apps/event-worker/src/workers/update-resource-variable.ts @@ -1,13 +1,35 @@ -import { eq, selector, takeFirst } from "@ctrlplane/db"; +import _ from "lodash"; + +import { eq, selector } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; -import { Channel, createWorker, getQueue } from "@ctrlplane/events"; +import { Channel, createWorker } from "@ctrlplane/events"; +import { handleEvent } from "@ctrlplane/job-dispatch"; import { logger } from "@ctrlplane/logger"; -import { replaceReleaseTargets } from "../utils/replace-release-targets.js"; +import { dispatchEvaluateJobs } from "../utils/dispatch-evaluate-jobs.js"; const log = logger.child({ module: "update-resource-variable" }); +const dispatchExitHooks = async ( + deployments: schema.Deployment[], + exitedResource: schema.Resource, +) => { + const events = deployments.map((deployment) => ({ + action: "deployment.resource.removed" as const, + payload: { deployment, resource: exitedResource }, + })); + + const handleEventPromises = events.map(handleEvent); + await Promise.allSettled(handleEventPromises); +}; + +const recomputeReleaseTargets = async (resource: schema.Resource) => { + const computeBuilder = selector().compute(); + await computeBuilder.allResourceSelectors(resource.workspaceId); + return computeBuilder.resources([resource]).releaseTargets(); +}; + /** * Worker that updates a resource variable * @@ -26,27 +48,24 @@ export const updateResourceVariableWorker = createWorker( try { const { data } = job; const { resourceId } = data; + const resource = await db.query.resource.findFirst({ + where: eq(schema.resource.id, resourceId), + with: { releaseTargets: { with: { deployment: true } } }, + }); + if (resource == null) + throw new Error(`Resource not found: ${resourceId}`); + const currentDeployments = resource.releaseTargets.map( + (rt) => rt.deployment, + ); + + const rts = await recomputeReleaseTargets(resource); + await dispatchEvaluateJobs(rts); - const resource = await db - .select() - .from(schema.resource) - .where(eq(schema.resource.id, resourceId)) - .then(takeFirst); - const { workspaceId } = resource; - - const cb = selector().compute(); - - await Promise.all([ - cb.allEnvironments(workspaceId).resourceSelectors().replace(), - cb.allDeployments(workspaceId).resourceSelectors().replace(), - ]); - const rts = await replaceReleaseTargets(db, resource); - await cb.allPolicies(workspaceId).releaseTargetSelectors().replace(); - const jobs = rts.map((rt) => ({ - name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`, - data: rt, - })); - await getQueue(Channel.EvaluateReleaseTarget).addBulk(jobs); + const exitedDeployments = _.chain(currentDeployments) + .filter((d) => !rts.some((rt) => rt.deploymentId === d.id)) + .uniqBy((d) => d.id) + .value(); + await dispatchExitHooks(exitedDeployments, resource); } catch (error) { log.error("Error updating resource variable", { error }); throw error; diff --git a/apps/event-worker/src/workers/updated-resources/dispatch-exit-hooks.ts b/apps/event-worker/src/workers/updated-resources/dispatch-exit-hooks.ts deleted file mode 100644 index f0d843a51..000000000 --- a/apps/event-worker/src/workers/updated-resources/dispatch-exit-hooks.ts +++ /dev/null @@ -1,104 +0,0 @@ -import type { Tx } from "@ctrlplane/db"; -import type { ReleaseTargetIdentifier } from "@ctrlplane/rule-engine"; - -import { and, eq, inArray, isNotNull, notInArray, or } from "@ctrlplane/db"; -import * as SCHEMA from "@ctrlplane/db/schema"; -import { handleEvent } from "@ctrlplane/job-dispatch"; -import { HookAction } from "@ctrlplane/validators/events"; - -import { withSpan } from "./span.js"; - -const getSystemsForUnmatchedEnvs = withSpan( - "getSystemsForUnmatchedEnvs", - async ( - _, - db: Tx, - previousReleaseTargets: ReleaseTargetIdentifier[], - newReleaseTargets: ReleaseTargetIdentifier[], - ) => { - const previousEnvIds = new Set( - previousReleaseTargets.map((rt) => rt.environmentId), - ); - const newEnvIds = new Set( - newReleaseTargets.map((rt) => rt.environmentId), - ); - const unmatchedEnvs = Array.from(previousEnvIds).filter( - (envId) => !newEnvIds.has(envId), - ); - - const envs = await db.query.environment.findMany({ - where: inArray(SCHEMA.environment.id, unmatchedEnvs), - }); - - return db.query.system.findMany({ - where: inArray( - SCHEMA.system.id, - envs.map((e) => e.systemId), - ), - with: { - deployments: true, - environments: { - where: and( - isNotNull(SCHEMA.environment.resourceSelector), - notInArray(SCHEMA.environment.id, unmatchedEnvs), - ), - }, - }, - }); - }, -); - -const dispatchExitHooksIfExitedSystem = withSpan( - "dispatchExitHooksIfExitedSystem", - async ( - _, - db: Tx, - resource: SCHEMA.Resource, - system: { - deployments: SCHEMA.Deployment[]; - environments: SCHEMA.Environment[]; - }, - ) => { - const { deployments, environments } = system; - const matchedResource = await db.query.resource.findFirst({ - where: and( - eq(SCHEMA.resource.id, resource.id), - or( - ...environments.map((e) => - SCHEMA.resourceMatchesMetadata(db, e.resourceSelector), - ), - ), - ), - }); - if (matchedResource == null) return; - - const events = deployments.map((deployment) => ({ - action: HookAction.DeploymentResourceRemoved, - payload: { deployment, resource }, - })); - - const handleEventPromises = events.map(handleEvent); - await Promise.allSettled(handleEventPromises); - }, -); - -export const dispatchExitHooks = withSpan( - "dispatchExitHooks", - async ( - _, - db: Tx, - resource: SCHEMA.Resource, - currentReleaseTargets: ReleaseTargetIdentifier[], - newReleaseTargets: ReleaseTargetIdentifier[], - ) => { - const systems = await getSystemsForUnmatchedEnvs( - db, - currentReleaseTargets, - newReleaseTargets, - ); - const dispatchExitHooksPromises = systems.map((system) => - dispatchExitHooksIfExitedSystem(db, resource, system), - ); - await Promise.allSettled(dispatchExitHooksPromises); - }, -); diff --git a/apps/event-worker/src/workers/updated-resources/index.ts b/apps/event-worker/src/workers/updated-resources/index.ts index 5548a070c..5946f15dc 100644 --- a/apps/event-worker/src/workers/updated-resources/index.ts +++ b/apps/event-worker/src/workers/updated-resources/index.ts @@ -1,13 +1,27 @@ -import { eq, inArray, selector } from "@ctrlplane/db"; +import _ from "lodash"; + +import { eq, selector } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as SCHEMA from "@ctrlplane/db/schema"; -import { Channel, createWorker, getQueue } from "@ctrlplane/events"; -import { logger } from "@ctrlplane/logger"; +import { Channel, createWorker } from "@ctrlplane/events"; +import { handleEvent } from "@ctrlplane/job-dispatch"; -import { replaceReleaseTargets } from "../../utils/replace-release-targets.js"; -import { dispatchExitHooks } from "./dispatch-exit-hooks.js"; +import { dispatchEvaluateJobs } from "../../utils/dispatch-evaluate-jobs.js"; import { withSpan } from "./span.js"; +const dispatchExitHooks = async ( + deployments: SCHEMA.Deployment[], + exitedResource: SCHEMA.Resource, +) => { + const events = deployments.map((deployment) => ({ + action: "deployment.resource.removed" as const, + payload: { deployment, resource: exitedResource }, + })); + + const handleEventPromises = events.map(handleEvent); + await Promise.allSettled(handleEventPromises); +}; + export const updatedResourceWorker = createWorker( Channel.UpdatedResource, withSpan("updatedResourceWorker", async (span, { data: resource }) => { @@ -17,52 +31,21 @@ export const updatedResourceWorker = createWorker( const currentReleaseTargets = await db.query.releaseTarget.findMany({ where: eq(SCHEMA.releaseTarget.resourceId, resource.id), + with: { deployment: true }, }); + const currentDeployments = currentReleaseTargets.map((rt) => rt.deployment); - const cb = selector().compute(); - await Promise.all([ - cb.allEnvironments(resource.workspaceId).resourceSelectors().replace(), - cb.allDeployments(resource.workspaceId).resourceSelectors().replace(), - ]); - const upsertedReleaseTargets = await replaceReleaseTargets(db, resource); - await cb - .allPolicies(resource.workspaceId) - .releaseTargetSelectors() - .replace(); - - const releaseTargetsToDelete = currentReleaseTargets.filter( - (rt) => !upsertedReleaseTargets.some((nrt) => nrt.id === rt.id), - ); - await db.delete(SCHEMA.releaseTarget).where( - inArray( - SCHEMA.releaseTarget.id, - releaseTargetsToDelete.map((rt) => rt.id), - ), - ); - - const dispatchExitHooksPromise = dispatchExitHooks( - db, - resource, - currentReleaseTargets, - upsertedReleaseTargets, - ); - - logger.info( - `dispatching ${upsertedReleaseTargets.length} evaluations for release targets of resource ${resource.id}`, - ); + const rts = await selector() + .compute() + .resources([resource]) + .releaseTargets(); - const addToEvaluateQueuePromise = getQueue( - Channel.EvaluateReleaseTarget, - ).addBulk( - upsertedReleaseTargets.map((rt) => ({ - name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`, - data: rt, - })), - ); + await dispatchEvaluateJobs(rts); - await Promise.allSettled([ - dispatchExitHooksPromise, - addToEvaluateQueuePromise, - ]); + const exitedDeployments = _.chain(currentDeployments) + .filter((d) => !rts.some((nrt) => nrt.deploymentId === d.id)) + .uniqBy((d) => d.id) + .value(); + await dispatchExitHooks(exitedDeployments, resource); }), ); diff --git a/apps/pty-proxy/src/controller/agent-socket.ts b/apps/pty-proxy/src/controller/agent-socket.ts index 8ec86204a..32537d5e0 100644 --- a/apps/pty-proxy/src/controller/agent-socket.ts +++ b/apps/pty-proxy/src/controller/agent-socket.ts @@ -9,11 +9,10 @@ import type WebSocket from "ws"; import type { MessageEvent } from "ws"; import { can, getUser } from "@ctrlplane/auth/utils"; -import { eq, selector, upsertResources } from "@ctrlplane/db"; +import { eq, upsertResources } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; import { Channel, getQueue } from "@ctrlplane/events"; -import { replaceReleaseTargetsAndDispatchExitHooks } from "@ctrlplane/job-dispatch"; import { logger } from "@ctrlplane/logger"; import { Permission } from "@ctrlplane/validators/auth"; import { agentConnect, agentHeartbeat } from "@ctrlplane/validators/session"; @@ -139,13 +138,6 @@ export class AgentSocket { ]); const res = all.at(0); if (res == null) throw new Error("Failed to create resource"); - const cb = selector().compute(); - await Promise.all([ - cb.allEnvironments(this.workspaceId).resourceSelectors().replace(), - cb.allDeployments(this.workspaceId).resourceSelectors().replace(), - ]); - await replaceReleaseTargetsAndDispatchExitHooks(db, res); - await cb.allPolicies(this.workspaceId).releaseTargetSelectors().replace(); await getQueue(Channel.UpdatedResource).add(res.id, res); diff --git a/apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts b/apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts index 0c2db21bc..39917e308 100644 --- a/apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts +++ b/apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts @@ -5,10 +5,7 @@ import { z } from "zod"; import { and, eq, isNull, selector, upsertResources } from "@ctrlplane/db"; import * as schema from "@ctrlplane/db/schema"; import { Channel, getQueue } from "@ctrlplane/events"; -import { - deleteResources, - replaceReleaseTargetsAndDispatchExitHooks, -} from "@ctrlplane/job-dispatch"; +import { deleteResources } from "@ctrlplane/job-dispatch"; import { logger } from "@ctrlplane/logger"; import { variablesAES256 } from "@ctrlplane/secrets"; import { Permission } from "@ctrlplane/validators/auth"; @@ -100,24 +97,16 @@ export const PATCH = request() { error: "Resource not found" }, { status: 404 }, ); - const { workspaceId } = resource; const all = await upsertResources(db, [_.merge(resource, body)]); const res = all.at(0); if (res == null) throw new Error("Failed to update resource"); - const cb = selector().compute(); - await Promise.all([ - cb.allEnvironments(workspaceId).resourceSelectors().replace(), - cb.allDeployments(workspaceId).resourceSelectors().replace(), - ]); - await replaceReleaseTargetsAndDispatchExitHooks(db, res); - await cb - .allPolicies(resource.workspaceId) - .releaseTargetSelectors() - .replace(); - await getQueue(Channel.UpdatedResource).add(res.id, res); + selector() + .compute() + .allResourceSelectors(res.workspaceId) + .then(() => getQueue(Channel.UpdatedResource).add(res.id, res)); return NextResponse.json(res); } catch (err) { const error = err instanceof Error ? err : new Error(String(err)); diff --git a/apps/webservice/src/app/api/v1/resources/route.ts b/apps/webservice/src/app/api/v1/resources/route.ts index c7798b4de..fa027ff34 100644 --- a/apps/webservice/src/app/api/v1/resources/route.ts +++ b/apps/webservice/src/app/api/v1/resources/route.ts @@ -6,10 +6,7 @@ import { selector, upsertResources } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import { createResource } from "@ctrlplane/db/schema"; import { Channel, getQueue } from "@ctrlplane/events"; -import { - groupResourcesByHook, - replaceReleaseTargetsAndDispatchExitHooks, -} from "@ctrlplane/job-dispatch"; +import { groupResourcesByHook } from "@ctrlplane/job-dispatch"; import { logger } from "@ctrlplane/logger"; import { Permission } from "@ctrlplane/validators/auth"; @@ -85,23 +82,13 @@ export const POST = request() data: r, })); - const cb = selector().compute(); - - await Promise.all([ - cb.allEnvironments(workspaceId).resourceSelectors().replace(), - cb.allDeployments(workspaceId).resourceSelectors().replace(), - ]); - - await Promise.all( - [...insertedResources, ...updatedResources].map((r) => - replaceReleaseTargetsAndDispatchExitHooks(db, r), - ), - ); - - await cb.allPolicies(workspaceId).releaseTargetSelectors().replace(); - - await getQueue(Channel.NewResource).addBulk(insertJobs); - await getQueue(Channel.UpdatedResource).addBulk(updateJobs); + selector() + .compute() + .allResourceSelectors(workspaceId) + .then(() => { + getQueue(Channel.NewResource).addBulk(insertJobs); + getQueue(Channel.UpdatedResource).addBulk(updateJobs); + }); const count = insertedResources.length + updatedResources.length; return NextResponse.json({ count }); diff --git a/package.json b/package.json index b3268c9cc..243e7eeff 100644 --- a/package.json +++ b/package.json @@ -38,7 +38,13 @@ "onlyBuiltDependencies": [ "@nestjs/core", "@openapitools/openapi-generator-cli", + "@scarf/scarf", + "@tree-sitter-grammars/tree-sitter-yaml", "core-js-pure", + "esbuild", + "msgpackr-extract", + "protobufjs", + "sharp", "tree-sitter", "tree-sitter-json", "tree-sitter-yaml" diff --git a/packages/db/src/policies/create.ts b/packages/db/src/policies/create.ts index 229266286..54b85d3bf 100644 --- a/packages/db/src/policies/create.ts +++ b/packages/db/src/policies/create.ts @@ -154,7 +154,6 @@ export const createPolicyInTx = async (tx: Tx, input: CreatePolicyInput) => { .compute() .policies([policyId]) .releaseTargetSelectors() - .replace() .catch((e) => log.error( e, diff --git a/packages/db/src/policies/update.ts b/packages/db/src/policies/update.ts index f4ec19626..42f48e05a 100644 --- a/packages/db/src/policies/update.ts +++ b/packages/db/src/policies/update.ts @@ -202,7 +202,6 @@ export const updatePolicyInTx = async ( .compute() .policies([policy.id]) .releaseTargetSelectors() - .replace() .catch((e) => log.error( e, @@ -211,5 +210,4 @@ export const updatePolicyInTx = async ( ); return updatedPolicy; - return updatedPolicy; }; diff --git a/packages/db/src/schema/deployment.ts b/packages/db/src/schema/deployment.ts index f5f1ef9df..283686e71 100644 --- a/packages/db/src/schema/deployment.ts +++ b/packages/db/src/schema/deployment.ts @@ -22,6 +22,7 @@ import { import { ColumnOperatorFn } from "../common.js"; import { jobAgent } from "./job-agent.js"; +import { releaseTarget } from "./release.js"; import { resource } from "./resource.js"; import { system } from "./system.js"; @@ -103,7 +104,7 @@ export const updateDeployment = deploymentInsert.partial(); export type UpdateDeployment = z.infer; export type Deployment = InferSelectModel; -export const deploymentRelations = relations(deployment, ({ one }) => ({ +export const deploymentRelations = relations(deployment, ({ one, many }) => ({ system: one(system, { fields: [deployment.systemId], references: [system.id], @@ -112,6 +113,8 @@ export const deploymentRelations = relations(deployment, ({ one }) => ({ fields: [deployment.jobAgentId], references: [jobAgent.id], }), + computedResources: many(computedDeploymentResource), + releaseTargets: many(releaseTarget), })); export const computedDeploymentResource = pgTable( @@ -127,6 +130,20 @@ export const computedDeploymentResource = pgTable( (t) => ({ pk: primaryKey({ columns: [t.deploymentId, t.resourceId] }) }), ); +export const computedDeploymentResourceRelations = relations( + computedDeploymentResource, + ({ one }) => ({ + deployment: one(deployment, { + fields: [computedDeploymentResource.deploymentId], + references: [deployment.id], + }), + resource: one(resource, { + fields: [computedDeploymentResource.resourceId], + references: [resource.id], + }), + }), +); + const buildCondition = (cond: DeploymentCondition): SQL => { if (cond.type === "name") return ColumnOperatorFn[cond.operator](deployment.name, cond.value); diff --git a/packages/db/src/schema/environment-relations.ts b/packages/db/src/schema/environment-relations.ts index c6fdf0c0c..5552a9c11 100644 --- a/packages/db/src/schema/environment-relations.ts +++ b/packages/db/src/schema/environment-relations.ts @@ -1,10 +1,13 @@ import { relations } from "drizzle-orm"; import { + computedEnvironmentResource, environment, environmentMetadata, environmentPolicy, } from "./environment.js"; +import { releaseTarget } from "./release.js"; +import { resource } from "./resource.js"; import { system } from "./system.js"; import { variableSetEnvironment } from "./variable-sets.js"; @@ -19,6 +22,8 @@ export const environmentRelations = relations(environment, ({ many, one }) => ({ references: [system.id], }), metadata: many(environmentMetadata), + computedResources: many(computedEnvironmentResource), + releaseTargets: many(releaseTarget), })); export const environmentMetadataRelations = relations( @@ -30,3 +35,17 @@ export const environmentMetadataRelations = relations( }), }), ); + +export const computedEnvironmentResourceRelations = relations( + computedEnvironmentResource, + ({ one }) => ({ + environment: one(environment, { + fields: [computedEnvironmentResource.environmentId], + references: [environment.id], + }), + resource: one(resource, { + fields: [computedEnvironmentResource.resourceId], + references: [resource.id], + }), + }), +); diff --git a/packages/db/src/selectors/compute/compute.ts b/packages/db/src/selectors/compute/compute.ts index 335e42ea8..da4e0ef45 100644 --- a/packages/db/src/selectors/compute/compute.ts +++ b/packages/db/src/selectors/compute/compute.ts @@ -1,4 +1,5 @@ import type { Tx } from "../../common.js"; +import type * as schema from "../../schema/index.js"; import { DeploymentBuilder, WorkspaceDeploymentBuilder, @@ -8,24 +9,35 @@ import { WorkspaceEnvironmentBuilder, } from "./environment-builder.js"; import { PolicyBuilder, WorkspacePolicyBuilder } from "./policy-builder.js"; +import { + ResourceBuilder, + WorkspaceResourceBuilder, +} from "./resource-builder.js"; export class ComputeBuilder { constructor(private readonly tx: Tx) {} + allResourceSelectors(workspaceId: string) { + return Promise.all([ + this.allEnvironments(workspaceId).resourceSelectors(), + this.allDeployments(workspaceId).resourceSelectors(), + ]); + } + allEnvironments(workspaceId: string) { return new WorkspaceEnvironmentBuilder(this.tx, workspaceId); } - environments(ids: string[]) { - return new EnvironmentBuilder(this.tx, ids); + environments(environments: schema.Environment[]) { + return new EnvironmentBuilder(this.tx, environments); } allDeployments(workspaceId: string) { return new WorkspaceDeploymentBuilder(this.tx, workspaceId); } - deployments(ids: string[]) { - return new DeploymentBuilder(this.tx, ids); + deployments(deployments: schema.Deployment[]) { + return new DeploymentBuilder(this.tx, deployments); } allPolicies(workspaceId: string) { @@ -35,4 +47,12 @@ export class ComputeBuilder { policies(ids: string[]) { return new PolicyBuilder(this.tx, ids); } + + resources(resources: schema.Resource[]) { + return new ResourceBuilder(this.tx, resources); + } + + allResources(workspaceId: string) { + return new WorkspaceResourceBuilder(this.tx, workspaceId); + } } diff --git a/packages/db/src/selectors/compute/deployment-builder.ts b/packages/db/src/selectors/compute/deployment-builder.ts index bfd2c3805..a2d9f9b05 100644 --- a/packages/db/src/selectors/compute/deployment-builder.ts +++ b/packages/db/src/selectors/compute/deployment-builder.ts @@ -3,123 +3,144 @@ import { and, eq, inArray, isNotNull } from "drizzle-orm/pg-core/expressions"; import type { Tx } from "../../common.js"; import * as SCHEMA from "../../schema/index.js"; import { QueryBuilder } from "../query/builder.js"; -import { ReplaceBuilder } from "./replace-builder.js"; export class DeploymentBuilder { - private readonly _queryBuilder; constructor( private readonly tx: Tx, - private readonly ids: string[], - ) { - this._queryBuilder = new QueryBuilder(tx); + private readonly deployments: SCHEMA.Deployment[], + ) {} + + private get deploymentIds() { + return this.deployments.map((d) => d.id); + } + + private async deleteExistingComputedResources(tx: Tx) { + await tx + .delete(SCHEMA.computedDeploymentResource) + .where( + inArray( + SCHEMA.computedDeploymentResource.deploymentId, + this.deploymentIds, + ), + ); + } + + private async findMatchingResourcesForDeployments(tx: Tx) { + const deployments = await tx.query.deployment.findMany({ + where: inArray(SCHEMA.deployment.id, this.deploymentIds), + with: { system: true }, + }); + + const promises = deployments.map(async (d) => { + const { system } = d; + const { workspaceId } = system; + if (d.resourceSelector == null) return []; + const qb = new QueryBuilder(tx); + const resources = await tx.query.resource.findMany({ + where: and( + eq(SCHEMA.resource.workspaceId, workspaceId), + qb.resources().where(d.resourceSelector).sql(), + ), + }); + + return resources.map((r) => ({ + deploymentId: d.id, + resourceId: r.id, + })); + }); + + const fulfilled = await Promise.all(promises); + return fulfilled.flat(); } resourceSelectors() { - return new ReplaceBuilder( - this.tx, - SCHEMA.computedDeploymentResource, - async (tx) => { - await tx - .delete(SCHEMA.computedDeploymentResource) - .where( - inArray(SCHEMA.computedDeploymentResource.deploymentId, this.ids), - ); - }, - async (tx) => { - const deployments = await tx.query.deployment.findMany({ - where: inArray(SCHEMA.deployment.id, this.ids), - with: { system: true }, - }); - - const promises = deployments.map(async (d) => { - const { system } = d; - const { workspaceId } = system; - if (d.resourceSelector == null) return []; - const resources = await tx.query.resource.findMany({ - where: and( - eq(SCHEMA.resource.workspaceId, workspaceId), - this._queryBuilder.resources().where(d.resourceSelector).sql(), - ), - }); - - return resources.map((r) => ({ - deploymentId: d.id, - resourceId: r.id, - })); - }); - - const fulfilled = await Promise.all(promises); - return fulfilled.flat(); - }, - ); + return this.tx.transaction(async (tx) => { + await this.deleteExistingComputedResources(tx); + const computedResourceInserts = + await this.findMatchingResourcesForDeployments(tx); + if (computedResourceInserts.length === 0) return []; + return tx + .insert(SCHEMA.computedDeploymentResource) + .values(computedResourceInserts) + .onConflictDoNothing() + .returning(); + }); } } -const getDeploymentsInWorkspace = async (tx: Tx, workspaceId: string) => { - const workspace = await tx.query.workspace.findFirst({ - where: eq(SCHEMA.workspace.id, workspaceId), - with: { - systems: { - with: { - deployments: { - where: isNotNull(SCHEMA.deployment.resourceSelector), - }, - }, - }, - }, - }); - if (workspace == null) throw new Error(`Workspace not found: ${workspaceId}`); - return workspace.systems.flatMap((s) => s.deployments); -}; - export class WorkspaceDeploymentBuilder { - private readonly _queryBuilder; constructor( private readonly tx: Tx, private readonly workspaceId: string, - ) { - this._queryBuilder = new QueryBuilder(tx); + ) {} + + private getDeploymentsInWorkspace(tx: Tx) { + return tx + .select() + .from(SCHEMA.deployment) + .innerJoin( + SCHEMA.system, + eq(SCHEMA.deployment.systemId, SCHEMA.system.id), + ) + .where( + and( + eq(SCHEMA.system.workspaceId, this.workspaceId), + isNotNull(SCHEMA.deployment.resourceSelector), + ), + ) + .then((m) => m.map((d) => d.deployment)); } - resourceSelectors() { - return new ReplaceBuilder( - this.tx, - SCHEMA.computedDeploymentResource, - async (tx) => { - const deployments = await getDeploymentsInWorkspace( - tx, - this.workspaceId, - ); - await tx.delete(SCHEMA.computedDeploymentResource).where( - inArray( - SCHEMA.computedDeploymentResource.deploymentId, - deployments.map((d) => d.id), - ), - ); - }, - async (tx) => { - const deployments = await getDeploymentsInWorkspace( - tx, - this.workspaceId, - ); - const promises = deployments.map(async (d) => { - if (d.resourceSelector == null) return []; - const resources = await tx.query.resource.findMany({ - where: and( - eq(SCHEMA.resource.workspaceId, this.workspaceId), - this._queryBuilder.resources().where(d.resourceSelector).sql(), - ), - }); - - return resources.map((r) => ({ - deploymentId: d.id, - resourceId: r.id, - })); - }); - - const fulfilled = await Promise.all(promises); - return fulfilled.flat(); - }, + private async deleteExistingComputedResources( + tx: Tx, + deployments: SCHEMA.Deployment[], + ) { + await tx.delete(SCHEMA.computedDeploymentResource).where( + inArray( + SCHEMA.computedDeploymentResource.deploymentId, + deployments.map((d) => d.id), + ), ); } + + private async findMatchingResourcesForDeployments( + tx: Tx, + deployments: SCHEMA.Deployment[], + ) { + const promises = deployments.map(async (d) => { + if (d.resourceSelector == null) return []; + const qb = new QueryBuilder(tx); + const resources = await tx.query.resource.findMany({ + where: and( + eq(SCHEMA.resource.workspaceId, this.workspaceId), + qb.resources().where(d.resourceSelector).sql(), + ), + }); + + return resources.map((r) => ({ + deploymentId: d.id, + resourceId: r.id, + })); + }); + + const fulfilled = await Promise.all(promises); + return fulfilled.flat(); + } + + async resourceSelectors() { + return this.tx.transaction(async (tx) => { + const deployments = await this.getDeploymentsInWorkspace(tx); + await this.deleteExistingComputedResources(tx, deployments); + const computedResourceInserts = + await this.findMatchingResourcesForDeployments(tx, deployments); + + if (computedResourceInserts.length === 0) return []; + + return tx + .insert(SCHEMA.computedDeploymentResource) + .values(computedResourceInserts) + .onConflictDoNothing() + .returning(); + }); + } } diff --git a/packages/db/src/selectors/compute/environment-builder.ts b/packages/db/src/selectors/compute/environment-builder.ts index ea7334e5c..43bc8c171 100644 --- a/packages/db/src/selectors/compute/environment-builder.ts +++ b/packages/db/src/selectors/compute/environment-builder.ts @@ -3,117 +3,143 @@ import { and, eq, inArray, isNotNull } from "drizzle-orm/pg-core/expressions"; import type { Tx } from "../../common.js"; import * as SCHEMA from "../../schema/index.js"; import { QueryBuilder } from "../query/builder.js"; -import { ReplaceBuilder } from "./replace-builder.js"; export class EnvironmentBuilder { - private readonly _queryBuilder; constructor( private readonly tx: Tx, - private readonly ids: string[], - ) { - this._queryBuilder = new QueryBuilder(tx); + private readonly environments: SCHEMA.Environment[], + ) {} + + private deleteExistingComputedResources(tx: Tx) { + return tx + .delete(SCHEMA.computedEnvironmentResource) + .where( + inArray( + SCHEMA.computedEnvironmentResource.environmentId, + this.environmentIds, + ), + ); + } + + private get environmentIds() { + return this.environments.map((e) => e.id); + } + + private async findMatchingResourcesForEnvironments(tx: Tx) { + const envs = await tx.query.environment.findMany({ + where: and( + inArray(SCHEMA.environment.id, this.environmentIds), + isNotNull(SCHEMA.environment.resourceSelector), + ), + with: { system: true }, + }); + + const promises = envs.map(async (env) => { + const environmentId = env.id; + const { system } = env; + const { workspaceId } = system; + if (env.resourceSelector == null) return []; + const qb = new QueryBuilder(tx); + const resources = await tx.query.resource.findMany({ + where: and( + eq(SCHEMA.resource.workspaceId, workspaceId), + qb.resources().where(env.resourceSelector).sql(), + ), + }); + return resources.map((r) => ({ environmentId, resourceId: r.id })); + }); + + const fulfilled = await Promise.all(promises); + return fulfilled.flat(); } resourceSelectors() { - return new ReplaceBuilder( - this.tx, - SCHEMA.computedEnvironmentResource, - async (tx) => { - await tx - .delete(SCHEMA.computedEnvironmentResource) - .where( - inArray(SCHEMA.computedEnvironmentResource.environmentId, this.ids), - ); - }, - async (tx) => { - const envs = await tx.query.environment.findMany({ - where: inArray(SCHEMA.environment.id, this.ids), - with: { system: true }, - }); - - const promises = envs.map(async (env) => { - const { system } = env; - const { workspaceId } = system; - if (env.resourceSelector == null) return []; - const resources = await this.tx.query.resource.findMany({ - where: and( - eq(SCHEMA.resource.workspaceId, workspaceId), - this._queryBuilder.resources().where(env.resourceSelector).sql(), - ), - }); - - return resources.map((r) => ({ - environmentId: env.id, - resourceId: r.id, - })); - }); - - const fulfilled = await Promise.all(promises); - return fulfilled.flat(); - }, - ); + return this.tx.transaction(async (tx) => { + await this.deleteExistingComputedResources(tx); + const vals = await this.findMatchingResourcesForEnvironments(tx); + + if (vals.length === 0) return []; + + const results = await tx + .insert(SCHEMA.computedEnvironmentResource) + .values(vals) + .onConflictDoNothing() + .returning(); + + return results; + }); } } -const getEnvsInWorkspace = async (tx: Tx, workspaceId: string) => { - const workspace = await tx.query.workspace.findFirst({ - where: eq(SCHEMA.workspace.id, workspaceId), - with: { - systems: { - with: { - environments: { - where: isNotNull(SCHEMA.environment.resourceSelector), - }, - }, - }, - }, - }); - if (workspace == null) throw new Error(`Workspace not found: ${workspaceId}`); - return workspace.systems.flatMap((s) => s.environments); -}; - export class WorkspaceEnvironmentBuilder { - private readonly _queryBuilder; constructor( private readonly tx: Tx, private readonly workspaceId: string, - ) { - this._queryBuilder = new QueryBuilder(tx); + ) {} + + private async getEnvironmentsWithSelectors(tx: Tx) { + return tx + .select({ environment: SCHEMA.environment }) + .from(SCHEMA.environment) + .innerJoin( + SCHEMA.system, + eq(SCHEMA.environment.systemId, SCHEMA.system.id), + ) + .where( + and( + eq(SCHEMA.system.workspaceId, this.workspaceId), + isNotNull(SCHEMA.environment.resourceSelector), + ), + ) + .then((m) => m.map((e) => e.environment)); } - resourceSelectors() { - return new ReplaceBuilder( - this.tx, - SCHEMA.computedEnvironmentResource, - async (tx) => { - const envs = await getEnvsInWorkspace(tx, this.workspaceId); - await tx.delete(SCHEMA.computedEnvironmentResource).where( - inArray( - SCHEMA.computedEnvironmentResource.environmentId, - envs.map((e) => e.id), - ), - ); - }, - async (tx) => { - const envs = await getEnvsInWorkspace(tx, this.workspaceId); - const promises = envs.map(async (env) => { - if (env.resourceSelector == null) return []; - const resources = await tx.query.resource.findMany({ - where: and( - eq(SCHEMA.resource.workspaceId, this.workspaceId), - this._queryBuilder.resources().where(env.resourceSelector).sql(), - ), - }); - - return resources.map((r) => ({ - environmentId: env.id, - resourceId: r.id, - })); - }); - - const fulfilled = await Promise.all(promises); - return fulfilled.flat(); - }, + private async deleteExistingComputedResources(tx: Tx) { + const envs = await this.getEnvironmentsWithSelectors(tx); + await tx.delete(SCHEMA.computedEnvironmentResource).where( + inArray( + SCHEMA.computedEnvironmentResource.environmentId, + envs.map((e) => e.id), + ), ); } + + private async findMatchingResourcesForEnvironments(tx: Tx) { + const envs = await this.getEnvironmentsWithSelectors(tx); + const promises = envs.map(async (env) => { + if (env.resourceSelector == null) return []; + const qb = new QueryBuilder(tx); + const resources = await tx.query.resource.findMany({ + where: and( + eq(SCHEMA.resource.workspaceId, this.workspaceId), + qb.resources().where(env.resourceSelector).sql(), + ), + }); + + return resources.map((r) => ({ + environmentId: env.id, + resourceId: r.id, + })); + }); + + const fulfilled = await Promise.all(promises); + return fulfilled.flat(); + } + + resourceSelectors() { + return this.tx.transaction(async (tx) => { + await this.deleteExistingComputedResources(tx); + const vals = await this.findMatchingResourcesForEnvironments(tx); + + if (vals.length === 0) return []; + + const results = await tx + .insert(SCHEMA.computedEnvironmentResource) + .values(vals) + .onConflictDoNothing() + .returning(); + + return results; + }); + } } diff --git a/packages/db/src/selectors/compute/policy-builder.ts b/packages/db/src/selectors/compute/policy-builder.ts index e31f9b3b7..3fa069db3 100644 --- a/packages/db/src/selectors/compute/policy-builder.ts +++ b/packages/db/src/selectors/compute/policy-builder.ts @@ -3,78 +3,86 @@ import { and, eq, inArray } from "drizzle-orm/pg-core/expressions"; import type { Tx } from "../../common.js"; import * as SCHEMA from "../../schema/index.js"; import { QueryBuilder } from "../query/builder.js"; -import { ReplaceBuilder } from "./replace-builder.js"; export class PolicyBuilder { + private targets: SCHEMA.PolicyTarget[]; constructor( private readonly tx: Tx, private readonly ids: string[], - ) {} + ) { + this.targets = []; + } - releaseTargetSelectors() { - return new ReplaceBuilder( - this.tx, - SCHEMA.computedPolicyTargetReleaseTarget, - async (tx) => { - const targets = await tx - .select() - .from(SCHEMA.policyTarget) - .where(inArray(SCHEMA.policyTarget.policyId, this.ids)); + private async getTargets(tx: Tx) { + this.targets = await tx + .select() + .from(SCHEMA.policyTarget) + .where(inArray(SCHEMA.policyTarget.policyId, this.ids)); + } - await tx.delete(SCHEMA.computedPolicyTargetReleaseTarget).where( - inArray( - SCHEMA.computedPolicyTargetReleaseTarget.policyTargetId, - targets.map((t) => t.id), + private async deleteExistingComputedReleaseTargets(tx: Tx) { + await tx.delete(SCHEMA.computedPolicyTargetReleaseTarget).where( + inArray( + SCHEMA.computedPolicyTargetReleaseTarget.policyTargetId, + this.targets.map((t) => t.id), + ), + ); + } + + private async findMatchingReleaseTargetsForTargets(tx: Tx) { + const qb = new QueryBuilder(tx); + const targetPromises = this.targets.map(async (t) => { + if ( + t.resourceSelector == null && + t.deploymentSelector == null && + t.environmentSelector == null + ) + return []; + const releaseTargets = await tx + .select() + .from(SCHEMA.releaseTarget) + .innerJoin( + SCHEMA.resource, + eq(SCHEMA.releaseTarget.resourceId, SCHEMA.resource.id), + ) + .innerJoin( + SCHEMA.deployment, + eq(SCHEMA.releaseTarget.deploymentId, SCHEMA.deployment.id), + ) + .innerJoin( + SCHEMA.environment, + eq(SCHEMA.releaseTarget.environmentId, SCHEMA.environment.id), + ) + .where( + and( + qb.resources().where(t.resourceSelector).sql(), + qb.deployments().where(t.deploymentSelector).sql(), + qb.environments().where(t.environmentSelector).sql(), ), ); - }, - async (tx) => { - const targets = await tx - .select() - .from(SCHEMA.policyTarget) - .where(inArray(SCHEMA.policyTarget.policyId, this.ids)); - const qb = new QueryBuilder(tx); - const targetPromises = targets.map(async (t) => { - if ( - t.resourceSelector == null && - t.deploymentSelector == null && - t.environmentSelector == null - ) - return []; - const releaseTargets = await tx - .select() - .from(SCHEMA.releaseTarget) - .innerJoin( - SCHEMA.resource, - eq(SCHEMA.releaseTarget.resourceId, SCHEMA.resource.id), - ) - .innerJoin( - SCHEMA.deployment, - eq(SCHEMA.releaseTarget.deploymentId, SCHEMA.deployment.id), - ) - .innerJoin( - SCHEMA.environment, - eq(SCHEMA.releaseTarget.environmentId, SCHEMA.environment.id), - ) - .where( - and( - qb.resources().where(t.resourceSelector).sql(), - qb.deployments().where(t.deploymentSelector).sql(), - qb.environments().where(t.environmentSelector).sql(), - ), - ); + return releaseTargets.map((rt) => ({ + policyTargetId: t.id, + releaseTargetId: rt.release_target.id, + })); + }); - return releaseTargets.map((rt) => ({ - policyTargetId: t.id, - releaseTargetId: rt.release_target.id, - })); - }); + const fulfilled = await Promise.all(targetPromises); + return fulfilled.flat(); + } - const fulfilled = await Promise.all(targetPromises); - return fulfilled.flat(); - }, - ); + releaseTargetSelectors() { + return this.tx.transaction(async (tx) => { + await this.getTargets(tx); + await this.deleteExistingComputedReleaseTargets(tx); + const computedPolicyTargetReleaseTargetInserts = + await this.findMatchingReleaseTargetsForTargets(tx); + if (computedPolicyTargetReleaseTargetInserts.length === 0) return []; + return tx + .insert(SCHEMA.computedPolicyTargetReleaseTarget) + .values(computedPolicyTargetReleaseTargetInserts) + .onConflictDoNothing(); + }); } } @@ -85,71 +93,64 @@ export class WorkspacePolicyBuilder { ) {} releaseTargetSelectors() { - return new ReplaceBuilder( - this.tx, - SCHEMA.computedPolicyTargetReleaseTarget, - async (tx) => { - const targets = await tx + return this.tx.transaction(async (tx) => { + const targets = await tx + .select() + .from(SCHEMA.policyTarget) + .innerJoin( + SCHEMA.policy, + eq(SCHEMA.policyTarget.policyId, SCHEMA.policy.id), + ) + .where(eq(SCHEMA.policy.workspaceId, this.workspaceId)) + .then((rows) => rows.map((r) => r.policy_target)); + + await tx.delete(SCHEMA.computedPolicyTargetReleaseTarget).where( + inArray( + SCHEMA.computedPolicyTargetReleaseTarget.policyTargetId, + targets.map((t) => t.id), + ), + ); + + const qb = new QueryBuilder(tx); + const targetPromises = targets.map(async (t) => { + const releaseTargets = await tx .select() - .from(SCHEMA.policyTarget) + .from(SCHEMA.releaseTarget) .innerJoin( - SCHEMA.policy, - eq(SCHEMA.policyTarget.policyId, SCHEMA.policy.id), + SCHEMA.resource, + eq(SCHEMA.releaseTarget.resourceId, SCHEMA.resource.id), ) - .where(eq(SCHEMA.policy.workspaceId, this.workspaceId)); - - await tx.delete(SCHEMA.computedPolicyTargetReleaseTarget).where( - inArray( - SCHEMA.computedPolicyTargetReleaseTarget.policyTargetId, - targets.map((t) => t.policy_target.id), - ), - ); - }, - async (tx) => { - const rows = await tx - .select() - .from(SCHEMA.policyTarget) .innerJoin( - SCHEMA.policy, - eq(SCHEMA.policyTarget.policyId, SCHEMA.policy.id), + SCHEMA.deployment, + eq(SCHEMA.releaseTarget.deploymentId, SCHEMA.deployment.id), ) - .where(eq(SCHEMA.policy.workspaceId, this.workspaceId)); - const targets = rows.map((r) => r.policy_target); + .innerJoin( + SCHEMA.environment, + eq(SCHEMA.releaseTarget.environmentId, SCHEMA.environment.id), + ) + .where( + and( + qb.resources().where(t.resourceSelector).sql(), + qb.deployments().where(t.deploymentSelector).sql(), + qb.environments().where(t.environmentSelector).sql(), + ), + ); - const qb = new QueryBuilder(tx); - const targetPromises = targets.map(async (t) => { - const releaseTargets = await tx - .select() - .from(SCHEMA.releaseTarget) - .innerJoin( - SCHEMA.resource, - eq(SCHEMA.releaseTarget.resourceId, SCHEMA.resource.id), - ) - .innerJoin( - SCHEMA.deployment, - eq(SCHEMA.releaseTarget.deploymentId, SCHEMA.deployment.id), - ) - .innerJoin( - SCHEMA.environment, - eq(SCHEMA.releaseTarget.environmentId, SCHEMA.environment.id), - ) - .where( - and( - qb.resources().where(t.resourceSelector).sql(), - qb.deployments().where(t.deploymentSelector).sql(), - qb.environments().where(t.environmentSelector).sql(), - ), - ); + return releaseTargets.map((rt) => ({ + policyTargetId: t.id, + releaseTargetId: rt.release_target.id, + })); + }); - return releaseTargets.map((rt) => ({ - policyTargetId: t.id, - releaseTargetId: rt.release_target.id, - })); - }); + const fulfilled = await Promise.all(targetPromises); + const computedPolicyTargetReleaseTargetInserts = fulfilled.flat(); - const fulfilled = await Promise.all(targetPromises); - return fulfilled.flat(); - }, - ); + if (computedPolicyTargetReleaseTargetInserts.length === 0) return []; + return tx + .insert(SCHEMA.computedPolicyTargetReleaseTarget) + .values(computedPolicyTargetReleaseTargetInserts) + .onConflictDoNothing() + .returning(); + }); } } diff --git a/packages/db/src/selectors/compute/replace-builder.ts b/packages/db/src/selectors/compute/replace-builder.ts deleted file mode 100644 index c61b70db3..000000000 --- a/packages/db/src/selectors/compute/replace-builder.ts +++ /dev/null @@ -1,26 +0,0 @@ -import type { InferInsertModel } from "drizzle-orm"; -import type { PgTableWithColumns } from "drizzle-orm/pg-core"; - -import type { Tx } from "../../common.js"; - -export class ReplaceBuilder> { - constructor( - private readonly tx: Tx, - private readonly table: T, - private readonly deletePrevious: (tx: Tx) => Promise, - private readonly values: (tx: Tx) => Promise[]>, - ) {} - - async replace() { - return this.tx.transaction(async (tx) => { - await this.deletePrevious(tx); - const vals = await this.values(tx); - if (vals.length === 0) return; - return tx - .insert(this.table) - .values(vals) - .onConflictDoNothing() - .returning(); - }); - } -} diff --git a/packages/db/src/selectors/compute/resource-builder.ts b/packages/db/src/selectors/compute/resource-builder.ts new file mode 100644 index 000000000..0214c1da1 --- /dev/null +++ b/packages/db/src/selectors/compute/resource-builder.ts @@ -0,0 +1,250 @@ +import { and, eq, inArray, isNull, or } from "drizzle-orm/pg-core/expressions"; + +import type { Tx } from "../../common.js"; +import * as SCHEMA from "../../schema/index.js"; +import { WorkspacePolicyBuilder } from "./policy-builder.js"; + +/** + * Builder class for computing release targets for a set of resources. + * + * This class handles: + * 1. Finding matching environment-deployment pairs for resources + * 2. Creating release targets for those matches + * 3. Updating policy release target selectors + * + * All resources must belong to the same workspace. + */ +export class ResourceBuilder { + private readonly workspaceId: string; + + constructor( + private readonly tx: Tx, + private readonly resources: SCHEMA.Resource[], + ) { + const workspaceIds = new Set(resources.map((r) => r.workspaceId)); + if (workspaceIds.size !== 1) + throw new Error("All resources must be in the same workspace"); + this.workspaceId = Array.from(workspaceIds)[0]!; + } + + private get resourceIds() { + return this.resources.map((r) => r.id); + } + + private deleteExistingReleaseTargets(tx: Tx) { + return tx + .delete(SCHEMA.releaseTarget) + .where(inArray(SCHEMA.releaseTarget.resourceId, this.resourceIds)); + } + + /** + * Finds matching environment-deployment pairs for the given resources. + * + * A resource matches an environment-deployment pair if: + * 1. The resource matches the environment's selector (via + * computedEnvironmentResource) + * 2. Either: + * - The deployment's resourceSelector is null (meaning it includes all + * resources that match the environment's selector), OR + * - The resource matches the deployment's selector (via + * computedDeploymentResource) + * + * The query joins: + * - Resources with their computed environment matches + * - Those environments with their system's deployments + * - Optionally joins with computed deployment matches + * + * Returns environment ID, deployment ID and resource ID for each match. + * + * @note We assume the computed environment resource selector is up-to-date. + */ + private findMatchingEnvironmentDeploymentPairs(tx: Tx) { + const isResourceMatchingEnvironment = eq( + SCHEMA.computedEnvironmentResource.resourceId, + SCHEMA.resource.id, + ); + const isResourceMatchingDeployment = or( + isNull(SCHEMA.deployment.resourceSelector), + eq(SCHEMA.computedDeploymentResource.resourceId, SCHEMA.resource.id), + ); + + return tx + .select({ + environmentId: SCHEMA.environment.id, + deploymentId: SCHEMA.deployment.id, + resourceId: SCHEMA.resource.id, + }) + .from(SCHEMA.resource) + .innerJoin( + SCHEMA.computedEnvironmentResource, + eq(SCHEMA.computedEnvironmentResource.resourceId, SCHEMA.resource.id), + ) + .innerJoin( + SCHEMA.environment, + eq( + SCHEMA.computedEnvironmentResource.environmentId, + SCHEMA.environment.id, + ), + ) + .innerJoin( + SCHEMA.deployment, + eq(SCHEMA.deployment.systemId, SCHEMA.environment.systemId), + ) + .leftJoin( + SCHEMA.computedDeploymentResource, + eq( + SCHEMA.computedDeploymentResource.deploymentId, + SCHEMA.deployment.id, + ), + ) + .where( + and( + isResourceMatchingEnvironment, + isResourceMatchingDeployment, + inArray(SCHEMA.resource.id, this.resourceIds), + ), + ); + } + + private recomputePolicyReleaseTargets(tx: Tx) { + const policyComputer = new WorkspacePolicyBuilder(tx, this.workspaceId); + return policyComputer.releaseTargetSelectors(); + } + + releaseTargets() { + return this.tx.transaction(async (tx) => { + await this.deleteExistingReleaseTargets(tx); + const vals = await this.findMatchingEnvironmentDeploymentPairs(tx); + if (vals.length === 0) return []; + + const results = await tx + .insert(SCHEMA.releaseTarget) + .values(vals) + .onConflictDoNothing() + .returning(); + + await this.recomputePolicyReleaseTargets(tx); + + return results; + }); + } +} + +export class WorkspaceResourceBuilder { + constructor( + private readonly tx: Tx, + private readonly workspaceId: string, + ) {} + + private getResourcesInWorkspace(tx: Tx) { + return tx.query.resource.findMany({ + where: eq(SCHEMA.resource.workspaceId, this.workspaceId), + }); + } + + private deleteExistingReleaseTargets(tx: Tx, resourceIds: string[]) { + return tx + .delete(SCHEMA.releaseTarget) + .where(inArray(SCHEMA.releaseTarget.resourceId, resourceIds)); + } + + /** + * Finds matching environment-deployment pairs for the given resources. + * + * A resource matches an environment-deployment pair if: + * 1. The resource matches the environment's selector (via + * computedEnvironmentResource) + * 2. Either: + * - The deployment's resourceSelector is null (meaning it includes all + * resources that match the environment's selector), OR + * - The resource matches the deployment's selector (via + * computedDeploymentResource) + * + * The query joins: + * - Resources with their computed environment matches + * - Those environments with their system's deployments + * - Optionally joins with computed deployment matches + * + * Returns environment ID, deployment ID and resource ID for each match. + * + * @note We assume the computed environment resource selector is up-to-date. + */ + private findMatchingEnvironmentDeploymentPairs( + tx: Tx, + resourceIds: string[], + ) { + const isResourceMatchingEnvironment = eq( + SCHEMA.computedEnvironmentResource.resourceId, + SCHEMA.resource.id, + ); + const isResourceMatchingDeployment = or( + isNull(SCHEMA.deployment.resourceSelector), + eq(SCHEMA.computedDeploymentResource.resourceId, SCHEMA.resource.id), + ); + + return tx + .select({ + environmentId: SCHEMA.environment.id, + deploymentId: SCHEMA.deployment.id, + resourceId: SCHEMA.resource.id, + }) + .from(SCHEMA.resource) + .innerJoin( + SCHEMA.computedEnvironmentResource, + eq(SCHEMA.computedEnvironmentResource.resourceId, SCHEMA.resource.id), + ) + .innerJoin( + SCHEMA.environment, + eq( + SCHEMA.computedEnvironmentResource.environmentId, + SCHEMA.environment.id, + ), + ) + .innerJoin( + SCHEMA.deployment, + eq(SCHEMA.deployment.systemId, SCHEMA.environment.systemId), + ) + .leftJoin( + SCHEMA.computedDeploymentResource, + eq( + SCHEMA.computedDeploymentResource.deploymentId, + SCHEMA.deployment.id, + ), + ) + .where( + and( + isResourceMatchingEnvironment, + isResourceMatchingDeployment, + inArray(SCHEMA.resource.id, resourceIds), + ), + ); + } + + private recomputePolicyReleaseTargets(tx: Tx) { + const policyComputer = new WorkspacePolicyBuilder(tx, this.workspaceId); + return policyComputer.releaseTargetSelectors(); + } + + releaseTargets() { + return this.tx.transaction(async (tx) => { + const resources = await this.getResourcesInWorkspace(tx); + const resourceIds = resources.map((r) => r.id); + await this.deleteExistingReleaseTargets(tx, resourceIds); + const vals = await this.findMatchingEnvironmentDeploymentPairs( + tx, + resourceIds, + ); + if (vals.length === 0) return []; + + const results = await tx + .insert(SCHEMA.releaseTarget) + .values(vals) + .onConflictDoNothing() + .returning(); + + await this.recomputePolicyReleaseTargets(tx); + + return results; + }); + } +} diff --git a/packages/job-dispatch/src/resource/handle-provider-scan.ts b/packages/job-dispatch/src/resource/handle-provider-scan.ts index 7c40c7456..4195bf152 100644 --- a/packages/job-dispatch/src/resource/handle-provider-scan.ts +++ b/packages/job-dispatch/src/resource/handle-provider-scan.ts @@ -7,7 +7,6 @@ import { logger } from "@ctrlplane/logger"; import { deleteResources } from "./delete.js"; import { groupResourcesByHook } from "./group-resources-by-hook.js"; -import { replaceReleaseTargetsAndDispatchExitHooks } from "./replace-release-targets.js"; const log = logger.child({ label: "upsert-resources" }); @@ -44,22 +43,12 @@ export const handleResourceProviderScan = async ( const insertJobs = insertedResources.map((r) => ({ name: r.id, data: r })); const updateJobs = updatedResources.map((r) => ({ name: r.id, data: r })); + const deleted = await deleteResources(tx, toDelete); - const cb = selector().compute(); - await Promise.all([ - cb.allEnvironments(workspaceId).resourceSelectors().replace(), - cb.allDeployments(workspaceId).resourceSelectors().replace(), - ]); - const promises = [...insertedResources, ...updatedResources].map( - (resource) => replaceReleaseTargetsAndDispatchExitHooks(tx, resource), - ); - await Promise.all(promises); - await cb.allPolicies(workspaceId).releaseTargetSelectors().replace(); - + await selector().compute().allResourceSelectors(workspaceId); await getQueue(Channel.NewResource).addBulk(insertJobs); await getQueue(Channel.UpdatedResource).addBulk(updateJobs); - const deleted = await deleteResources(tx, toDelete); log.info("completed handling resource provider scan"); return { all: [...insertedResources, ...updatedResources], deleted }; } catch (error) { diff --git a/packages/job-dispatch/src/resource/index.ts b/packages/job-dispatch/src/resource/index.ts index a05da2ce9..f8079e83f 100644 --- a/packages/job-dispatch/src/resource/index.ts +++ b/packages/job-dispatch/src/resource/index.ts @@ -2,4 +2,3 @@ export * from "./handle-provider-scan.js"; export * from "./delete.js"; export * from "./dispatch-resource.js"; export * from "./group-resources-by-hook.js"; -export * from "./replace-release-targets.js"; diff --git a/packages/job-dispatch/src/resource/replace-release-targets.ts b/packages/job-dispatch/src/resource/replace-release-targets.ts deleted file mode 100644 index f322c36b2..000000000 --- a/packages/job-dispatch/src/resource/replace-release-targets.ts +++ /dev/null @@ -1,86 +0,0 @@ -import type { Tx } from "@ctrlplane/db"; - -import { and, eq, inArray } from "@ctrlplane/db"; -import * as SCHEMA from "@ctrlplane/db/schema"; -import { HookAction } from "@ctrlplane/validators/events"; - -import { handleEvent } from "../events/index.js"; - -export const replaceReleaseTargetsAndDispatchExitHooks = async ( - db: Tx, - resource: SCHEMA.Resource, -) => { - const currReleaseTargets = await db.query.releaseTarget.findMany({ - where: eq(SCHEMA.releaseTarget.resourceId, resource.id), - }); - - const newReleaseTargets = await db.transaction(async (db) => { - await db - .delete(SCHEMA.releaseTarget) - .where(eq(SCHEMA.releaseTarget.resourceId, resource.id)); - - const rows = await db - .select() - .from(SCHEMA.computedEnvironmentResource) - .innerJoin( - SCHEMA.environment, - eq( - SCHEMA.computedEnvironmentResource.environmentId, - SCHEMA.environment.id, - ), - ) - .innerJoin( - SCHEMA.deployment, - eq(SCHEMA.deployment.systemId, SCHEMA.environment.systemId), - ) - .leftJoin( - SCHEMA.computedDeploymentResource, - and( - eq( - SCHEMA.computedDeploymentResource.deploymentId, - SCHEMA.deployment.id, - ), - eq(SCHEMA.computedDeploymentResource.resourceId, resource.id), - ), - ) - .where(eq(SCHEMA.computedEnvironmentResource.resourceId, resource.id)); - - const targets = rows - .filter( - (r) => - r.deployment.resourceSelector == null || - r.computed_deployment_resource != null, - ) - .map((r) => ({ - environmentId: r.environment.id, - deploymentId: r.deployment.id, - resourceId: resource.id, - })); - - if (targets.length === 0) return []; - return db - .insert(SCHEMA.releaseTarget) - .values(targets) - .onConflictDoNothing() - .returning(); - }); - - const previousDeploymentIds = currReleaseTargets.map((rt) => rt.deploymentId); - const newDeploymentIds = newReleaseTargets.map((t) => t.deploymentId); - const exitedDeploymentIds = previousDeploymentIds.filter( - (id) => !newDeploymentIds.includes(id), - ); - const exitedDeployments = await db.query.deployment.findMany({ - where: inArray(SCHEMA.deployment.id, exitedDeploymentIds), - }); - - const events = exitedDeployments.map((deployment) => ({ - action: HookAction.DeploymentResourceRemoved, - payload: { deployment, resource }, - })); - - const handleEventPromises = events.map(handleEvent); - await Promise.allSettled(handleEventPromises); - - return newReleaseTargets; -}; diff --git a/packages/rule-engine/src/manager/version-manager.ts b/packages/rule-engine/src/manager/version-manager.ts index ec5d7e01c..6bfce5cda 100644 --- a/packages/rule-engine/src/manager/version-manager.ts +++ b/packages/rule-engine/src/manager/version-manager.ts @@ -1,5 +1,4 @@ import type { Tx } from "@ctrlplane/db"; -import _ from "lodash"; import sizeOf from "object-sizeof"; import { @@ -8,6 +7,7 @@ import { eq, gte, lte, + selector, takeFirst, takeFirstOrNull, } from "@ctrlplane/db"; @@ -120,16 +120,18 @@ export class VersionReleaseManager implements ReleaseManager { ]); const policy = await this.getPolicy(); + const sql = selector() + .query() + .deploymentVersions() + .where(policy?.deploymentVersionSelector?.deploymentVersionSelector) + .sql(); const versions = await this.db.query.deploymentVersion.findMany({ where: and( eq( schema.deploymentVersion.deploymentId, this.releaseTarget.deploymentId, ), - schema.deploymentVersionMatchesCondition( - this.db, - policy?.deploymentVersionSelector?.deploymentVersionSelector, - ), + sql, latestDeployedVersion != null ? gte( schema.deploymentVersion.createdAt, diff --git a/packages/rule-engine/src/utils/merge-policies.ts b/packages/rule-engine/src/utils/merge-policies.ts index b4ca5cf0d..0694bf416 100644 --- a/packages/rule-engine/src/utils/merge-policies.ts +++ b/packages/rule-engine/src/utils/merge-policies.ts @@ -42,6 +42,8 @@ export const mergePolicies = (policies: Policy[]): Policy | null => { return { ...merged, - deploymentVersionSelector: mergedVersionSelector, + deploymentVersionSelector: { + deploymentVersionSelector: mergedVersionSelector, + }, }; };