diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go
index 860c08d71..e050cc6d6 100644
--- a/pkg/jobs/jobs.go
+++ b/pkg/jobs/jobs.go
@@ -47,7 +47,15 @@ func Start() {
 		}
 	}
 
-	for _, j := range []*job.Job{topologyJobs.CleanupDeletedTopologyComponents, topologyJobs.SyncTopology, canaryJobs.SyncCanaryJobs, canaryJobs.CleanupDeletedCanaryChecks, dutyQuery.SyncComponentCacheJob} {
+	miscJobs := []*job.Job{
+		topologyJobs.CleanupDeletedTopologyComponents,
+		topologyJobs.SyncTopology,
+		topologyJobs.TopologyCRDReconcile,
+		canaryJobs.SyncCanaryJobs,
+		canaryJobs.CleanupDeletedCanaryChecks,
+		dutyQuery.SyncComponentCacheJob,
+	}
+	for _, j := range miscJobs {
 		job := j
 		job.Context = context.DefaultContext
 		if err := job.AddToScheduler(FuncScheduler); err != nil {
diff --git a/pkg/jobs/topology/topology_jobs.go b/pkg/jobs/topology/topology_jobs.go
index 094ab4a0c..cff43db35 100644
--- a/pkg/jobs/topology/topology_jobs.go
+++ b/pkg/jobs/topology/topology_jobs.go
@@ -3,6 +3,7 @@ package topology
 import (
 	"fmt"
 	"reflect"
+	"slices"
 	"sync"
 
 	canaryCtx "github.com/flanksource/canary-checker/api/context"
@@ -17,6 +18,9 @@ import (
 	"github.com/flanksource/duty/job"
 	"github.com/flanksource/duty/models"
 	"github.com/robfig/cron/v3"
+	"github.com/samber/lo"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 )
 
 var TopologyScheduler = cron.New()
@@ -157,3 +161,63 @@ var CleanupDeletedTopologyComponents = &job.Job{
 		return nil
 	},
 }
+
+// TopologyCRDReconcile deletes stored topologies that no longer have a matching
+// Topology object in Kubernetes. Every 8 hours it loads the topologies whose
+// source is CRD or Topology, lists the Topology kind through the dynamic client,
+// and deletes each stored topology whose ID is not among the listed object UIDs.
+var TopologyCRDReconcile = &job.Job{
+	Name:       "TopologyCRDReconcile",
+	Schedule:   "@every 8h",
+	Singleton:  true,
+	JobHistory: true,
+	Retention:  job.RetentionBalanced,
+	Fn: func(ctx job.JobRuntime) error {
+		var stored []models.Topology
+		query := ctx.DB().
+			Select("id", "name", "namespace").
+			Where("source IN (?, ?)", models.SourceCRD, models.SourceTopology).
+			Where(duty.LocalFilter)
+		if err := query.Find(&stored).Error; err != nil {
+			return err
+		}
+
+		topologyClient, err := ctx.KubernetesDynamicClient().GetClientByKind("Topology")
+		if err != nil {
+			return fmt.Errorf("error creating dynamic client for Topology: %w", err)
+		}
+
+		listed, err := topologyClient.List(ctx, metav1.ListOptions{})
+		if err != nil {
+			return fmt.Errorf("error listing Topology kind: %w", err)
+		}
+
+		clusterUIDs := lo.Map(listed.Items, func(item unstructured.Unstructured, _ int) string {
+			return string(item.GetUID())
+		})
+		if len(clusterUIDs) == 0 {
+			// An empty listing may stem from a misconfiguration or an intermittent
+			// error; acting on it would needlessly delete every stored topology.
+			ctx.Warnf("Skipping topology CRD cleanup due to zero topologies returned")
+			return nil
+		}
+
+		for _, entry := range stored {
+			id := entry.ID.String()
+			if slices.Contains(clusterUIDs, id) {
+				ctx.History.IncrSuccess()
+				continue
+			}
+
+			// ID mismatch or object not found in Kubernetes: delete the stored
+			// topology, then call DeleteTopologyJob for the same ID.
+			if err := db.DeleteTopology(ctx.DB(), id); err != nil {
+				ctx.History.AddErrorf("error deleting topology[%s]: %v", entry.ID, err)
+				continue
+			}
+			DeleteTopologyJob(id)
+			ctx.History.IncrSuccess()
+		}
+		return nil
+	},
+}