diff --git a/pkg/api/authn_test.go b/pkg/api/authn_test.go index 6c01978b3c..adb1c1a325 100644 --- a/pkg/api/authn_test.go +++ b/pkg/api/authn_test.go @@ -935,7 +935,7 @@ func TestCookiestoreCleanup(t *testing.T) { Convey("Test cookiestore cleanup works", t, func() { taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) taskScheduler.RateLimit = 50 * time.Millisecond - taskScheduler.RunScheduler(context.Background()) + taskScheduler.RunScheduler() rootDir := t.TempDir() @@ -973,7 +973,7 @@ func TestCookiestoreCleanup(t *testing.T) { Convey("Test cookiestore cleanup without permissions on rootDir", t, func() { taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) taskScheduler.RateLimit = 50 * time.Millisecond - taskScheduler.RunScheduler(context.Background()) + taskScheduler.RunScheduler() rootDir := t.TempDir() diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 358feaa441..96c2dd74e2 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -94,12 +94,12 @@ func (c *Controller) GetPort() int { return c.chosenPort } -func (c *Controller) Run(reloadCtx context.Context) error { +func (c *Controller) Run() error { if err := c.initCookieStore(); err != nil { return err } - c.StartBackgroundTasks(reloadCtx) + c.StartBackgroundTasks() // setup HTTP API router engine := mux.NewRouter() @@ -216,7 +216,7 @@ func (c *Controller) Run(reloadCtx context.Context) error { return server.Serve(listener) } -func (c *Controller) Init(reloadCtx context.Context) error { +func (c *Controller) Init() error { // print the current configuration, but strip secrets c.Log.Info().Interface("params", c.Config.Sanitize()).Msg("configuration settings") @@ -237,7 +237,7 @@ func (c *Controller) Init(reloadCtx context.Context) error { return err } - if err := c.InitMetaDB(reloadCtx); err != nil { + if err := c.InitMetaDB(); err != nil { return err } @@ -280,7 +280,7 @@ func (c *Controller) initCookieStore() error { return nil } -func (c *Controller) InitMetaDB(reloadCtx context.Context) error { +func (c *Controller) InitMetaDB() error { // init metaDB if search is enabled or we need to store user profiles, api keys or signatures if c.Config.IsSearchEnabled() || c.Config.IsBasicAuthnEnabled() || c.Config.IsImageTrustEnabled() || c.Config.IsRetentionEnabled() { @@ -310,7 +310,7 @@ func (c *Controller) InitMetaDB(reloadCtx context.Context) error { return nil } -func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config.Config) { +func (c *Controller) LoadNewConfig(newConfig *config.Config) { // reload access control config c.Config.HTTP.AccessControl = newConfig.HTTP.AccessControl @@ -364,21 +364,24 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config. c.InitCVEInfo() - c.StartBackgroundTasks(reloadCtx) - c.Log.Info().Interface("reloaded params", c.Config.Sanitize()). Msg("loaded new configuration settings") } func (c *Controller) Shutdown() { - c.taskScheduler.Shutdown() + c.StopBackgroundTasks() ctx := context.Background() _ = c.Server.Shutdown(ctx) } -func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) { +// Will stop scheduler and wait for all tasks to finish their work. 
+func (c *Controller) StopBackgroundTasks() { + c.taskScheduler.Shutdown() +} + +func (c *Controller) StartBackgroundTasks() { c.taskScheduler = scheduler.NewScheduler(c.Config, c.Metrics, c.Log) - c.taskScheduler.RunScheduler(reloadCtx) + c.taskScheduler.RunScheduler() // Enable running garbage-collect periodically for DefaultStore if c.Config.Storage.GC { diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index a5a849ae32..bb1c2cfcbf 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -300,10 +300,10 @@ func TestRunAlreadyRunningServer(t *testing.T) { cm.StartAndWait(port) defer cm.StopServer() - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldNotBeNil) - err = ctlr.Run(context.Background()) + err = ctlr.Run() So(err, ShouldNotBeNil) }) } @@ -377,7 +377,7 @@ func TestObjectStorageController(t *testing.T) { ctlr := makeController(conf, tmp) So(ctlr, ShouldNotBeNil) - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldNotBeNil) }) @@ -1218,7 +1218,7 @@ func TestMultipleInstance(t *testing.T) { } ctlr := api.NewController(conf) ctlr.Log.Info().Int64("seedUser", seedUser).Int64("seedPass", seedPass).Msg("random seed for username & password") - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldEqual, errors.ErrImgStoreNotFound) globalDir := t.TempDir() @@ -1311,7 +1311,7 @@ func TestMultipleInstance(t *testing.T) { ctlr.Config.Storage.SubPaths = subPathMap - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldNotBeNil) // subpath root directory does not exist. @@ -1320,7 +1320,7 @@ func TestMultipleInstance(t *testing.T) { ctlr.Config.Storage.SubPaths = subPathMap - err = ctlr.Init(context.Background()) + err = ctlr.Init() So(err, ShouldNotBeNil) subPathMap["/a"] = config.StorageConfig{RootDirectory: subDir, Dedupe: true, GC: true} @@ -1328,7 +1328,7 @@ func TestMultipleInstance(t *testing.T) { ctlr.Config.Storage.SubPaths = subPathMap - err = ctlr.Init(context.Background()) + err = ctlr.Init() So(err, ShouldNotBeNil) }) } @@ -1826,12 +1826,12 @@ func TestTSLFailedReadingOfCACert(t *testing.T) { defer cancel() ctlr := makeController(conf, t.TempDir()) - err = ctlr.Init(ctx) + err = ctlr.Init() So(err, ShouldBeNil) errChan := make(chan error, 1) go func() { - err = ctlr.Run(ctx) + err = ctlr.Run() errChan <- err }() @@ -1866,12 +1866,12 @@ func TestTSLFailedReadingOfCACert(t *testing.T) { defer cancel() ctlr := makeController(conf, t.TempDir()) - err = ctlr.Init(ctx) + err = ctlr.Init() So(err, ShouldBeNil) errChan := make(chan error, 1) go func() { - err = ctlr.Run(ctx) + err = ctlr.Run() errChan <- err }() diff --git a/pkg/cli/client/cve_cmd_test.go b/pkg/cli/client/cve_cmd_test.go index 43b08c3e51..6fb0714b2f 100644 --- a/pkg/cli/client/cve_cmd_test.go +++ b/pkg/cli/client/cve_cmd_test.go @@ -163,16 +163,14 @@ func TestNegativeServerResponse(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -238,16 +236,14 @@ func TestServerCVEResponse(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); 
err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -576,9 +572,7 @@ func TestCVESort(t *testing.T) { t.FailNow() } - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } @@ -615,7 +609,7 @@ func TestCVESort(t *testing.T) { } go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() diff --git a/pkg/cli/client/image_cmd_test.go b/pkg/cli/client/image_cmd_test.go index 307d6c548f..0df92c7104 100644 --- a/pkg/cli/client/image_cmd_test.go +++ b/pkg/cli/client/image_cmd_test.go @@ -866,7 +866,7 @@ func TestServerResponseGQLWithoutPermissions(t *testing.T) { } ctlr := api.NewController(conf) - if err := ctlr.Init(context.Background()); err != nil { + if err := ctlr.Init(); err != nil { So(err, ShouldNotBeNil) } }) diff --git a/pkg/cli/server/config_reloader.go b/pkg/cli/server/config_reloader.go index 2c90499ea1..bfd42124db 100644 --- a/pkg/cli/server/config_reloader.go +++ b/pkg/cli/server/config_reloader.go @@ -1,7 +1,6 @@ package server import ( - "context" "os" "os/signal" "syscall" @@ -35,28 +34,22 @@ func NewHotReloader(ctlr *api.Controller, filePath string) (*HotReloader, error) return hotReloader, nil } -func signalHandler(ctlr *api.Controller, sigCh chan os.Signal, ctx context.Context, cancel context.CancelFunc) { - select { +func signalHandler(ctlr *api.Controller, sigCh chan os.Signal) { // if signal then shutdown - case sig := <-sigCh: - defer cancel() - + if sig, ok := <-sigCh; ok { ctlr.Log.Info().Interface("signal", sig).Msg("received signal") // gracefully shutdown http server ctlr.Shutdown() //nolint: contextcheck close(sigCh) - // if reload then return - case <-ctx.Done(): - return } } -func initShutDownRoutine(ctlr *api.Controller, ctx context.Context, cancel context.CancelFunc) { +func initShutDownRoutine(ctlr *api.Controller) { sigCh := make(chan os.Signal, 1) - go signalHandler(ctlr, sigCh, ctx, cancel) + go signalHandler(ctlr, sigCh) // block all async signals to this server signal.Ignore() @@ -65,12 +58,10 @@ func initShutDownRoutine(ctlr *api.Controller, ctx context.Context, cancel conte signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP) } -func (hr *HotReloader) Start() context.Context { - reloadCtx, cancelFunc := context.WithCancel(context.Background()) - +func (hr *HotReloader) Start() { done := make(chan bool) - initShutDownRoutine(hr.ctlr, reloadCtx, cancelFunc) + initShutDownRoutine(hr.ctlr) // run watcher go func() { @@ -92,16 +83,15 @@ func (hr *HotReloader) Start() context.Context { continue } - // if valid config then reload - cancelFunc() - // create new context - reloadCtx, cancelFunc = context.WithCancel(context.Background()) + // stop background tasks gracefully + hr.ctlr.StopBackgroundTasks() - // init shutdown routine - initShutDownRoutine(hr.ctlr, reloadCtx, cancelFunc) + // load new config + hr.ctlr.LoadNewConfig(newConfig) - hr.ctlr.LoadNewConfig(reloadCtx, newConfig) + // start background tasks based on new loaded config + hr.ctlr.StartBackgroundTasks() } // watch for errors case err := <-hr.watcher.Errors: @@ -116,6 +106,4 @@ func (hr *HotReloader) Start() context.Context { <-done }() - - return reloadCtx } diff --git a/pkg/cli/server/root.go 
b/pkg/cli/server/root.go index a73ef3573e..58b65a02d9 100644 --- a/pkg/cli/server/root.go +++ b/pkg/cli/server/root.go @@ -61,17 +61,15 @@ func newServeCmd(conf *config.Config) *cobra.Command { return err } - /* context used to cancel go routines so that - we can change their config on the fly (restart routines with different config) */ - reloaderCtx := hotReloader.Start() + hotReloader.Start() - if err := ctlr.Init(reloaderCtx); err != nil { + if err := ctlr.Init(); err != nil { ctlr.Log.Error().Err(err).Msg("failed to init controller") return err } - if err := ctlr.Run(reloaderCtx); err != nil { + if err := ctlr.Run(); err != nil { log.Error().Err(err).Msg("unable to start controller, exiting") } diff --git a/pkg/compliance/v1_0_0/check_test.go b/pkg/compliance/v1_0_0/check_test.go index 3b104f3a35..1ed06f74e5 100644 --- a/pkg/compliance/v1_0_0/check_test.go +++ b/pkg/compliance/v1_0_0/check_test.go @@ -81,12 +81,12 @@ func startServer(t *testing.T) (*api.Controller, string) { ctrl.Config.Storage.SubPaths = subPaths go func() { - if err := ctrl.Init(context.Background()); err != nil { + if err := ctrl.Init(); err != nil { return } // this blocks - if err := ctrl.Run(context.Background()); err != nil { + if err := ctrl.Run(); err != nil { return } }() diff --git a/pkg/exporter/api/controller_test.go b/pkg/exporter/api/controller_test.go index 387470b81f..f9bd085218 100644 --- a/pkg/exporter/api/controller_test.go +++ b/pkg/exporter/api/controller_test.go @@ -140,12 +140,12 @@ func TestNewExporter(t *testing.T) { dir := t.TempDir() serverController.Config.Storage.RootDirectory = dir go func(ctrl *zotapi.Controller) { - if err := ctrl.Init(context.Background()); err != nil { + if err := ctrl.Init(); err != nil { panic(err) } // this blocks - if err := ctrl.Run(context.Background()); !errors.Is(err, http.ErrServerClosed) { + if err := ctrl.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }(serverController) diff --git a/pkg/extensions/monitoring/monitoring_test.go b/pkg/extensions/monitoring/monitoring_test.go index 1a9a4ced36..eb6cd584a8 100644 --- a/pkg/extensions/monitoring/monitoring_test.go +++ b/pkg/extensions/monitoring/monitoring_test.go @@ -4,7 +4,6 @@ package monitoring_test import ( - "context" "fmt" "io" "math/rand" @@ -463,8 +462,7 @@ func TestPopulateStorageMetrics(t *testing.T) { metrics := monitoring.NewMetricsServer(true, ctlr.Log) sch := scheduler.NewScheduler(conf, metrics, ctlr.Log) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) + sch.RunScheduler() generator := &common.StorageMetricsInitGenerator{ ImgStore: ctlr.StoreController.DefaultStore, @@ -485,7 +483,7 @@ func TestPopulateStorageMetrics(t *testing.T) { So(err, ShouldBeNil) So(found, ShouldBeTrue) - cancel() + sch.Shutdown() alpineSize, err := monitoring.GetDirSize(path.Join(rootDir, "alpine")) So(err, ShouldBeNil) busyboxSize, err := monitoring.GetDirSize(path.Join(rootDir, "busybox")) diff --git a/pkg/extensions/search/cve/scan_test.go b/pkg/extensions/search/cve/scan_test.go index f9aa8ed7ff..c6638b20e9 100644 --- a/pkg/extensions/search/cve/scan_test.go +++ b/pkg/extensions/search/cve/scan_test.go @@ -432,11 +432,9 @@ func TestScanGeneratorWithMockedData(t *testing.T) { //nolint: gocyclo sch.SubmitGenerator(generator, 10*time.Second, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler() - sch.RunScheduler(ctx) - - defer cancel() + defer sch.Shutdown() // Make sure the scanner generator has completed despite errors 
found, err := test.ReadLogFileAndSearchString(logPath, @@ -529,11 +527,9 @@ func TestScanGeneratorWithRealData(t *testing.T) { // Start the generator sch.SubmitGenerator(generator, 120*time.Second, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) - - sch.RunScheduler(ctx) + sch.RunScheduler() - defer cancel() + defer sch.Shutdown() // Make sure the scanner generator has completed found, err := test.ReadLogFileAndSearchString(logPath, diff --git a/pkg/extensions/search/cve/update_test.go b/pkg/extensions/search/cve/update_test.go index 47f9d1f3b2..ca52c6ee5e 100644 --- a/pkg/extensions/search/cve/update_test.go +++ b/pkg/extensions/search/cve/update_test.go @@ -63,11 +63,9 @@ func TestCVEDBGenerator(t *testing.T) { sch.SubmitGenerator(generator, 12000*time.Millisecond, scheduler.HighPriority) - ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler() - sch.RunScheduler(ctx) - - defer cancel() + defer sch.Shutdown() // Wait for trivy db to download found, err := test.ReadLogFileAndCountStringOccurence(logPath, diff --git a/pkg/extensions/search/search_test.go b/pkg/extensions/search/search_test.go index e0101c8ed9..d297480aa0 100644 --- a/pkg/extensions/search/search_test.go +++ b/pkg/extensions/search/search_test.go @@ -681,16 +681,14 @@ func TestRepoListWithNewestImage(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -3372,16 +3370,14 @@ func TestGlobalSearch(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -6203,16 +6199,14 @@ func TestImageSummary(t *testing.T) { configDigest := godigest.FromBytes(configBlob) So(errConfig, ShouldBeNil) // marshall success, config is valid JSON - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() diff --git a/pkg/extensions/sync/sync_test.go b/pkg/extensions/sync/sync_test.go index 9ee9154ff6..4eec8f90bd 100644 --- a/pkg/extensions/sync/sync_test.go +++ b/pkg/extensions/sync/sync_test.go @@ -1901,15 +1901,15 @@ func TestConfigReloader(t *testing.T) { hotReloader, err := cli.NewHotReloader(dctlr, cfgfile.Name()) So(err, ShouldBeNil) - reloadCtx := hotReloader.Start() + hotReloader.Start() go func() { // this blocks - if err := dctlr.Init(reloadCtx); err != nil { + if err := dctlr.Init(); err != nil { return } - if err := dctlr.Run(reloadCtx); err != nil { + if err := dctlr.Run(); err != nil { return } }() @@ -2051,15 +2051,15 @@ func TestConfigReloader(t *testing.T) { hotReloader, err := cli.NewHotReloader(dctlr, cfgfile.Name()) So(err, ShouldBeNil) - reloadCtx := hotReloader.Start() + 
hotReloader.Start() go func() { // this blocks - if err := dctlr.Init(reloadCtx); err != nil { + if err := dctlr.Init(); err != nil { return } - if err := dctlr.Run(reloadCtx); err != nil { + if err := dctlr.Run(); err != nil { return } }() diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 9d71460f53..5c43ba4277 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -83,12 +83,7 @@ type Scheduler struct { workerWg *sync.WaitGroup isShuttingDown atomic.Bool metricServer monitoring.MetricServer - - // ensure the scheduler can only be stopped once - stop sync.Once - - // close to signal the workers to stop working - quit chan struct{} + cancelFunc context.CancelFunc } func NewScheduler(cfg *config.Config, ms monitoring.MetricServer, logC log.Logger) *Scheduler { //nolint: varnamelen @@ -119,8 +114,6 @@ func NewScheduler(cfg *config.Config, ms monitoring.MetricServer, logC log.Logge workerChan: make(chan Task, numWorkers), metricsChan: make(chan struct{}, 1), workerWg: new(sync.WaitGroup), - stop: sync.Once{}, - quit: make(chan struct{}), } } @@ -219,11 +212,8 @@ func (scheduler *Scheduler) metricsWorker() { } /* -Scheduler can be stopped either by stopping the context provided in scheduler.RunScheduler(ctx context.Context) - - or by calling this function or both. - -Shutdown() will wait for all tasks being run to finish their work before exiting. +Scheduler can be stopped by calling Shutdown(). +it will wait for all tasks being run to finish their work before exiting. */ func (scheduler *Scheduler) Shutdown() { defer scheduler.workerWg.Wait() @@ -238,12 +228,10 @@ func (scheduler *Scheduler) inShutdown() bool { } func (scheduler *Scheduler) shutdown() { - scheduler.stop.Do(func() { - scheduler.isShuttingDown.Store(true) + scheduler.isShuttingDown.Store(true) - close(scheduler.metricsChan) - close(scheduler.quit) - }) + scheduler.cancelFunc() + close(scheduler.metricsChan) } /* @@ -251,7 +239,10 @@ func (scheduler *Scheduler) shutdown() { canceling the context will stop scheduler and also it will notify tasks to finish their work gracefully. */ -func (scheduler *Scheduler) RunScheduler(ctx context.Context) { +func (scheduler *Scheduler) RunScheduler() { + ctx, cancel := context.WithCancel(context.Background()) + scheduler.cancelFunc = cancel + throttle := time.NewTicker(rateLimit).C numWorkers := scheduler.NumWorkers @@ -271,7 +262,6 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { for { select { - // can be stopped either by closing from outside the ctx given to RunScheduler() case <-ctx.Done(): if !scheduler.inShutdown() { scheduler.shutdown() @@ -279,11 +269,6 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { scheduler.log.Debug().Msg("scheduler: received stop signal, gracefully shutting down...") - return - // or by running scheduler.Shutdown() - case <-scheduler.quit: - scheduler.log.Debug().Msg("scheduler: received stop signal, gracefully shutting down...") - return default: // we don't want to block on sending task in workerChan. 
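Note (illustrative, not part of the patch): with this change the scheduler owns its lifecycle. RunScheduler() creates a cancelable context and stores the cancel function in cancelFunc, and Shutdown() invokes that cancel function and waits for in-flight tasks via workerWg; the sync.Once guard and the quit channel are gone. Below is a minimal usage sketch, assuming the zot module path zotregistry.io/zot and reusing only constructors that already appear in the tests in this diff (NewScheduler, NewMetricsServer, NewLogger).

package main

import (
	"os"
	"time"

	"zotregistry.io/zot/pkg/api/config" // module path assumed
	"zotregistry.io/zot/pkg/extensions/monitoring"
	"zotregistry.io/zot/pkg/log"
	"zotregistry.io/zot/pkg/scheduler"
)

func main() {
	logFile, err := os.CreateTemp("", "zot-log*.txt")
	if err != nil {
		panic(err)
	}
	defer os.Remove(logFile.Name()) // clean up

	logger := log.NewLogger("debug", logFile.Name())
	metrics := monitoring.NewMetricsServer(true, logger)

	sch := scheduler.NewScheduler(config.New(), metrics, logger)

	// No caller-supplied context anymore; the scheduler creates its own.
	sch.RunScheduler()

	// Tasks and generators would be submitted here via SubmitTask/SubmitGenerator.
	time.Sleep(1 * time.Second)

	// Cancels the internal context and waits for running tasks to finish.
	sch.Shutdown()
}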
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 78e391b088..984571e7e1 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -132,11 +132,9 @@ func TestScheduler(t *testing.T) { // interval has to be higher than throttle value to simulate sch.SubmitGenerator(genH, 6*time.Second, scheduler.HighPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - + sch.RunScheduler() time.Sleep(7 * time.Second) - cancel() + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -164,12 +162,9 @@ func TestScheduler(t *testing.T) { genH := &generator{log: logger, priority: "high priority"} sch.SubmitGenerator(genH, time.Duration(0), scheduler.HighPriority) - ctx, cancel := context.WithCancel(context.Background()) - - sch.RunScheduler(ctx) - + sch.RunScheduler() time.Sleep(4 * time.Second) - cancel() + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -193,11 +188,9 @@ func TestScheduler(t *testing.T) { t := &task{log: logger, msg: "", err: true} sch.SubmitTask(t, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - + sch.RunScheduler() time.Sleep(500 * time.Millisecond) - cancel() + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -218,11 +211,9 @@ func TestScheduler(t *testing.T) { genL := &generator{log: logger, priority: "low priority"} sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.LowPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - + sch.RunScheduler() time.Sleep(4 * time.Second) - cancel() + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -258,10 +249,8 @@ func TestScheduler(t *testing.T) { metrics := monitoring.NewMetricsServer(true, logger) sch := scheduler.NewScheduler(config.New(), metrics, logger) - ctx, cancel := context.WithCancel(context.Background()) - - sch.RunScheduler(ctx) - cancel() + sch.RunScheduler() + sch.Shutdown() time.Sleep(500 * time.Millisecond) t := &task{log: logger, msg: "", err: false} @@ -272,7 +261,7 @@ func TestScheduler(t *testing.T) { So(string(data), ShouldNotContainSubstring, "scheduler: adding a new task") }) - Convey("Test stopping scheduler by cancelling the context", t, func() { + Convey("Test stopping scheduler by calling Shutdown()", t, func() { logFile, err := os.CreateTemp("", "zot-log*.txt") So(err, ShouldBeNil) @@ -285,13 +274,9 @@ func TestScheduler(t *testing.T) { genL := &generator{log: logger, priority: "medium priority"} sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - - time.Sleep(1 * time.Second) - cancel() - + sch.RunScheduler() time.Sleep(4 * time.Second) + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -300,73 +285,6 @@ func TestScheduler(t *testing.T) { So(string(data), ShouldContainSubstring, "scheduler: received stop signal, gracefully shutting down...") }) - Convey("Test stopping scheduler by calling scheduler.Shutdown()", t, func() { - logFile, err := os.CreateTemp("", "zot-log*.txt") - So(err, ShouldBeNil) - - defer os.Remove(logFile.Name()) // clean up - - logger := log.NewLogger("debug", logFile.Name()) - metrics := monitoring.NewMetricsServer(true, logger) - sch := scheduler.NewScheduler(config.New(), metrics, logger) - - genL := &generator{log: logger, priority: "high 
priority"} - sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.HighPriority) - - sch.RunScheduler(context.Background()) - - time.Sleep(4 * time.Second) - - sch.Shutdown() - - data, err := os.ReadFile(logFile.Name()) - t.Log(string(data)) - So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, "executing high priority task; index: 1") - So(string(data), ShouldContainSubstring, "executing high priority task; index: 2") - So(string(data), ShouldContainSubstring, "scheduler: received stop signal, gracefully shutting down...") - }) - - Convey("Test stopping scheduler by both calling scheduler.Shutdown() and cancelling context", t, func() { - logFile, err := os.CreateTemp("", "zot-log*.txt") - So(err, ShouldBeNil) - - defer os.Remove(logFile.Name()) // clean up - - logger := log.NewLogger("debug", logFile.Name()) - metrics := monitoring.NewMetricsServer(true, logger) - sch := scheduler.NewScheduler(config.New(), metrics, logger) - - genL := &generator{log: logger, priority: "high priority"} - sch.SubmitGenerator(genL, 1*time.Millisecond, scheduler.HighPriority) - - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - - time.Sleep(4 * time.Second) - - go func() { - cancel() - sch.Shutdown() - }() - - go func() { - sch.Shutdown() - }() - - // will wait for scheduler to finish all tasks - sch.Shutdown() - - sch.Shutdown() - - data, err := os.ReadFile(logFile.Name()) - t.Log(string(data)) - So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, "executing high priority task; index: 1") - So(string(data), ShouldContainSubstring, "executing high priority task; index: 2") - So(string(data), ShouldContainSubstring, "scheduler: received stop signal, gracefully shutting down...") - }) - Convey("Test scheduler Priority.String() method", t, func() { var p scheduler.Priority //nolint: varnamelen // test invalid priority diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index 2b81ce9705..4efc5f6745 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -62,16 +62,15 @@ var DeleteReferrers = config.ImageRetention{ //nolint: gochecknoglobals var errCache = errors.New("new cache error") -func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) { +func runAndGetScheduler() *scheduler.Scheduler { log := zlog.Logger{} metrics := monitoring.NewMetricsServer(true, log) taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) taskScheduler.RateLimit = 50 * time.Millisecond - ctx, cancel := context.WithCancel(context.Background()) - taskScheduler.RunScheduler(ctx) + taskScheduler.RunScheduler() - return taskScheduler, cancel + return taskScheduler } func TestStorageFSAPIs(t *testing.T) { @@ -1195,14 +1194,14 @@ func TestDedupeLinks(t *testing.T) { // run on empty image store // switch dedupe to true from false - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() // rebuild with dedupe true imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) // wait until rebuild finishes time.Sleep(1 * time.Second) - cancel() + taskScheduler.Shutdown() // manifest1 upload, err := imgStore.NewBlobUpload("dedupe1") @@ -1367,7 +1366,7 @@ func TestDedupeLinks(t *testing.T) { Convey("Intrerrupt rebuilding and restart, checking idempotency", func() { for i := 0; i < 10; i++ { - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() // rebuild with dedupe true imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, 
cacheDriver) @@ -1375,10 +1374,10 @@ func TestDedupeLinks(t *testing.T) { sleepValue := i * 5 time.Sleep(time.Duration(sleepValue) * time.Millisecond) - cancel() + taskScheduler.Shutdown() } - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() // rebuild with dedupe true imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver) @@ -1387,7 +1386,7 @@ func TestDedupeLinks(t *testing.T) { // wait until rebuild finishes time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) @@ -1398,7 +1397,7 @@ func TestDedupeLinks(t *testing.T) { Convey("rebuild dedupe index error cache nil", func() { // switch dedupe to true from false - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, nil) @@ -1408,7 +1407,7 @@ func TestDedupeLinks(t *testing.T) { time.Sleep(3 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) @@ -1420,7 +1419,7 @@ func TestDedupeLinks(t *testing.T) { Convey("rebuild dedupe index cache error on original blob", func() { // switch dedupe to true from false - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, &mocks.CacheMock{ HasBlobFn: func(digest godigest.Digest, path string) bool { @@ -1436,7 +1435,7 @@ func TestDedupeLinks(t *testing.T) { time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) @@ -1448,7 +1447,7 @@ func TestDedupeLinks(t *testing.T) { Convey("rebuild dedupe index cache error on duplicate blob", func() { // switch dedupe to true from false - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, &mocks.CacheMock{ HasBlobFn: func(digest godigest.Digest, path string) bool { @@ -1468,7 +1467,7 @@ func TestDedupeLinks(t *testing.T) { time.Sleep(15 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index d9e3b9079a..ecd9ab809b 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -186,16 +186,15 @@ func createObjectsStoreDynamo(rootDir string, cacheDir string, dedupe bool, tabl return store, il, err } -func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) { +func runAndGetScheduler() *scheduler.Scheduler { logger := log.Logger{} metrics := monitoring.NewMetricsServer(false, logger) taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger) taskScheduler.RateLimit = 50 * time.Millisecond - ctx, cancel := context.WithCancel(context.Background()) - taskScheduler.RunScheduler(ctx) + taskScheduler.RunScheduler() - return taskScheduler, cancel + return taskScheduler } type FileInfoMock struct { @@ -1587,7 +1586,7 @@ func TestS3Dedupe(t *testing.T) { }) Convey("rebuild s3 dedupe index from true to false", func() { //nolint: dupl - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false) defer 
cleanupStorage(storeDriver, testDir) @@ -1598,7 +1597,7 @@ func TestS3Dedupe(t *testing.T) { time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", blobDigest1.Encoded())) @@ -1615,9 +1614,7 @@ func TestS3Dedupe(t *testing.T) { So(len(blobContent), ShouldEqual, fi1.Size()) Convey("rebuild s3 dedupe index from false to true", func() { - taskScheduler, cancel := runAndGetScheduler() - - defer cancel() + taskScheduler := runAndGetScheduler() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -1628,6 +1625,8 @@ func TestS3Dedupe(t *testing.T) { time.Sleep(10 * time.Second) + taskScheduler.Shutdown() + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) So(err, ShouldBeNil) @@ -1816,7 +1815,7 @@ func TestS3Dedupe(t *testing.T) { }) Convey("rebuild s3 dedupe index from true to false", func() { //nolint: dupl - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false) defer cleanupStorage(storeDriver, testDir) @@ -1827,7 +1826,7 @@ func TestS3Dedupe(t *testing.T) { time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", blobDigest1.Encoded())) @@ -1861,7 +1860,7 @@ func TestS3Dedupe(t *testing.T) { }) Convey("rebuild s3 dedupe index from false to true", func() { - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -1872,7 +1871,7 @@ func TestS3Dedupe(t *testing.T) { time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) @@ -2055,9 +2054,7 @@ func TestRebuildDedupeIndex(t *testing.T) { taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger) taskScheduler.RateLimit = 1 * time.Millisecond - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - - taskScheduler.RunScheduler(ctx) + taskScheduler.RunScheduler() storeDriver, imgStore, _ = createObjectsStore(testDir, t.TempDir(), false) defer cleanupStorage(storeDriver, testDir) @@ -2067,17 +2064,17 @@ func TestRebuildDedupeIndex(t *testing.T) { sleepValue := i * 5 time.Sleep(time.Duration(sleepValue) * time.Millisecond) - cancel() + taskScheduler.Shutdown() } - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) // wait until rebuild finishes time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) @@ -2096,10 +2093,7 @@ func TestRebuildDedupeIndex(t *testing.T) { taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger) taskScheduler.RateLimit = 1 * time.Millisecond - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - - taskScheduler.RunScheduler(ctx) + taskScheduler.RunScheduler() storeDriver, imgStore, _ = createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ 
-2110,10 +2104,10 @@ func TestRebuildDedupeIndex(t *testing.T) { sleepValue := i * 5 time.Sleep(time.Duration(sleepValue) * time.Millisecond) - cancel() + taskScheduler.Shutdown() } - taskScheduler, cancel = runAndGetScheduler() + taskScheduler = runAndGetScheduler() // rebuild with dedupe false, should have all blobs with content imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) @@ -2121,7 +2115,7 @@ func TestRebuildDedupeIndex(t *testing.T) { // wait until rebuild finishes time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi2, err = storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) @@ -2140,18 +2134,18 @@ func TestRebuildDedupeIndex(t *testing.T) { storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, true) defer cleanupStorage(storeDriver, testDir) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) // wait until rebuild finishes time.Sleep(3 * time.Second) + + taskScheduler.Shutdown() }) Convey("Rebuild dedupe index already rebuilt", func() { - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -2160,6 +2154,8 @@ func TestRebuildDedupeIndex(t *testing.T) { // wait until rebuild finishes time.Sleep(5 * time.Second) + + taskScheduler.Shutdown() }) Convey("Trigger Stat error while getting original blob", func() { @@ -2171,13 +2167,14 @@ func TestRebuildDedupeIndex(t *testing.T) { err := storeDriver.PutContent(context.Background(), fi1.Path(), []byte{}) So(err, ShouldBeNil) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) // wait until rebuild finishes time.Sleep(5 * time.Second) + + taskScheduler.Shutdown() }) Convey("Trigger ErrDedupeRebuild while statting original blob", func() { @@ -2185,8 +2182,7 @@ func TestRebuildDedupeIndex(t *testing.T) { err := storeDriver.Delete(context.Background(), fi1.Path()) So(err, ShouldBeNil) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -2195,6 +2191,8 @@ func TestRebuildDedupeIndex(t *testing.T) { // wait until rebuild finishes time.Sleep(5 * time.Second) + + taskScheduler.Shutdown() }) Convey("Trigger ErrDedupeRebuild when original blob has 0 size", func() { @@ -2202,8 +2200,7 @@ func TestRebuildDedupeIndex(t *testing.T) { err := storeDriver.PutContent(context.Background(), fi1.Path(), []byte{}) So(err, ShouldBeNil) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -2213,6 +2210,8 @@ func TestRebuildDedupeIndex(t *testing.T) { // wait until rebuild finishes time.Sleep(5 * time.Second) + + taskScheduler.Shutdown() }) Convey("Trigger GetNextDigestWithBlobPaths path not found err", func() { @@ -2224,19 +2223,19 @@ func TestRebuildDedupeIndex(t *testing.T) { err := storeDriver.Delete(context.Background(), imgStore.RootDir()) So(err, ShouldBeNil) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler 
:= runAndGetScheduler() // rebuild with dedupe false, should have all blobs with content imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) // wait until rebuild finishes time.Sleep(5 * time.Second) + + taskScheduler.Shutdown() }) Convey("Rebuild from true to false", func() { - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false) defer cleanupStorage(storeDriver, testDir) @@ -2247,6 +2246,8 @@ func TestRebuildDedupeIndex(t *testing.T) { // wait until rebuild finishes time.Sleep(10 * time.Second) + taskScheduler.Shutdown() + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) So(err, ShouldBeNil) diff --git a/pkg/test/common/utils.go b/pkg/test/common/utils.go index 51e17b721b..d2f1d57c04 100644 --- a/pkg/test/common/utils.go +++ b/pkg/test/common/utils.go @@ -1,7 +1,6 @@ package common import ( - "context" "errors" "fmt" "math/rand" @@ -62,44 +61,34 @@ func Location(baseURL string, resp *resty.Response) string { } type Controller interface { - Init(ctx context.Context) error - Run(ctx context.Context) error + Init() error + Run() error Shutdown() GetPort() int } type ControllerManager struct { controller Controller - // used to stop background tasks(goroutines) - cancelRoutinesFunc context.CancelFunc } -func (cm *ControllerManager) RunServer(ctx context.Context) { +func (cm *ControllerManager) RunServer() { // Useful to be able to call in the same goroutine for testing purposes - if err := cm.controller.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := cm.controller.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } } func (cm *ControllerManager) StartServer() { - ctx, cancel := context.WithCancel(context.Background()) - cm.cancelRoutinesFunc = cancel - - if err := cm.controller.Init(ctx); err != nil { + if err := cm.controller.Init(); err != nil { panic(err) } go func() { - cm.RunServer(ctx) + cm.RunServer() }() } func (cm *ControllerManager) StopServer() { - // stop background tasks - if cm.cancelRoutinesFunc != nil { - cm.cancelRoutinesFunc() - } - cm.controller.Shutdown() } diff --git a/pkg/test/common/utils_test.go b/pkg/test/common/utils_test.go index f9e485b85e..9c547d1027 100644 --- a/pkg/test/common/utils_test.go +++ b/pkg/test/common/utils_test.go @@ -1,7 +1,6 @@ package common_test import ( - "context" "os" "path" "testing" @@ -53,11 +52,9 @@ func TestControllerManager(t *testing.T) { ctlr := api.NewController(conf) ctlrManager := tcommon.NewControllerManager(ctlr) - ctx := context.Background() - - err := ctlr.Init(ctx) + err := ctlr.Init() So(err, ShouldBeNil) - So(func() { ctlrManager.RunServer(ctx) }, ShouldPanic) + So(func() { ctlrManager.RunServer() }, ShouldPanic) }) }
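Note (illustrative, not part of the patch): at the controller level the reload context is gone as well. Init() and Run() take no arguments, Shutdown() stops background tasks before stopping the HTTP server, and HotReloader.Start() now reloads configuration by stopping and restarting background tasks. A rough sketch of that flow follows, assuming the zot module path zotregistry.io/zot; the reloadConfig helper is hypothetical and only mirrors the HotReloader flow above, and the port/root-directory values are placeholders.

package main

import (
	"errors"
	"net/http"
	"os"

	"zotregistry.io/zot/pkg/api" // module path assumed
	"zotregistry.io/zot/pkg/api/config"
)

// reloadConfig mirrors the new HotReloader flow: stop scheduler-driven work,
// swap in the validated config, then start background tasks again.
func reloadConfig(ctlr *api.Controller, newConfig *config.Config) {
	ctlr.StopBackgroundTasks()
	ctlr.LoadNewConfig(newConfig)
	ctlr.StartBackgroundTasks()
}

func main() {
	conf := config.New()
	conf.HTTP.Port = "8080"                   // placeholder value
	conf.Storage.RootDirectory = os.TempDir() // placeholder value

	ctlr := api.NewController(conf)

	// Init() no longer needs a reload context.
	if err := ctlr.Init(); err != nil {
		panic(err)
	}

	// Run() blocks until ctlr.Shutdown() is called (in zot this happens in the
	// signal handler shown in config_reloader.go) or until serving fails.
	if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) {
		panic(err)
	}
}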