diff --git a/apps/event-worker/src/target-scan/index.ts b/apps/event-worker/src/target-scan/index.ts index a16a0d83..884a5822 100644 --- a/apps/event-worker/src/target-scan/index.ts +++ b/apps/event-worker/src/target-scan/index.ts @@ -10,7 +10,7 @@ import { resourceProviderGoogle, workspace, } from "@ctrlplane/db/schema"; -import { upsertTargets } from "@ctrlplane/job-dispatch"; +import { upsertResources } from "@ctrlplane/job-dispatch"; import { logger } from "@ctrlplane/logger"; import { Channel } from "@ctrlplane/validators/events"; @@ -72,7 +72,7 @@ export const createTargetScanWorker = () => `Upserting ${targets.length} targets for provider ${tp.resource_provider.id}`, ); if (targets.length > 0) { - await upsertTargets(db, targets); + await upsertResources(db, targets); } else { logger.info( `No targets found for provider ${tp.resource_provider.id}, skipping upsert.`, diff --git a/apps/webservice/src/app/api/v1/target-providers/[providerId]/set/route.ts b/apps/webservice/src/app/api/v1/target-providers/[providerId]/set/route.ts index 25d2ebc5..93b35c5a 100644 --- a/apps/webservice/src/app/api/v1/target-providers/[providerId]/set/route.ts +++ b/apps/webservice/src/app/api/v1/target-providers/[providerId]/set/route.ts @@ -9,7 +9,7 @@ import { resourceProvider, workspace, } from "@ctrlplane/db/schema"; -import { upsertTargets } from "@ctrlplane/job-dispatch"; +import { upsertResources } from "@ctrlplane/job-dispatch"; import { Permission } from "@ctrlplane/validators/auth"; import { authn, authz } from "~/app/api/v1/auth"; @@ -77,7 +77,7 @@ export const PATCH = request() workspaceId: provider.workspaceId, })); - const targets = await upsertTargets( + const targets = await upsertResources( db, targetsToInsert.map((t) => ({ ...t, diff --git a/apps/webservice/src/app/api/v1/targets/[targetId]/route.ts b/apps/webservice/src/app/api/v1/targets/[targetId]/route.ts index 0be90b57..4446c8a8 100644 --- a/apps/webservice/src/app/api/v1/targets/[targetId]/route.ts +++ b/apps/webservice/src/app/api/v1/targets/[targetId]/route.ts @@ -4,7 +4,7 @@ import { z } from "zod"; import { eq } from "@ctrlplane/db"; import * as schema from "@ctrlplane/db/schema"; -import { upsertTargets } from "@ctrlplane/job-dispatch"; +import { upsertResources } from "@ctrlplane/job-dispatch"; import { Permission } from "@ctrlplane/validators/auth"; import { authn, authz } from "../../auth"; @@ -80,7 +80,7 @@ export const PATCH = request() if (target == null) return NextResponse.json({ error: "Target not found" }, { status: 404 }); - const t = await upsertTargets(db, [_.merge(target, body)]); + const t = await upsertResources(db, [_.merge(target, body)]); return NextResponse.json(t[0]); }); diff --git a/apps/webservice/src/app/api/v1/targets/route.ts b/apps/webservice/src/app/api/v1/targets/route.ts index 574dad28..daee446b 100644 --- a/apps/webservice/src/app/api/v1/targets/route.ts +++ b/apps/webservice/src/app/api/v1/targets/route.ts @@ -4,7 +4,7 @@ import { z } from "zod"; import { db } from "@ctrlplane/db/client"; import { createResource } from "@ctrlplane/db/schema"; -import { upsertTargets } from "@ctrlplane/job-dispatch"; +import { upsertResources } from "@ctrlplane/job-dispatch"; import { Permission } from "@ctrlplane/validators/auth"; import { authn, authz } from "../auth"; @@ -55,7 +55,7 @@ export const POST = request() { status: 400 }, ); - const targets = await upsertTargets( + const targets = await upsertResources( db, ctx.body.targets.map((t) => ({ ...t, diff --git a/packages/job-dispatch/src/index.ts b/packages/job-dispatch/src/index.ts index b43b8a79..8e478e63 100644 --- a/packages/job-dispatch/src/index.ts +++ b/packages/job-dispatch/src/index.ts @@ -6,7 +6,7 @@ export * from "./policy-checker.js"; export * from "./policy-create.js"; export * from "./release-sequencing.js"; export * from "./gradual-rollout.js"; -export * from "./new-target.js"; +export * from "./new-resource.js"; export * from "./target.js"; export * from "./lock-checker.js"; export * from "./queue.js"; diff --git a/packages/job-dispatch/src/new-target.ts b/packages/job-dispatch/src/new-resource.ts similarity index 100% rename from packages/job-dispatch/src/new-target.ts rename to packages/job-dispatch/src/new-resource.ts diff --git a/packages/job-dispatch/src/target.ts b/packages/job-dispatch/src/target.ts index c703df6a..af40cf32 100644 --- a/packages/job-dispatch/src/target.ts +++ b/packages/job-dispatch/src/target.ts @@ -22,18 +22,18 @@ import { import { logger } from "@ctrlplane/logger"; import { variablesAES256 } from "@ctrlplane/secrets"; -import { dispatchJobsForNewResources } from "./new-target.js"; +import { dispatchJobsForNewResources } from "./new-resource.js"; -const log = logger.child({ label: "upsert-targets" }); +const log = logger.child({ label: "upsert-resources" }); -const getExistingTargetsForProvider = (db: Tx, providerId: string) => +const getExistingResourcesForProvider = (db: Tx, providerId: string) => db.select().from(resource).where(eq(resource.providerId, providerId)); -const dispatchNewTargets = async (db: Tx, newTargets: Resource[]) => { - const [firstTarget] = newTargets; - if (firstTarget == null) return; +const dispatchNewResources = async (db: Tx, newResources: Resource[]) => { + const [firstResource] = newResources; + if (firstResource == null) return; - const workspaceId = firstTarget.workspaceId; + const workspaceId = firstResource.workspaceId; const workspaceEnvs = await db .select({ id: environment.id, resourceFilter: environment.resourceFilter }) @@ -46,13 +46,13 @@ const dispatchNewTargets = async (db: Tx, newTargets: Resource[]) => { ), ); - const targetIds = newTargets.map((t) => t.id); + const resourceIds = newResources.map((r) => r.id); for (const env of workspaceEnvs) { db.select() .from(resource) .where( and( - inArray(resource.id, targetIds), + inArray(resource.id, resourceIds), resourceMatchesMetadata(db, env.resourceFilter), ), ) @@ -67,30 +67,30 @@ const dispatchNewTargets = async (db: Tx, newTargets: Resource[]) => { } }; -const upsertTargetVariables = async ( +const upsertResourceVariables = async ( tx: Tx, - targets: Array< + resources: Array< Resource & { variables?: Array<{ key: string; value: any; sensitive: boolean }>; } >, ) => { - const existingTargetVariables = await tx + const existingResourceVariables = await tx .select() .from(resourceVariable) .where( inArray( resourceVariable.resourceId, - targets.map((t) => t.id), + resources.map((r) => r.id), ), ) .catch((err) => { - log.error("Error fetching existing target metadata", { error: err }); + log.error("Error fetching existing resource variables", { error: err }); throw err; }); - const targetVariablesValues = targets.flatMap((target) => { - const { id, variables = [] } = target; + const resourceVariablesValues = resources.flatMap((resource) => { + const { id, variables = [] } = resource; return variables.map(({ key, value, sensitive }) => ({ resourceId: id, key, @@ -101,10 +101,10 @@ const upsertTargetVariables = async ( })); }); - if (targetVariablesValues.length > 0) + if (resourceVariablesValues.length > 0) await tx .insert(resourceVariable) - .values(targetVariablesValues) + .values(resourceVariablesValues) .onConflictDoUpdate({ target: [resourceVariable.key, resourceVariable.resourceId], set: buildConflictUpdateColumns(resourceVariable, [ @@ -113,13 +113,13 @@ const upsertTargetVariables = async ( ]), }) .catch((err) => { - log.error("Error inserting target variables", { error: err }); + log.error("Error inserting resource variables", { error: err }); throw err; }); - const variablesToDelete = existingTargetVariables.filter( + const variablesToDelete = existingResourceVariables.filter( (variable) => - !targetVariablesValues.some( + !resourceVariablesValues.some( (newVariable) => newVariable.resourceId === variable.resourceId && newVariable.key === variable.key, @@ -136,31 +136,31 @@ const upsertTargetVariables = async ( ), ) .catch((err) => { - log.error("Error deleting target variables", { error: err }); + log.error("Error deleting resource variables", { error: err }); throw err; }); }; -const upsertTargetMetadata = async ( +const upsertResourceMetadata = async ( tx: Tx, - targets: Array }>, + resources: Array }>, ) => { - const existingTargetMetadata = await tx + const existingResourceMetadata = await tx .select() .from(resourceMetadata) .where( inArray( resourceMetadata.resourceId, - targets.map((t) => t.id), + resources.map((r) => r.id), ), ) .catch((err) => { - log.error("Error fetching existing target metadata", { error: err }); + log.error("Error fetching existing resource metadata", { error: err }); throw err; }); - const targetMetadataValues = targets.flatMap((target) => { - const { id, metadata = {} } = target; + const resourceMetadataValues = resources.flatMap((resource) => { + const { id, metadata = {} } = resource; return Object.entries(metadata).map(([key, value]) => ({ resourceId: id, @@ -169,22 +169,22 @@ const upsertTargetMetadata = async ( })); }); - if (targetMetadataValues.length > 0) + if (resourceMetadataValues.length > 0) await tx .insert(resourceMetadata) - .values(targetMetadataValues) + .values(resourceMetadataValues) .onConflictDoUpdate({ target: [resourceMetadata.resourceId, resourceMetadata.key], set: buildConflictUpdateColumns(resourceMetadata, ["value"]), }) .catch((err) => { - log.error("Error inserting target metadata", { error: err }); + log.error("Error inserting resource metadata", { error: err }); throw err; }); - const metadataToDelete = existingTargetMetadata.filter( + const metadataToDelete = existingResourceMetadata.filter( (metadata) => - !targetMetadataValues.some( + !resourceMetadataValues.some( (newMetadata) => newMetadata.resourceId === metadata.resourceId && newMetadata.key === metadata.key, @@ -201,14 +201,14 @@ const upsertTargetMetadata = async ( ), ) .catch((err) => { - log.error("Error deleting target metadata", { error: err }); + log.error("Error deleting resource metadata", { error: err }); throw err; }); }; -export const upsertTargets = async ( +export const upsertResources = async ( tx: Tx, - targetsToInsert: Array< + resourcesToInsert: Array< InsertResource & { metadata?: Record; variables?: Array<{ key: string; value: any; sensitive: boolean }>; @@ -216,18 +216,18 @@ export const upsertTargets = async ( >, ) => { try { - // Get existing targets from the database, grouped by providerId. - // - For targets without a providerId, look them up by workspaceId and + // Get existing resources from the database, grouped by providerId. + // - For resources without a providerId, look them up by workspaceId and // identifier. - // - For targets with a providerId, get all targets for that provider. - log.info("Upserting targets", { - targetsToInsertCount: targetsToInsert.length, + // - For resources with a providerId, get all resources for that provider. + log.info("Upserting resources", { + resourcesToInsertCount: resourcesToInsert.length, }); - const targetsBeforeInsertPromises = _.chain(targetsToInsert) - .groupBy((t) => t.providerId) - .filter((t) => t[0]?.providerId != null) - .map(async (targets) => { - const providerId = targets[0]?.providerId; + const resourcesBeforeInsertPromises = _.chain(resourcesToInsert) + .groupBy((r) => r.providerId) + .filter((r) => r[0]?.providerId != null) + .map(async (resources) => { + const providerId = resources[0]?.providerId; return providerId == null ? db @@ -235,25 +235,25 @@ export const upsertTargets = async ( .from(resource) .where( or( - ...targets.map((t) => + ...resources.map((r) => and( - eq(resource.workspaceId, t.workspaceId), - eq(resource.identifier, t.identifier), + eq(resource.workspaceId, r.workspaceId), + eq(resource.identifier, r.identifier), ), ), ), ) - : getExistingTargetsForProvider(tx, providerId); + : getExistingResourcesForProvider(tx, providerId); }) .value(); - const targetsBeforeInsert = await Promise.all( - targetsBeforeInsertPromises, + const resourcesBeforeInsert = await Promise.all( + resourcesBeforeInsertPromises, ).then((r) => r.flat()); - const targets = await tx + const resources = await tx .insert(resource) - .values(targetsToInsert) + .values(resourcesToInsert) .onConflictDoUpdate({ target: [resource.identifier, resource.workspaceId], set: { @@ -267,77 +267,80 @@ export const upsertTargets = async ( }, }) .returning() - .then((targets) => - targets.map((t) => ({ - ...t, - ...targetsToInsert.find( - (ti) => - ti.identifier === t.identifier && - ti.workspaceId === t.workspaceId, + .then((resources) => + resources.map((r) => ({ + ...r, + ...resourcesToInsert.find( + (ri) => + ri.identifier === r.identifier && + ri.workspaceId === r.workspaceId, ), })), ) .catch((err) => { - log.error("Error inserting targets", { error: err }); + log.error("Error inserting resources", { error: err }); throw err; }); await Promise.all([ - upsertTargetMetadata(tx, targets), - upsertTargetVariables(tx, targets), + upsertResourceMetadata(tx, resources), + upsertResourceVariables(tx, resources), ]); - const newTargets = targets.filter( - (t) => !targetsBeforeInsert.some((et) => et.identifier === t.identifier), + const newResources = resources.filter( + (r) => + !resourcesBeforeInsert.some((er) => er.identifier === r.identifier), ); - if (newTargets.length > 0) - await dispatchNewTargets(db, newTargets).catch((err) => { - log.error("Error dispatching new targets", { error: err }); + if (newResources.length > 0) + await dispatchNewResources(db, newResources).catch((err) => { + log.error("Error dispatching new resources", { error: err }); throw err; }); - const targetsToDelete = targetsBeforeInsert.filter( - (t) => - !targets.some((newTarget) => newTarget.identifier === t.identifier), + const resourcesToDelete = resourcesBeforeInsert.filter( + (r) => + !resources.some( + (newResource) => newResource.identifier === r.identifier, + ), ); - const newTargetCount = newTargets.length; - const targetsToInsertCount = targetsToInsert.length; - const targetsToDeleteCount = targetsToDelete.length; - const targetsBeforeInsertCount = targetsBeforeInsert.length; + const newResourceCount = newResources.length; + const resourcesToInsertCount = resourcesToInsert.length; + const resourcesToDeleteCount = resourcesToDelete.length; + const resourcesBeforeInsertCount = resourcesBeforeInsert.length; log.info( - `Found ${newTargetCount} new targets out of ${targetsToInsertCount} total targets`, + `Found ${newResourceCount} new resources out of ${resourcesToInsertCount} total resources`, { - newTargetCount, - targetsToInsertCount, - targetsToDeleteCount, - targetsBeforeInsertCount, + newResourceCount, + resourcesToInsertCount, + resourcesToDeleteCount, + resourcesBeforeInsertCount, }, ); - if (targetsToDelete.length > 0) { + if (resourcesToDelete.length > 0) { await tx .delete(resource) .where( inArray( resource.id, - targetsToDelete.map((t) => t.id), + resourcesToDelete.map((r) => r.id), ), ) .catch((err) => { - log.error("Error deleting targets", { error: err }); + log.error("Error deleting resources", { error: err }); throw err; }); - log.info(`Deleted ${targetsToDelete.length} targets`, { - targetsToDelete, + log.info(`Deleted ${resourcesToDelete.length} resources`, { + resourcesToDelete, }); } - return targets; + return resources; } catch (err) { - log.error("Error upserting targets", { error: err }); + log.error("Error upserting resources", { error: err }); throw err; } };