Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions packages/api/src/router/release-deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ export const releaseDeployRouter = createTRPCRouter({
.then(input.isForcedRelease ? () => {} : createJobApprovals)
.insert();

console.log("releaseJobTriggers", releaseJobTriggers);

await dispatchReleaseJobTriggers(ctx.db)
.releaseTriggers(releaseJobTriggers)
.filter(
Expand Down
124 changes: 66 additions & 58 deletions packages/job-dispatch/src/policies/release-sequencing.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import type { Tx } from "@ctrlplane/db";
import { isAfter } from "date-fns";
import _ from "lodash";
import { isPresent } from "ts-is-present";

import {
and,
desc,
eq,
inArray,
isNull,
notExists,
notInArray,
sql,
takeFirstOrNull,
} from "@ctrlplane/db";
import * as schema from "@ctrlplane/db/schema";
import { activeStatus, JobStatus } from "@ctrlplane/validators/jobs";
Expand Down Expand Up @@ -79,20 +80,41 @@ export const isPassingNoActiveJobsPolicy: ReleaseIdPolicyChecker = async (
.value();
};

const latestActiveReleaseSubQuery = (db: Tx) =>
db
.select({
id: schema.release.id,
deploymentId: schema.release.deploymentId,
version: schema.release.version,
createdAt: schema.release.createdAt,
name: schema.release.name,
config: schema.release.config,
environmentId: schema.releaseJobTrigger.environmentId,
rank: sql<number>`ROW_NUMBER() OVER (PARTITION BY ${schema.release.deploymentId}, ${schema.releaseJobTrigger.environmentId} ORDER BY ${schema.release.createdAt} DESC)`.as(
"rank",
const isReleaseLatestActiveForEnvironment = async (
db: Tx,
release: schema.Release,
environmentId: string,
) => {
const environment = await db
.select()
.from(schema.environment)
.innerJoin(
schema.environmentPolicy,
eq(schema.environment.policyId, schema.environmentPolicy.id),
)
.leftJoin(
schema.environmentPolicyReleaseChannel,
eq(
schema.environmentPolicyReleaseChannel.policyId,
schema.environmentPolicy.id,
),
})
)
.leftJoin(
schema.releaseChannel,
and(
eq(
schema.environmentPolicyReleaseChannel.channelId,
schema.releaseChannel.id,
),
eq(schema.releaseChannel.deploymentId, release.deploymentId),
),
)
.where(eq(schema.environment.id, environmentId))
.then(takeFirstOrNull);
if (!environment) return false;

const latestActiveRelease = await db
.select()
.from(schema.release)
.innerJoin(
schema.releaseJobTrigger,
Expand All @@ -111,9 +133,25 @@ const latestActiveReleaseSubQuery = (db: Tx) =>
JobStatus.Cancelled,
]),
isNull(schema.resource.deletedAt),
eq(schema.release.deploymentId, release.deploymentId),
eq(schema.releaseJobTrigger.environmentId, environmentId),
schema.releaseMatchesCondition(
db,
environment.release_channel?.releaseFilter,
),
),
)
.as("active_releases");
.orderBy(desc(schema.release.createdAt))
.limit(1)
.then(takeFirstOrNull);

if (!latestActiveRelease) return true;

return (
release.id === latestActiveRelease.release.id ||
isAfter(release.createdAt, latestActiveRelease.release.createdAt)
);
};

/**
* This policy checks if the release is newer than the last release that was deployed for a deployment/environment.
Expand All @@ -125,58 +163,28 @@ export const isPassingNewerThanLastActiveReleasePolicy: ReleaseIdPolicyChecker =
async (db, releaseJobTriggers) => {
if (releaseJobTriggers.length === 0) return [];

const activeRelease = latestActiveReleaseSubQuery(db);

const releaseIds = releaseJobTriggers.map((rjt) => rjt.releaseId);
const releases = await db
.select()
.from(schema.release)
.where(inArray(schema.release.id, releaseIds));

const deploymentIds = _.uniq(releases.map((r) => r.deploymentId));
const deployments = await db
.select()
.from(schema.deployment)
.leftJoin(
activeRelease,
and(
eq(activeRelease.deploymentId, schema.deployment.id),
eq(activeRelease.rank, 1),
),
)
.where(inArray(schema.deployment.id, deploymentIds))
.then((rows) =>
_.chain(rows)
.groupBy((r) => r.deployment.id)
.map((r) => ({
...r[0]!.deployment,
activeReleases: r.map((r) => r.active_releases).filter(isPresent),
}))
.value(),
);

return _.chain(releaseJobTriggers)
.groupBy((rjt) => {
const release = releases.find((r) => r.id === rjt.releaseId);
if (!release) return null;
return [rjt.environmentId, rjt.releaseId];
})
.filter(isPresent)
.map((t) => {
const release = releases.find((r) => r.id === t[0]!.releaseId);
if (!release) return null;
const deployment = deployments.find(
(d) => d.id === release.deploymentId,
.groupBy((rjt) => [rjt.releaseId, rjt.environmentId])
.map(async (groupedTriggers) => {
const release = releases.find(
(r) => r.id === groupedTriggers[0]!.releaseId,
);
if (!deployment) return null;
const activeRelease = deployment.activeReleases.find(
(r) => r.environmentId === t[0]!.environmentId,
if (!release) return [];
const { environmentId } = groupedTriggers[0]!;
const isLatestActive = await isReleaseLatestActiveForEnvironment(
db,
release,
environmentId,
);
if (!activeRelease) return t;
if (release.id === activeRelease.id) return t;
return isAfter(release.createdAt, activeRelease.createdAt) ? t : null;
return isLatestActive ? groupedTriggers : [];
})
.filter(isPresent)
.thru((promises) => Promise.all(promises))
.value()
.flat();
.then((triggers) => triggers.flat());
};
7 changes: 1 addition & 6 deletions packages/job-dispatch/src/policy-checker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,7 @@ export const isPassingAllPolicies = async (
];
let passingJobs = releaseJobTriggers;

for (const check of checks) {
passingJobs = await check(db, passingJobs);
console.log(
`After ${check.name}: ${passingJobs.filter((rjt) => rjt.releaseId === "dcfb27db-4792-47dc-b9cc-e34b02482973").length} passing jobs`,
);
}
for (const check of checks) passingJobs = await check(db, passingJobs);

return passingJobs;
};
Expand Down
Loading