Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 91 additions & 66 deletions apps/event-worker/src/workers/evaluate-release-target.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { Tx } from "@ctrlplane/db";
import type { VersionEvaluateOptions } from "@ctrlplane/rule-engine";
import _ from "lodash";

import { and, desc, eq, sql, takeFirst } from "@ctrlplane/db";
import { and, desc, eq, sql, takeFirst, takeFirstOrNull } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import { createReleaseJob } from "@ctrlplane/db/queries";
import * as schema from "@ctrlplane/db/schema";
Expand All @@ -22,6 +22,35 @@ const tracer = trace.getTracer("evaluate-release-target");
const withSpan = makeWithSpan(tracer);
const log = logger.child({ module: "evaluate-release-target" });

const getReleaseTarget = async (
tx: Tx,
identifier: {
resourceId: string;
environmentId: string;
deploymentId: string;
},
) => {
const releaseTarget = await tx.query.releaseTarget.findFirst({
where: and(
eq(schema.releaseTarget.resourceId, identifier.resourceId),
eq(schema.releaseTarget.environmentId, identifier.environmentId),
eq(schema.releaseTarget.deploymentId, identifier.deploymentId),
),
with: {
resource: true,
environment: true,
deployment: true,
},
});

if (releaseTarget == null)
throw new Error(
`Release target not found: resourceId=${identifier.resourceId} environmentId=${identifier.environmentId} deploymentId=${identifier.deploymentId}`,
);

return releaseTarget;
};

/**
* Handles version release evaluation and creation for a release target
* @param releaseTarget - Release target to evaluate
Expand All @@ -45,9 +74,7 @@ const handleVersionRelease = withSpan(
...releaseTarget,
workspaceId,
});

const { chosenCandidate } = await vrm.evaluate(versionEvaluateOptions);

if (!chosenCandidate) return null;

const { release: versionRelease } = await vrm.upsertRelease(
Expand Down Expand Up @@ -96,13 +123,58 @@ const acquireReleaseTargetLock = async (tx: Tx, releaseTargetId: string) =>
);

/**
* Gets the latest version release for a specific release target
* Gets the current release for a specific release target
*/
const getLatestVersionRelease = (tx: Tx, releaseTargetId: string) =>
tx.query.versionRelease.findFirst({
where: eq(schema.versionRelease.releaseTargetId, releaseTargetId),
orderBy: desc(schema.versionRelease.createdAt),
});
const getCurrentRelease = async (tx: Tx, releaseTargetId: string) => {
const currentRelease = await tx
.select()
.from(schema.release)
.innerJoin(
schema.versionRelease,
eq(schema.release.versionReleaseId, schema.versionRelease.id),
)
.innerJoin(
schema.variableSetRelease,
eq(schema.release.variableReleaseId, schema.variableSetRelease.id),
)
.where(eq(schema.versionRelease.releaseTargetId, releaseTargetId))
.orderBy(desc(schema.release.createdAt))
.limit(1)
.then(takeFirstOrNull);

if (currentRelease == null) return null;

return {
...currentRelease.release,
currentVersionRelease: currentRelease.version_release,
currentVariableRelease: currentRelease.variable_set_release,
};
};

const getHasAnythingChanged = (
currentRelease: {
currentVersionRelease: { id: string };
currentVariableRelease: { id: string };
},
newRelease: { versionReleaseId: string; variableReleaseId: string },
) => {
const isVersionUnchanged =
currentRelease.currentVersionRelease.id === newRelease.versionReleaseId;
const areVariablesUnchanged =
currentRelease.currentVariableRelease.id === newRelease.variableReleaseId;
return !isVersionUnchanged || !areVariablesUnchanged;
};

const insertNewRelease = async (
tx: Tx,
versionReleaseId: string,
variableReleaseId: string,
) =>
tx
.insert(schema.release)
.values({ versionReleaseId, variableReleaseId })
.returning()
.then(takeFirst);

/**
* Worker that evaluates a release target and creates necessary releases and jobs
Expand All @@ -122,38 +194,9 @@ export const evaluateReleaseTargetWorker = createWorker(

try {
const release = await db.transaction(async (tx) => {
const releaseTarget = await tx.query.releaseTarget.findFirst({
where: and(
eq(schema.releaseTarget.resourceId, data.resourceId),
eq(schema.releaseTarget.environmentId, data.environmentId),
eq(schema.releaseTarget.deploymentId, data.deploymentId),
),
with: {
resource: true,
environment: true,
deployment: true,
},
});

if (releaseTarget == null)
throw new Error("Failed to get release target");

const releaseTarget = await getReleaseTarget(tx, data);
await acquireReleaseTargetLock(tx, releaseTarget.id);

const latestVersionRelease = await getLatestVersionRelease(
tx,
releaseTarget.id,
);

const existingVariableRelease =
await tx.query.variableSetRelease.findFirst({
where: eq(
schema.variableSetRelease.releaseTargetId,
releaseTarget.id,
),
orderBy: desc(schema.variableSetRelease.createdAt),
});

const { versionEvaluateOptions } = data;
const [versionRelease, variableRelease] = await Promise.all([
handleVersionRelease(tx, releaseTarget, versionEvaluateOptions),
Expand All @@ -162,36 +205,18 @@ export const evaluateReleaseTargetWorker = createWorker(

if (versionRelease == null) return;

// Check if version and variables are unchanged from previous release
const isVersionUnchanged =
latestVersionRelease?.id === versionRelease.id;
const areVariablesUnchanged =
existingVariableRelease?.id === variableRelease.id;

const hasAnythingChanged =
!isVersionUnchanged || !areVariablesUnchanged;

// If nothing changed, return existing release
if (!hasAnythingChanged) {
return tx.query.release.findFirst({
where: and(
eq(schema.release.versionReleaseId, versionRelease.id),
eq(schema.release.variableReleaseId, variableRelease.id),
),
});
}

// Otherwise create new release with updated version/variables
const newRelease = {
const currentRelease = await getCurrentRelease(tx, releaseTarget.id);
if (currentRelease == null)
return insertNewRelease(tx, versionRelease.id, variableRelease.id);

const hasAnythingChanged = getHasAnythingChanged(currentRelease, {
versionReleaseId: versionRelease.id,
variableReleaseId: variableRelease.id,
};
});

if (!hasAnythingChanged) return currentRelease;

return tx
.insert(schema.release)
.values(newRelease)
.returning()
.then(takeFirst);
return insertNewRelease(tx, versionRelease.id, variableRelease.id);
});

if (release == null) return;
Expand Down
Loading