Skip to content

Commit

Permalink
fix: Latest active release check takes into account release channels (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
adityachoudhari26 authored Feb 28, 2025
1 parent c103110 commit 14e43c4
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 66 deletions.
2 changes: 0 additions & 2 deletions packages/api/src/router/release-deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ export const releaseDeployRouter = createTRPCRouter({
.then(input.isForcedRelease ? () => {} : createJobApprovals)
.insert();

console.log("releaseJobTriggers", releaseJobTriggers);

await dispatchReleaseJobTriggers(ctx.db)
.releaseTriggers(releaseJobTriggers)
.filter(
Expand Down
124 changes: 66 additions & 58 deletions packages/job-dispatch/src/policies/release-sequencing.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import type { Tx } from "@ctrlplane/db";
import { isAfter } from "date-fns";
import _ from "lodash";
import { isPresent } from "ts-is-present";

import {
and,
desc,
eq,
inArray,
isNull,
notExists,
notInArray,
sql,
takeFirstOrNull,
} from "@ctrlplane/db";
import * as schema from "@ctrlplane/db/schema";
import { activeStatus, JobStatus } from "@ctrlplane/validators/jobs";
Expand Down Expand Up @@ -79,20 +80,41 @@ export const isPassingNoActiveJobsPolicy: ReleaseIdPolicyChecker = async (
.value();
};

const latestActiveReleaseSubQuery = (db: Tx) =>
db
.select({
id: schema.release.id,
deploymentId: schema.release.deploymentId,
version: schema.release.version,
createdAt: schema.release.createdAt,
name: schema.release.name,
config: schema.release.config,
environmentId: schema.releaseJobTrigger.environmentId,
rank: sql<number>`ROW_NUMBER() OVER (PARTITION BY ${schema.release.deploymentId}, ${schema.releaseJobTrigger.environmentId} ORDER BY ${schema.release.createdAt} DESC)`.as(
"rank",
const isReleaseLatestActiveForEnvironment = async (
db: Tx,
release: schema.Release,
environmentId: string,
) => {
const environment = await db
.select()
.from(schema.environment)
.innerJoin(
schema.environmentPolicy,
eq(schema.environment.policyId, schema.environmentPolicy.id),
)
.leftJoin(
schema.environmentPolicyReleaseChannel,
eq(
schema.environmentPolicyReleaseChannel.policyId,
schema.environmentPolicy.id,
),
})
)
.leftJoin(
schema.releaseChannel,
and(
eq(
schema.environmentPolicyReleaseChannel.channelId,
schema.releaseChannel.id,
),
eq(schema.releaseChannel.deploymentId, release.deploymentId),
),
)
.where(eq(schema.environment.id, environmentId))
.then(takeFirstOrNull);
if (!environment) return false;

const latestActiveRelease = await db
.select()
.from(schema.release)
.innerJoin(
schema.releaseJobTrigger,
Expand All @@ -111,9 +133,25 @@ const latestActiveReleaseSubQuery = (db: Tx) =>
JobStatus.Cancelled,
]),
isNull(schema.resource.deletedAt),
eq(schema.release.deploymentId, release.deploymentId),
eq(schema.releaseJobTrigger.environmentId, environmentId),
schema.releaseMatchesCondition(
db,
environment.release_channel?.releaseFilter,
),
),
)
.as("active_releases");
.orderBy(desc(schema.release.createdAt))
.limit(1)
.then(takeFirstOrNull);

if (!latestActiveRelease) return true;

return (
release.id === latestActiveRelease.release.id ||
isAfter(release.createdAt, latestActiveRelease.release.createdAt)
);
};

/**
* This policy checks if the release is newer than the last release that was deployed for a deployment/environment.
Expand All @@ -125,58 +163,28 @@ export const isPassingNewerThanLastActiveReleasePolicy: ReleaseIdPolicyChecker =
async (db, releaseJobTriggers) => {
if (releaseJobTriggers.length === 0) return [];

const activeRelease = latestActiveReleaseSubQuery(db);

const releaseIds = releaseJobTriggers.map((rjt) => rjt.releaseId);
const releases = await db
.select()
.from(schema.release)
.where(inArray(schema.release.id, releaseIds));

const deploymentIds = _.uniq(releases.map((r) => r.deploymentId));
const deployments = await db
.select()
.from(schema.deployment)
.leftJoin(
activeRelease,
and(
eq(activeRelease.deploymentId, schema.deployment.id),
eq(activeRelease.rank, 1),
),
)
.where(inArray(schema.deployment.id, deploymentIds))
.then((rows) =>
_.chain(rows)
.groupBy((r) => r.deployment.id)
.map((r) => ({
...r[0]!.deployment,
activeReleases: r.map((r) => r.active_releases).filter(isPresent),
}))
.value(),
);

return _.chain(releaseJobTriggers)
.groupBy((rjt) => {
const release = releases.find((r) => r.id === rjt.releaseId);
if (!release) return null;
return [rjt.environmentId, rjt.releaseId];
})
.filter(isPresent)
.map((t) => {
const release = releases.find((r) => r.id === t[0]!.releaseId);
if (!release) return null;
const deployment = deployments.find(
(d) => d.id === release.deploymentId,
.groupBy((rjt) => [rjt.releaseId, rjt.environmentId])
.map(async (groupedTriggers) => {
const release = releases.find(
(r) => r.id === groupedTriggers[0]!.releaseId,
);
if (!deployment) return null;
const activeRelease = deployment.activeReleases.find(
(r) => r.environmentId === t[0]!.environmentId,
if (!release) return [];
const { environmentId } = groupedTriggers[0]!;
const isLatestActive = await isReleaseLatestActiveForEnvironment(
db,
release,
environmentId,
);
if (!activeRelease) return t;
if (release.id === activeRelease.id) return t;
return isAfter(release.createdAt, activeRelease.createdAt) ? t : null;
return isLatestActive ? groupedTriggers : [];
})
.filter(isPresent)
.thru((promises) => Promise.all(promises))
.value()
.flat();
.then((triggers) => triggers.flat());
};
7 changes: 1 addition & 6 deletions packages/job-dispatch/src/policy-checker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,7 @@ export const isPassingAllPolicies = async (
];
let passingJobs = releaseJobTriggers;

for (const check of checks) {
passingJobs = await check(db, passingJobs);
console.log(
`After ${check.name}: ${passingJobs.filter((rjt) => rjt.releaseId === "dcfb27db-4792-47dc-b9cc-e34b02482973").length} passing jobs`,
);
}
for (const check of checks) passingJobs = await check(db, passingJobs);

return passingJobs;
};
Expand Down

0 comments on commit 14e43c4

Please sign in to comment.