From 7d8179ad80e4da7e1eba70b77a65cfb4f2ba6c76 Mon Sep 17 00:00:00 2001 From: Petu Eusebiu Date: Thu, 7 Dec 2023 15:25:48 +0200 Subject: [PATCH] refactor(scheduler): refactor into a single shutdown before this we could stop scheduler either by closing the context provided to RunScheduler(ctx) or by running Shutdown(). simplify things by getting rid of the external context in RunScheduler(). keep an internal context in the scheduler itself and pass it down to all tasks. Signed-off-by: Petu Eusebiu --- pkg/api/authn_test.go | 4 +- pkg/api/controller.go | 25 +++-- pkg/api/controller_test.go | 22 ++-- pkg/cli/client/cve_cmd_test.go | 18 ++-- pkg/cli/client/image_cmd_test.go | 2 +- pkg/cli/server/config_reloader.go | 36 +++---- pkg/cli/server/root.go | 8 +- pkg/compliance/v1_0_0/check_test.go | 4 +- pkg/exporter/api/controller_test.go | 4 +- pkg/extensions/monitoring/monitoring_test.go | 6 +- pkg/extensions/search/cve/scan_test.go | 12 +-- pkg/extensions/search/cve/update_test.go | 6 +- pkg/extensions/search/search_test.go | 18 ++-- pkg/extensions/sync/sync_test.go | 12 +-- pkg/scheduler/scheduler.go | 35 ++---- pkg/scheduler/scheduler_test.go | 108 +++---------------- pkg/storage/local/local_test.go | 31 +++--- pkg/storage/s3/s3_test.go | 81 +++++++------- pkg/test/common/utils.go | 23 ++-- pkg/test/common/utils_test.go | 7 +- 20 files changed, 160 insertions(+), 302 deletions(-) diff --git a/pkg/api/authn_test.go b/pkg/api/authn_test.go index 6c01978b3c..adb1c1a325 100644 --- a/pkg/api/authn_test.go +++ b/pkg/api/authn_test.go @@ -935,7 +935,7 @@ func TestCookiestoreCleanup(t *testing.T) { Convey("Test cookiestore cleanup works", t, func() { taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) taskScheduler.RateLimit = 50 * time.Millisecond - taskScheduler.RunScheduler(context.Background()) + taskScheduler.RunScheduler() rootDir := t.TempDir() @@ -973,7 +973,7 @@ func TestCookiestoreCleanup(t *testing.T) { Convey("Test cookiestore cleanup without permissions on rootDir", t, func() { taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) taskScheduler.RateLimit = 50 * time.Millisecond - taskScheduler.RunScheduler(context.Background()) + taskScheduler.RunScheduler() rootDir := t.TempDir() diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 358feaa441..96c2dd74e2 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -94,12 +94,12 @@ func (c *Controller) GetPort() int { return c.chosenPort } -func (c *Controller) Run(reloadCtx context.Context) error { +func (c *Controller) Run() error { if err := c.initCookieStore(); err != nil { return err } - c.StartBackgroundTasks(reloadCtx) + c.StartBackgroundTasks() // setup HTTP API router engine := mux.NewRouter() @@ -216,7 +216,7 @@ func (c *Controller) Run(reloadCtx context.Context) error { return server.Serve(listener) } -func (c *Controller) Init(reloadCtx context.Context) error { +func (c *Controller) Init() error { // print the current configuration, but strip secrets c.Log.Info().Interface("params", c.Config.Sanitize()).Msg("configuration settings") @@ -237,7 +237,7 @@ func (c *Controller) Init(reloadCtx context.Context) error { return err } - if err := c.InitMetaDB(reloadCtx); err != nil { + if err := c.InitMetaDB(); err != nil { return err } @@ -280,7 +280,7 @@ func (c *Controller) initCookieStore() error { return nil } -func (c *Controller) InitMetaDB(reloadCtx context.Context) error { +func (c *Controller) InitMetaDB() error { // init metaDB if search is enabled or we need to store user profiles, api keys or signatures if c.Config.IsSearchEnabled() || c.Config.IsBasicAuthnEnabled() || c.Config.IsImageTrustEnabled() || c.Config.IsRetentionEnabled() { @@ -310,7 +310,7 @@ func (c *Controller) InitMetaDB(reloadCtx context.Context) error { return nil } -func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config.Config) { +func (c *Controller) LoadNewConfig(newConfig *config.Config) { // reload access control config c.Config.HTTP.AccessControl = newConfig.HTTP.AccessControl @@ -364,21 +364,24 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config. c.InitCVEInfo() - c.StartBackgroundTasks(reloadCtx) - c.Log.Info().Interface("reloaded params", c.Config.Sanitize()). Msg("loaded new configuration settings") } func (c *Controller) Shutdown() { - c.taskScheduler.Shutdown() + c.StopBackgroundTasks() ctx := context.Background() _ = c.Server.Shutdown(ctx) } -func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) { +// Will stop scheduler and wait for all tasks to finish their work. +func (c *Controller) StopBackgroundTasks() { + c.taskScheduler.Shutdown() +} + +func (c *Controller) StartBackgroundTasks() { c.taskScheduler = scheduler.NewScheduler(c.Config, c.Metrics, c.Log) - c.taskScheduler.RunScheduler(reloadCtx) + c.taskScheduler.RunScheduler() // Enable running garbage-collect periodically for DefaultStore if c.Config.Storage.GC { diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index a5a849ae32..bb1c2cfcbf 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -300,10 +300,10 @@ func TestRunAlreadyRunningServer(t *testing.T) { cm.StartAndWait(port) defer cm.StopServer() - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldNotBeNil) - err = ctlr.Run(context.Background()) + err = ctlr.Run() So(err, ShouldNotBeNil) }) } @@ -377,7 +377,7 @@ func TestObjectStorageController(t *testing.T) { ctlr := makeController(conf, tmp) So(ctlr, ShouldNotBeNil) - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldNotBeNil) }) @@ -1218,7 +1218,7 @@ func TestMultipleInstance(t *testing.T) { } ctlr := api.NewController(conf) ctlr.Log.Info().Int64("seedUser", seedUser).Int64("seedPass", seedPass).Msg("random seed for username & password") - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldEqual, errors.ErrImgStoreNotFound) globalDir := t.TempDir() @@ -1311,7 +1311,7 @@ func TestMultipleInstance(t *testing.T) { ctlr.Config.Storage.SubPaths = subPathMap - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldNotBeNil) // subpath root directory does not exist. @@ -1320,7 +1320,7 @@ func TestMultipleInstance(t *testing.T) { ctlr.Config.Storage.SubPaths = subPathMap - err = ctlr.Init(context.Background()) + err = ctlr.Init() So(err, ShouldNotBeNil) subPathMap["/a"] = config.StorageConfig{RootDirectory: subDir, Dedupe: true, GC: true} @@ -1328,7 +1328,7 @@ func TestMultipleInstance(t *testing.T) { ctlr.Config.Storage.SubPaths = subPathMap - err = ctlr.Init(context.Background()) + err = ctlr.Init() So(err, ShouldNotBeNil) }) } @@ -1826,12 +1826,12 @@ func TestTSLFailedReadingOfCACert(t *testing.T) { defer cancel() ctlr := makeController(conf, t.TempDir()) - err = ctlr.Init(ctx) + err = ctlr.Init() So(err, ShouldBeNil) errChan := make(chan error, 1) go func() { - err = ctlr.Run(ctx) + err = ctlr.Run() errChan <- err }() @@ -1866,12 +1866,12 @@ func TestTSLFailedReadingOfCACert(t *testing.T) { defer cancel() ctlr := makeController(conf, t.TempDir()) - err = ctlr.Init(ctx) + err = ctlr.Init() So(err, ShouldBeNil) errChan := make(chan error, 1) go func() { - err = ctlr.Run(ctx) + err = ctlr.Run() errChan <- err }() diff --git a/pkg/cli/client/cve_cmd_test.go b/pkg/cli/client/cve_cmd_test.go index 43b08c3e51..6fb0714b2f 100644 --- a/pkg/cli/client/cve_cmd_test.go +++ b/pkg/cli/client/cve_cmd_test.go @@ -163,16 +163,14 @@ func TestNegativeServerResponse(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -238,16 +236,14 @@ func TestServerCVEResponse(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -576,9 +572,7 @@ func TestCVESort(t *testing.T) { t.FailNow() } - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } @@ -615,7 +609,7 @@ func TestCVESort(t *testing.T) { } go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() diff --git a/pkg/cli/client/image_cmd_test.go b/pkg/cli/client/image_cmd_test.go index 307d6c548f..0df92c7104 100644 --- a/pkg/cli/client/image_cmd_test.go +++ b/pkg/cli/client/image_cmd_test.go @@ -866,7 +866,7 @@ func TestServerResponseGQLWithoutPermissions(t *testing.T) { } ctlr := api.NewController(conf) - if err := ctlr.Init(context.Background()); err != nil { + if err := ctlr.Init(); err != nil { So(err, ShouldNotBeNil) } }) diff --git a/pkg/cli/server/config_reloader.go b/pkg/cli/server/config_reloader.go index 2c90499ea1..bfd42124db 100644 --- a/pkg/cli/server/config_reloader.go +++ b/pkg/cli/server/config_reloader.go @@ -1,7 +1,6 @@ package server import ( - "context" "os" "os/signal" "syscall" @@ -35,28 +34,22 @@ func NewHotReloader(ctlr *api.Controller, filePath string) (*HotReloader, error) return hotReloader, nil } -func signalHandler(ctlr *api.Controller, sigCh chan os.Signal, ctx context.Context, cancel context.CancelFunc) { - select { +func signalHandler(ctlr *api.Controller, sigCh chan os.Signal) { // if signal then shutdown - case sig := <-sigCh: - defer cancel() - + if sig, ok := <-sigCh; ok { ctlr.Log.Info().Interface("signal", sig).Msg("received signal") // gracefully shutdown http server ctlr.Shutdown() //nolint: contextcheck close(sigCh) - // if reload then return - case <-ctx.Done(): - return } } -func initShutDownRoutine(ctlr *api.Controller, ctx context.Context, cancel context.CancelFunc) { +func initShutDownRoutine(ctlr *api.Controller) { sigCh := make(chan os.Signal, 1) - go signalHandler(ctlr, sigCh, ctx, cancel) + go signalHandler(ctlr, sigCh) // block all async signals to this server signal.Ignore() @@ -65,12 +58,10 @@ func initShutDownRoutine(ctlr *api.Controller, ctx context.Context, cancel conte signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP) } -func (hr *HotReloader) Start() context.Context { - reloadCtx, cancelFunc := context.WithCancel(context.Background()) - +func (hr *HotReloader) Start() { done := make(chan bool) - initShutDownRoutine(hr.ctlr, reloadCtx, cancelFunc) + initShutDownRoutine(hr.ctlr) // run watcher go func() { @@ -92,16 +83,15 @@ func (hr *HotReloader) Start() context.Context { continue } - // if valid config then reload - cancelFunc() - // create new context - reloadCtx, cancelFunc = context.WithCancel(context.Background()) + // stop background tasks gracefully + hr.ctlr.StopBackgroundTasks() - // init shutdown routine - initShutDownRoutine(hr.ctlr, reloadCtx, cancelFunc) + // load new config + hr.ctlr.LoadNewConfig(newConfig) - hr.ctlr.LoadNewConfig(reloadCtx, newConfig) + // start background tasks based on new loaded config + hr.ctlr.StartBackgroundTasks() } // watch for errors case err := <-hr.watcher.Errors: @@ -116,6 +106,4 @@ func (hr *HotReloader) Start() context.Context { <-done }() - - return reloadCtx } diff --git a/pkg/cli/server/root.go b/pkg/cli/server/root.go index a73ef3573e..58b65a02d9 100644 --- a/pkg/cli/server/root.go +++ b/pkg/cli/server/root.go @@ -61,17 +61,15 @@ func newServeCmd(conf *config.Config) *cobra.Command { return err } - /* context used to cancel go routines so that - we can change their config on the fly (restart routines with different config) */ - reloaderCtx := hotReloader.Start() + hotReloader.Start() - if err := ctlr.Init(reloaderCtx); err != nil { + if err := ctlr.Init(); err != nil { ctlr.Log.Error().Err(err).Msg("failed to init controller") return err } - if err := ctlr.Run(reloaderCtx); err != nil { + if err := ctlr.Run(); err != nil { log.Error().Err(err).Msg("unable to start controller, exiting") } diff --git a/pkg/compliance/v1_0_0/check_test.go b/pkg/compliance/v1_0_0/check_test.go index 3b104f3a35..1ed06f74e5 100644 --- a/pkg/compliance/v1_0_0/check_test.go +++ b/pkg/compliance/v1_0_0/check_test.go @@ -81,12 +81,12 @@ func startServer(t *testing.T) (*api.Controller, string) { ctrl.Config.Storage.SubPaths = subPaths go func() { - if err := ctrl.Init(context.Background()); err != nil { + if err := ctrl.Init(); err != nil { return } // this blocks - if err := ctrl.Run(context.Background()); err != nil { + if err := ctrl.Run(); err != nil { return } }() diff --git a/pkg/exporter/api/controller_test.go b/pkg/exporter/api/controller_test.go index 387470b81f..f9bd085218 100644 --- a/pkg/exporter/api/controller_test.go +++ b/pkg/exporter/api/controller_test.go @@ -140,12 +140,12 @@ func TestNewExporter(t *testing.T) { dir := t.TempDir() serverController.Config.Storage.RootDirectory = dir go func(ctrl *zotapi.Controller) { - if err := ctrl.Init(context.Background()); err != nil { + if err := ctrl.Init(); err != nil { panic(err) } // this blocks - if err := ctrl.Run(context.Background()); !errors.Is(err, http.ErrServerClosed) { + if err := ctrl.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }(serverController) diff --git a/pkg/extensions/monitoring/monitoring_test.go b/pkg/extensions/monitoring/monitoring_test.go index 1a9a4ced36..eb6cd584a8 100644 --- a/pkg/extensions/monitoring/monitoring_test.go +++ b/pkg/extensions/monitoring/monitoring_test.go @@ -4,7 +4,6 @@ package monitoring_test import ( - "context" "fmt" "io" "math/rand" @@ -463,8 +462,7 @@ func TestPopulateStorageMetrics(t *testing.T) { metrics := monitoring.NewMetricsServer(true, ctlr.Log) sch := scheduler.NewScheduler(conf, metrics, ctlr.Log) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) + sch.RunScheduler() generator := &common.StorageMetricsInitGenerator{ ImgStore: ctlr.StoreController.DefaultStore, @@ -485,7 +483,7 @@ func TestPopulateStorageMetrics(t *testing.T) { So(err, ShouldBeNil) So(found, ShouldBeTrue) - cancel() + sch.Shutdown() alpineSize, err := monitoring.GetDirSize(path.Join(rootDir, "alpine")) So(err, ShouldBeNil) busyboxSize, err := monitoring.GetDirSize(path.Join(rootDir, "busybox")) diff --git a/pkg/extensions/search/cve/scan_test.go b/pkg/extensions/search/cve/scan_test.go index f9aa8ed7ff..c6638b20e9 100644 --- a/pkg/extensions/search/cve/scan_test.go +++ b/pkg/extensions/search/cve/scan_test.go @@ -432,11 +432,9 @@ func TestScanGeneratorWithMockedData(t *testing.T) { //nolint: gocyclo sch.SubmitGenerator(generator, 10*time.Second, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler() - sch.RunScheduler(ctx) - - defer cancel() + defer sch.Shutdown() // Make sure the scanner generator has completed despite errors found, err := test.ReadLogFileAndSearchString(logPath, @@ -529,11 +527,9 @@ func TestScanGeneratorWithRealData(t *testing.T) { // Start the generator sch.SubmitGenerator(generator, 120*time.Second, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) - - sch.RunScheduler(ctx) + sch.RunScheduler() - defer cancel() + defer sch.Shutdown() // Make sure the scanner generator has completed found, err := test.ReadLogFileAndSearchString(logPath, diff --git a/pkg/extensions/search/cve/update_test.go b/pkg/extensions/search/cve/update_test.go index 47f9d1f3b2..ca52c6ee5e 100644 --- a/pkg/extensions/search/cve/update_test.go +++ b/pkg/extensions/search/cve/update_test.go @@ -63,11 +63,9 @@ func TestCVEDBGenerator(t *testing.T) { sch.SubmitGenerator(generator, 12000*time.Millisecond, scheduler.HighPriority) - ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler() - sch.RunScheduler(ctx) - - defer cancel() + defer sch.Shutdown() // Wait for trivy db to download found, err := test.ReadLogFileAndCountStringOccurence(logPath, diff --git a/pkg/extensions/search/search_test.go b/pkg/extensions/search/search_test.go index e0101c8ed9..d297480aa0 100644 --- a/pkg/extensions/search/search_test.go +++ b/pkg/extensions/search/search_test.go @@ -681,16 +681,14 @@ func TestRepoListWithNewestImage(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -3372,16 +3370,14 @@ func TestGlobalSearch(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -6203,16 +6199,14 @@ func TestImageSummary(t *testing.T) { configDigest := godigest.FromBytes(configBlob) So(errConfig, ShouldBeNil) // marshall success, config is valid JSON - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() diff --git a/pkg/extensions/sync/sync_test.go b/pkg/extensions/sync/sync_test.go index 9ee9154ff6..4eec8f90bd 100644 --- a/pkg/extensions/sync/sync_test.go +++ b/pkg/extensions/sync/sync_test.go @@ -1901,15 +1901,15 @@ func TestConfigReloader(t *testing.T) { hotReloader, err := cli.NewHotReloader(dctlr, cfgfile.Name()) So(err, ShouldBeNil) - reloadCtx := hotReloader.Start() + hotReloader.Start() go func() { // this blocks - if err := dctlr.Init(reloadCtx); err != nil { + if err := dctlr.Init(); err != nil { return } - if err := dctlr.Run(reloadCtx); err != nil { + if err := dctlr.Run(); err != nil { return } }() @@ -2051,15 +2051,15 @@ func TestConfigReloader(t *testing.T) { hotReloader, err := cli.NewHotReloader(dctlr, cfgfile.Name()) So(err, ShouldBeNil) - reloadCtx := hotReloader.Start() + hotReloader.Start() go func() { // this blocks - if err := dctlr.Init(reloadCtx); err != nil { + if err := dctlr.Init(); err != nil { return } - if err := dctlr.Run(reloadCtx); err != nil { + if err := dctlr.Run(); err != nil { return } }() diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 9d71460f53..5c43ba4277 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -83,12 +83,7 @@ type Scheduler struct { workerWg *sync.WaitGroup isShuttingDown atomic.Bool metricServer monitoring.MetricServer - - // ensure the scheduler can only be stopped once - stop sync.Once - - // close to signal the workers to stop working - quit chan struct{} + cancelFunc context.CancelFunc } func NewScheduler(cfg *config.Config, ms monitoring.MetricServer, logC log.Logger) *Scheduler { //nolint: varnamelen @@ -119,8 +114,6 @@ func NewScheduler(cfg *config.Config, ms monitoring.MetricServer, logC log.Logge workerChan: make(chan Task, numWorkers), metricsChan: make(chan struct{}, 1), workerWg: new(sync.WaitGroup), - stop: sync.Once{}, - quit: make(chan struct{}), } } @@ -219,11 +212,8 @@ func (scheduler *Scheduler) metricsWorker() { } /* -Scheduler can be stopped either by stopping the context provided in scheduler.RunScheduler(ctx context.Context) - - or by calling this function or both. - -Shutdown() will wait for all tasks being run to finish their work before exiting. +Scheduler can be stopped by calling Shutdown(). +it will wait for all tasks being run to finish their work before exiting. */ func (scheduler *Scheduler) Shutdown() { defer scheduler.workerWg.Wait() @@ -238,12 +228,10 @@ func (scheduler *Scheduler) inShutdown() bool { } func (scheduler *Scheduler) shutdown() { - scheduler.stop.Do(func() { - scheduler.isShuttingDown.Store(true) + scheduler.isShuttingDown.Store(true) - close(scheduler.metricsChan) - close(scheduler.quit) - }) + scheduler.cancelFunc() + close(scheduler.metricsChan) } /* @@ -251,7 +239,10 @@ func (scheduler *Scheduler) shutdown() { canceling the context will stop scheduler and also it will notify tasks to finish their work gracefully. */ -func (scheduler *Scheduler) RunScheduler(ctx context.Context) { +func (scheduler *Scheduler) RunScheduler() { + ctx, cancel := context.WithCancel(context.Background()) + scheduler.cancelFunc = cancel + throttle := time.NewTicker(rateLimit).C numWorkers := scheduler.NumWorkers @@ -271,7 +262,6 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { for { select { - // can be stopped either by closing from outside the ctx given to RunScheduler() case <-ctx.Done(): if !scheduler.inShutdown() { scheduler.shutdown() @@ -279,11 +269,6 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { scheduler.log.Debug().Msg("scheduler: received stop signal, gracefully shutting down...") - return - // or by running scheduler.Shutdown() - case <-scheduler.quit: - scheduler.log.Debug().Msg("scheduler: received stop signal, gracefully shutting down...") - return default: // we don't want to block on sending task in workerChan. diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 78e391b088..984571e7e1 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -132,11 +132,9 @@ func TestScheduler(t *testing.T) { // interval has to be higher than throttle value to simulate sch.SubmitGenerator(genH, 6*time.Second, scheduler.HighPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - + sch.RunScheduler() time.Sleep(7 * time.Second) - cancel() + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -164,12 +162,9 @@ func TestScheduler(t *testing.T) { genH := &generator{log: logger, priority: "high priority"} sch.SubmitGenerator(genH, time.Duration(0), scheduler.HighPriority) - ctx, cancel := context.WithCancel(context.Background()) - - sch.RunScheduler(ctx) - + sch.RunScheduler() time.Sleep(4 * time.Second) - cancel() + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -193,11 +188,9 @@ func TestScheduler(t *testing.T) { t := &task{log: logger, msg: "", err: true} sch.SubmitTask(t, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - + sch.RunScheduler() time.Sleep(500 * time.Millisecond) - cancel() + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -218,11 +211,9 @@ func TestScheduler(t *testing.T) { genL := &generator{log: logger, priority: "low priority"} sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.LowPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - + sch.RunScheduler() time.Sleep(4 * time.Second) - cancel() + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -258,10 +249,8 @@ func TestScheduler(t *testing.T) { metrics := monitoring.NewMetricsServer(true, logger) sch := scheduler.NewScheduler(config.New(), metrics, logger) - ctx, cancel := context.WithCancel(context.Background()) - - sch.RunScheduler(ctx) - cancel() + sch.RunScheduler() + sch.Shutdown() time.Sleep(500 * time.Millisecond) t := &task{log: logger, msg: "", err: false} @@ -272,7 +261,7 @@ func TestScheduler(t *testing.T) { So(string(data), ShouldNotContainSubstring, "scheduler: adding a new task") }) - Convey("Test stopping scheduler by cancelling the context", t, func() { + Convey("Test stopping scheduler by calling Shutdown()", t, func() { logFile, err := os.CreateTemp("", "zot-log*.txt") So(err, ShouldBeNil) @@ -285,13 +274,9 @@ func TestScheduler(t *testing.T) { genL := &generator{log: logger, priority: "medium priority"} sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - - time.Sleep(1 * time.Second) - cancel() - + sch.RunScheduler() time.Sleep(4 * time.Second) + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -300,73 +285,6 @@ func TestScheduler(t *testing.T) { So(string(data), ShouldContainSubstring, "scheduler: received stop signal, gracefully shutting down...") }) - Convey("Test stopping scheduler by calling scheduler.Shutdown()", t, func() { - logFile, err := os.CreateTemp("", "zot-log*.txt") - So(err, ShouldBeNil) - - defer os.Remove(logFile.Name()) // clean up - - logger := log.NewLogger("debug", logFile.Name()) - metrics := monitoring.NewMetricsServer(true, logger) - sch := scheduler.NewScheduler(config.New(), metrics, logger) - - genL := &generator{log: logger, priority: "high priority"} - sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.HighPriority) - - sch.RunScheduler(context.Background()) - - time.Sleep(4 * time.Second) - - sch.Shutdown() - - data, err := os.ReadFile(logFile.Name()) - t.Log(string(data)) - So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, "executing high priority task; index: 1") - So(string(data), ShouldContainSubstring, "executing high priority task; index: 2") - So(string(data), ShouldContainSubstring, "scheduler: received stop signal, gracefully shutting down...") - }) - - Convey("Test stopping scheduler by both calling scheduler.Shutdown() and cancelling context", t, func() { - logFile, err := os.CreateTemp("", "zot-log*.txt") - So(err, ShouldBeNil) - - defer os.Remove(logFile.Name()) // clean up - - logger := log.NewLogger("debug", logFile.Name()) - metrics := monitoring.NewMetricsServer(true, logger) - sch := scheduler.NewScheduler(config.New(), metrics, logger) - - genL := &generator{log: logger, priority: "high priority"} - sch.SubmitGenerator(genL, 1*time.Millisecond, scheduler.HighPriority) - - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - - time.Sleep(4 * time.Second) - - go func() { - cancel() - sch.Shutdown() - }() - - go func() { - sch.Shutdown() - }() - - // will wait for scheduler to finish all tasks - sch.Shutdown() - - sch.Shutdown() - - data, err := os.ReadFile(logFile.Name()) - t.Log(string(data)) - So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, "executing high priority task; index: 1") - So(string(data), ShouldContainSubstring, "executing high priority task; index: 2") - So(string(data), ShouldContainSubstring, "scheduler: received stop signal, gracefully shutting down...") - }) - Convey("Test scheduler Priority.String() method", t, func() { var p scheduler.Priority //nolint: varnamelen // test invalid priority diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index 2b81ce9705..4efc5f6745 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -62,16 +62,15 @@ var DeleteReferrers = config.ImageRetention{ //nolint: gochecknoglobals var errCache = errors.New("new cache error") -func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) { +func runAndGetScheduler() *scheduler.Scheduler { log := zlog.Logger{} metrics := monitoring.NewMetricsServer(true, log) taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) taskScheduler.RateLimit = 50 * time.Millisecond - ctx, cancel := context.WithCancel(context.Background()) - taskScheduler.RunScheduler(ctx) + taskScheduler.RunScheduler() - return taskScheduler, cancel + return taskScheduler } func TestStorageFSAPIs(t *testing.T) { @@ -1195,14 +1194,14 @@ func TestDedupeLinks(t *testing.T) { // run on empty image store // switch dedupe to true from false - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() // rebuild with dedupe true imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) // wait until rebuild finishes time.Sleep(1 * time.Second) - cancel() + taskScheduler.Shutdown() // manifest1 upload, err := imgStore.NewBlobUpload("dedupe1") @@ -1367,7 +1366,7 @@ func TestDedupeLinks(t *testing.T) { Convey("Intrerrupt rebuilding and restart, checking idempotency", func() { for i := 0; i < 10; i++ { - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() // rebuild with dedupe true imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver) @@ -1375,10 +1374,10 @@ func TestDedupeLinks(t *testing.T) { sleepValue := i * 5 time.Sleep(time.Duration(sleepValue) * time.Millisecond) - cancel() + taskScheduler.Shutdown() } - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() // rebuild with dedupe true imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver) @@ -1387,7 +1386,7 @@ func TestDedupeLinks(t *testing.T) { // wait until rebuild finishes time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) @@ -1398,7 +1397,7 @@ func TestDedupeLinks(t *testing.T) { Convey("rebuild dedupe index error cache nil", func() { // switch dedupe to true from false - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, nil) @@ -1408,7 +1407,7 @@ func TestDedupeLinks(t *testing.T) { time.Sleep(3 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) @@ -1420,7 +1419,7 @@ func TestDedupeLinks(t *testing.T) { Convey("rebuild dedupe index cache error on original blob", func() { // switch dedupe to true from false - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, &mocks.CacheMock{ HasBlobFn: func(digest godigest.Digest, path string) bool { @@ -1436,7 +1435,7 @@ func TestDedupeLinks(t *testing.T) { time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) @@ -1448,7 +1447,7 @@ func TestDedupeLinks(t *testing.T) { Convey("rebuild dedupe index cache error on duplicate blob", func() { // switch dedupe to true from false - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, &mocks.CacheMock{ HasBlobFn: func(digest godigest.Digest, path string) bool { @@ -1468,7 +1467,7 @@ func TestDedupeLinks(t *testing.T) { time.Sleep(15 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index d9e3b9079a..ecd9ab809b 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -186,16 +186,15 @@ func createObjectsStoreDynamo(rootDir string, cacheDir string, dedupe bool, tabl return store, il, err } -func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) { +func runAndGetScheduler() *scheduler.Scheduler { logger := log.Logger{} metrics := monitoring.NewMetricsServer(false, logger) taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger) taskScheduler.RateLimit = 50 * time.Millisecond - ctx, cancel := context.WithCancel(context.Background()) - taskScheduler.RunScheduler(ctx) + taskScheduler.RunScheduler() - return taskScheduler, cancel + return taskScheduler } type FileInfoMock struct { @@ -1587,7 +1586,7 @@ func TestS3Dedupe(t *testing.T) { }) Convey("rebuild s3 dedupe index from true to false", func() { //nolint: dupl - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false) defer cleanupStorage(storeDriver, testDir) @@ -1598,7 +1597,7 @@ func TestS3Dedupe(t *testing.T) { time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", blobDigest1.Encoded())) @@ -1615,9 +1614,7 @@ func TestS3Dedupe(t *testing.T) { So(len(blobContent), ShouldEqual, fi1.Size()) Convey("rebuild s3 dedupe index from false to true", func() { - taskScheduler, cancel := runAndGetScheduler() - - defer cancel() + taskScheduler := runAndGetScheduler() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -1628,6 +1625,8 @@ func TestS3Dedupe(t *testing.T) { time.Sleep(10 * time.Second) + taskScheduler.Shutdown() + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) So(err, ShouldBeNil) @@ -1816,7 +1815,7 @@ func TestS3Dedupe(t *testing.T) { }) Convey("rebuild s3 dedupe index from true to false", func() { //nolint: dupl - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false) defer cleanupStorage(storeDriver, testDir) @@ -1827,7 +1826,7 @@ func TestS3Dedupe(t *testing.T) { time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi1, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", blobDigest1.Encoded())) @@ -1861,7 +1860,7 @@ func TestS3Dedupe(t *testing.T) { }) Convey("rebuild s3 dedupe index from false to true", func() { - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -1872,7 +1871,7 @@ func TestS3Dedupe(t *testing.T) { time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) @@ -2055,9 +2054,7 @@ func TestRebuildDedupeIndex(t *testing.T) { taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger) taskScheduler.RateLimit = 1 * time.Millisecond - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - - taskScheduler.RunScheduler(ctx) + taskScheduler.RunScheduler() storeDriver, imgStore, _ = createObjectsStore(testDir, t.TempDir(), false) defer cleanupStorage(storeDriver, testDir) @@ -2067,17 +2064,17 @@ func TestRebuildDedupeIndex(t *testing.T) { sleepValue := i * 5 time.Sleep(time.Duration(sleepValue) * time.Millisecond) - cancel() + taskScheduler.Shutdown() } - taskScheduler, cancel := runAndGetScheduler() + taskScheduler := runAndGetScheduler() imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) // wait until rebuild finishes time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) @@ -2096,10 +2093,7 @@ func TestRebuildDedupeIndex(t *testing.T) { taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger) taskScheduler.RateLimit = 1 * time.Millisecond - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - - taskScheduler.RunScheduler(ctx) + taskScheduler.RunScheduler() storeDriver, imgStore, _ = createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -2110,10 +2104,10 @@ func TestRebuildDedupeIndex(t *testing.T) { sleepValue := i * 5 time.Sleep(time.Duration(sleepValue) * time.Millisecond) - cancel() + taskScheduler.Shutdown() } - taskScheduler, cancel = runAndGetScheduler() + taskScheduler = runAndGetScheduler() // rebuild with dedupe false, should have all blobs with content imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) @@ -2121,7 +2115,7 @@ func TestRebuildDedupeIndex(t *testing.T) { // wait until rebuild finishes time.Sleep(10 * time.Second) - cancel() + taskScheduler.Shutdown() fi2, err = storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) @@ -2140,18 +2134,18 @@ func TestRebuildDedupeIndex(t *testing.T) { storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, true) defer cleanupStorage(storeDriver, testDir) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) // wait until rebuild finishes time.Sleep(3 * time.Second) + + taskScheduler.Shutdown() }) Convey("Rebuild dedupe index already rebuilt", func() { - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -2160,6 +2154,8 @@ func TestRebuildDedupeIndex(t *testing.T) { // wait until rebuild finishes time.Sleep(5 * time.Second) + + taskScheduler.Shutdown() }) Convey("Trigger Stat error while getting original blob", func() { @@ -2171,13 +2167,14 @@ func TestRebuildDedupeIndex(t *testing.T) { err := storeDriver.PutContent(context.Background(), fi1.Path(), []byte{}) So(err, ShouldBeNil) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) // wait until rebuild finishes time.Sleep(5 * time.Second) + + taskScheduler.Shutdown() }) Convey("Trigger ErrDedupeRebuild while statting original blob", func() { @@ -2185,8 +2182,7 @@ func TestRebuildDedupeIndex(t *testing.T) { err := storeDriver.Delete(context.Background(), fi1.Path()) So(err, ShouldBeNil) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -2195,6 +2191,8 @@ func TestRebuildDedupeIndex(t *testing.T) { // wait until rebuild finishes time.Sleep(5 * time.Second) + + taskScheduler.Shutdown() }) Convey("Trigger ErrDedupeRebuild when original blob has 0 size", func() { @@ -2202,8 +2200,7 @@ func TestRebuildDedupeIndex(t *testing.T) { err := storeDriver.PutContent(context.Background(), fi1.Path(), []byte{}) So(err, ShouldBeNil) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) @@ -2213,6 +2210,8 @@ func TestRebuildDedupeIndex(t *testing.T) { // wait until rebuild finishes time.Sleep(5 * time.Second) + + taskScheduler.Shutdown() }) Convey("Trigger GetNextDigestWithBlobPaths path not found err", func() { @@ -2224,19 +2223,19 @@ func TestRebuildDedupeIndex(t *testing.T) { err := storeDriver.Delete(context.Background(), imgStore.RootDir()) So(err, ShouldBeNil) - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() // rebuild with dedupe false, should have all blobs with content imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) // wait until rebuild finishes time.Sleep(5 * time.Second) + + taskScheduler.Shutdown() }) Convey("Rebuild from true to false", func() { - taskScheduler, cancel := runAndGetScheduler() - defer cancel() + taskScheduler := runAndGetScheduler() storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false) defer cleanupStorage(storeDriver, testDir) @@ -2247,6 +2246,8 @@ func TestRebuildDedupeIndex(t *testing.T) { // wait until rebuild finishes time.Sleep(10 * time.Second) + taskScheduler.Shutdown() + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded())) So(err, ShouldBeNil) diff --git a/pkg/test/common/utils.go b/pkg/test/common/utils.go index 51e17b721b..d2f1d57c04 100644 --- a/pkg/test/common/utils.go +++ b/pkg/test/common/utils.go @@ -1,7 +1,6 @@ package common import ( - "context" "errors" "fmt" "math/rand" @@ -62,44 +61,34 @@ func Location(baseURL string, resp *resty.Response) string { } type Controller interface { - Init(ctx context.Context) error - Run(ctx context.Context) error + Init() error + Run() error Shutdown() GetPort() int } type ControllerManager struct { controller Controller - // used to stop background tasks(goroutines) - cancelRoutinesFunc context.CancelFunc } -func (cm *ControllerManager) RunServer(ctx context.Context) { +func (cm *ControllerManager) RunServer() { // Useful to be able to call in the same goroutine for testing purposes - if err := cm.controller.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := cm.controller.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } } func (cm *ControllerManager) StartServer() { - ctx, cancel := context.WithCancel(context.Background()) - cm.cancelRoutinesFunc = cancel - - if err := cm.controller.Init(ctx); err != nil { + if err := cm.controller.Init(); err != nil { panic(err) } go func() { - cm.RunServer(ctx) + cm.RunServer() }() } func (cm *ControllerManager) StopServer() { - // stop background tasks - if cm.cancelRoutinesFunc != nil { - cm.cancelRoutinesFunc() - } - cm.controller.Shutdown() } diff --git a/pkg/test/common/utils_test.go b/pkg/test/common/utils_test.go index f9e485b85e..9c547d1027 100644 --- a/pkg/test/common/utils_test.go +++ b/pkg/test/common/utils_test.go @@ -1,7 +1,6 @@ package common_test import ( - "context" "os" "path" "testing" @@ -53,11 +52,9 @@ func TestControllerManager(t *testing.T) { ctlr := api.NewController(conf) ctlrManager := tcommon.NewControllerManager(ctlr) - ctx := context.Background() - - err := ctlr.Init(ctx) + err := ctlr.Init() So(err, ShouldBeNil) - So(func() { ctlrManager.RunServer(ctx) }, ShouldPanic) + So(func() { ctlrManager.RunServer() }, ShouldPanic) }) }