Skip to content

Commit

Permalink
refactor(scheduler): refactor into a single shutdown
Browse files Browse the repository at this point in the history
before this we could stop scheduler either by closing the context
provided to RunScheduler(ctx) or by running Shutdown().

simplify things by getting rid of the external context in RunScheduler().
keep an internal context in the scheduler itself and pass it down to all tasks.

Signed-off-by: Petu Eusebiu <[email protected]>
  • Loading branch information
eusebiu-constantin-petu-dbk committed Dec 7, 2023
1 parent efd76c9 commit 7d8179a
Show file tree
Hide file tree
Showing 20 changed files with 160 additions and 302 deletions.
4 changes: 2 additions & 2 deletions pkg/api/authn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,7 @@ func TestCookiestoreCleanup(t *testing.T) {
Convey("Test cookiestore cleanup works", t, func() {
taskScheduler := scheduler.NewScheduler(config.New(), metrics, log)
taskScheduler.RateLimit = 50 * time.Millisecond
taskScheduler.RunScheduler(context.Background())
taskScheduler.RunScheduler()

rootDir := t.TempDir()

Expand Down Expand Up @@ -973,7 +973,7 @@ func TestCookiestoreCleanup(t *testing.T) {
Convey("Test cookiestore cleanup without permissions on rootDir", t, func() {
taskScheduler := scheduler.NewScheduler(config.New(), metrics, log)
taskScheduler.RateLimit = 50 * time.Millisecond
taskScheduler.RunScheduler(context.Background())
taskScheduler.RunScheduler()

rootDir := t.TempDir()

Expand Down
25 changes: 14 additions & 11 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ func (c *Controller) GetPort() int {
return c.chosenPort
}

func (c *Controller) Run(reloadCtx context.Context) error {
func (c *Controller) Run() error {
if err := c.initCookieStore(); err != nil {
return err
}

c.StartBackgroundTasks(reloadCtx)
c.StartBackgroundTasks()

// setup HTTP API router
engine := mux.NewRouter()
Expand Down Expand Up @@ -216,7 +216,7 @@ func (c *Controller) Run(reloadCtx context.Context) error {
return server.Serve(listener)
}

func (c *Controller) Init(reloadCtx context.Context) error {
func (c *Controller) Init() error {
// print the current configuration, but strip secrets
c.Log.Info().Interface("params", c.Config.Sanitize()).Msg("configuration settings")

Expand All @@ -237,7 +237,7 @@ func (c *Controller) Init(reloadCtx context.Context) error {
return err
}

if err := c.InitMetaDB(reloadCtx); err != nil {
if err := c.InitMetaDB(); err != nil {
return err
}

Expand Down Expand Up @@ -280,7 +280,7 @@ func (c *Controller) initCookieStore() error {
return nil
}

func (c *Controller) InitMetaDB(reloadCtx context.Context) error {
func (c *Controller) InitMetaDB() error {
// init metaDB if search is enabled or we need to store user profiles, api keys or signatures
if c.Config.IsSearchEnabled() || c.Config.IsBasicAuthnEnabled() || c.Config.IsImageTrustEnabled() ||
c.Config.IsRetentionEnabled() {
Expand Down Expand Up @@ -310,7 +310,7 @@ func (c *Controller) InitMetaDB(reloadCtx context.Context) error {
return nil
}

func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config.Config) {
func (c *Controller) LoadNewConfig(newConfig *config.Config) {
// reload access control config
c.Config.HTTP.AccessControl = newConfig.HTTP.AccessControl

Expand Down Expand Up @@ -364,21 +364,24 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config.

c.InitCVEInfo()

c.StartBackgroundTasks(reloadCtx)

c.Log.Info().Interface("reloaded params", c.Config.Sanitize()).
Msg("loaded new configuration settings")
}

func (c *Controller) Shutdown() {
c.taskScheduler.Shutdown()
c.StopBackgroundTasks()
ctx := context.Background()
_ = c.Server.Shutdown(ctx)
}

func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
// Will stop scheduler and wait for all tasks to finish their work.
func (c *Controller) StopBackgroundTasks() {
c.taskScheduler.Shutdown()
}

func (c *Controller) StartBackgroundTasks() {
c.taskScheduler = scheduler.NewScheduler(c.Config, c.Metrics, c.Log)
c.taskScheduler.RunScheduler(reloadCtx)
c.taskScheduler.RunScheduler()

// Enable running garbage-collect periodically for DefaultStore
if c.Config.Storage.GC {
Expand Down
22 changes: 11 additions & 11 deletions pkg/api/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,10 @@ func TestRunAlreadyRunningServer(t *testing.T) {
cm.StartAndWait(port)
defer cm.StopServer()

err := ctlr.Init(context.Background())
err := ctlr.Init()
So(err, ShouldNotBeNil)

err = ctlr.Run(context.Background())
err = ctlr.Run()
So(err, ShouldNotBeNil)
})
}
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestObjectStorageController(t *testing.T) {
ctlr := makeController(conf, tmp)
So(ctlr, ShouldNotBeNil)

err := ctlr.Init(context.Background())
err := ctlr.Init()
So(err, ShouldNotBeNil)
})

Expand Down Expand Up @@ -1218,7 +1218,7 @@ func TestMultipleInstance(t *testing.T) {
}
ctlr := api.NewController(conf)
ctlr.Log.Info().Int64("seedUser", seedUser).Int64("seedPass", seedPass).Msg("random seed for username & password")
err := ctlr.Init(context.Background())
err := ctlr.Init()
So(err, ShouldEqual, errors.ErrImgStoreNotFound)

globalDir := t.TempDir()
Expand Down Expand Up @@ -1311,7 +1311,7 @@ func TestMultipleInstance(t *testing.T) {

ctlr.Config.Storage.SubPaths = subPathMap

err := ctlr.Init(context.Background())
err := ctlr.Init()
So(err, ShouldNotBeNil)

// subpath root directory does not exist.
Expand All @@ -1320,15 +1320,15 @@ func TestMultipleInstance(t *testing.T) {

ctlr.Config.Storage.SubPaths = subPathMap

err = ctlr.Init(context.Background())
err = ctlr.Init()
So(err, ShouldNotBeNil)

subPathMap["/a"] = config.StorageConfig{RootDirectory: subDir, Dedupe: true, GC: true}
subPathMap["/b"] = config.StorageConfig{RootDirectory: subDir, Dedupe: true, GC: true}

ctlr.Config.Storage.SubPaths = subPathMap

err = ctlr.Init(context.Background())
err = ctlr.Init()
So(err, ShouldNotBeNil)
})
}
Expand Down Expand Up @@ -1826,12 +1826,12 @@ func TestTSLFailedReadingOfCACert(t *testing.T) {
defer cancel()
ctlr := makeController(conf, t.TempDir())

err = ctlr.Init(ctx)
err = ctlr.Init()
So(err, ShouldBeNil)

errChan := make(chan error, 1)
go func() {
err = ctlr.Run(ctx)
err = ctlr.Run()
errChan <- err
}()

Expand Down Expand Up @@ -1866,12 +1866,12 @@ func TestTSLFailedReadingOfCACert(t *testing.T) {
defer cancel()
ctlr := makeController(conf, t.TempDir())

err = ctlr.Init(ctx)
err = ctlr.Init()
So(err, ShouldBeNil)

errChan := make(chan error, 1)
go func() {
err = ctlr.Run(ctx)
err = ctlr.Run()
errChan <- err
}()

Expand Down
18 changes: 6 additions & 12 deletions pkg/cli/client/cve_cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,14 @@ func TestNegativeServerResponse(t *testing.T) {
ctlr := api.NewController(conf)
ctlr.Log.Logger = ctlr.Log.Output(writers)

ctx := context.Background()

if err := ctlr.Init(ctx); err != nil {
if err := ctlr.Init(); err != nil {
panic(err)
}

ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB)

go func() {
if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) {
if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) {
panic(err)
}
}()
Expand Down Expand Up @@ -238,16 +236,14 @@ func TestServerCVEResponse(t *testing.T) {
ctlr := api.NewController(conf)
ctlr.Log.Logger = ctlr.Log.Output(writers)

ctx := context.Background()

if err := ctlr.Init(ctx); err != nil {
if err := ctlr.Init(); err != nil {
panic(err)
}

ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB)

go func() {
if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) {
if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) {
panic(err)
}
}()
Expand Down Expand Up @@ -576,9 +572,7 @@ func TestCVESort(t *testing.T) {
t.FailNow()
}

ctx := context.Background()

if err := ctlr.Init(ctx); err != nil {
if err := ctlr.Init(); err != nil {
panic(err)
}

Expand Down Expand Up @@ -615,7 +609,7 @@ func TestCVESort(t *testing.T) {
}

go func() {
if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) {
if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) {
panic(err)
}
}()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/client/image_cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ func TestServerResponseGQLWithoutPermissions(t *testing.T) {
}

ctlr := api.NewController(conf)
if err := ctlr.Init(context.Background()); err != nil {
if err := ctlr.Init(); err != nil {
So(err, ShouldNotBeNil)
}
})
Expand Down
36 changes: 12 additions & 24 deletions pkg/cli/server/config_reloader.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package server

import (
"context"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -35,28 +34,22 @@ func NewHotReloader(ctlr *api.Controller, filePath string) (*HotReloader, error)
return hotReloader, nil
}

func signalHandler(ctlr *api.Controller, sigCh chan os.Signal, ctx context.Context, cancel context.CancelFunc) {
select {
func signalHandler(ctlr *api.Controller, sigCh chan os.Signal) {
// if signal then shutdown
case sig := <-sigCh:
defer cancel()

if sig, ok := <-sigCh; ok {
ctlr.Log.Info().Interface("signal", sig).Msg("received signal")

// gracefully shutdown http server
ctlr.Shutdown() //nolint: contextcheck

close(sigCh)
// if reload then return
case <-ctx.Done():
return
}
}

func initShutDownRoutine(ctlr *api.Controller, ctx context.Context, cancel context.CancelFunc) {
func initShutDownRoutine(ctlr *api.Controller) {
sigCh := make(chan os.Signal, 1)

go signalHandler(ctlr, sigCh, ctx, cancel)
go signalHandler(ctlr, sigCh)

// block all async signals to this server
signal.Ignore()
Expand All @@ -65,12 +58,10 @@ func initShutDownRoutine(ctlr *api.Controller, ctx context.Context, cancel conte
signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)
}

func (hr *HotReloader) Start() context.Context {
reloadCtx, cancelFunc := context.WithCancel(context.Background())

func (hr *HotReloader) Start() {
done := make(chan bool)

initShutDownRoutine(hr.ctlr, reloadCtx, cancelFunc)
initShutDownRoutine(hr.ctlr)

// run watcher
go func() {
Expand All @@ -92,16 +83,15 @@ func (hr *HotReloader) Start() context.Context {

continue
}
// if valid config then reload
cancelFunc()

// create new context
reloadCtx, cancelFunc = context.WithCancel(context.Background())
// stop background tasks gracefully
hr.ctlr.StopBackgroundTasks()

// init shutdown routine
initShutDownRoutine(hr.ctlr, reloadCtx, cancelFunc)
// load new config
hr.ctlr.LoadNewConfig(newConfig)

hr.ctlr.LoadNewConfig(reloadCtx, newConfig)
// start background tasks based on new loaded config
hr.ctlr.StartBackgroundTasks()
}
// watch for errors
case err := <-hr.watcher.Errors:
Expand All @@ -116,6 +106,4 @@ func (hr *HotReloader) Start() context.Context {

<-done
}()

return reloadCtx
}
8 changes: 3 additions & 5 deletions pkg/cli/server/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,15 @@ func newServeCmd(conf *config.Config) *cobra.Command {
return err
}

/* context used to cancel go routines so that
we can change their config on the fly (restart routines with different config) */
reloaderCtx := hotReloader.Start()
hotReloader.Start()

if err := ctlr.Init(reloaderCtx); err != nil {
if err := ctlr.Init(); err != nil {
ctlr.Log.Error().Err(err).Msg("failed to init controller")

return err
}

if err := ctlr.Run(reloaderCtx); err != nil {
if err := ctlr.Run(); err != nil {
log.Error().Err(err).Msg("unable to start controller, exiting")
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/compliance/v1_0_0/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ func startServer(t *testing.T) (*api.Controller, string) {
ctrl.Config.Storage.SubPaths = subPaths

go func() {
if err := ctrl.Init(context.Background()); err != nil {
if err := ctrl.Init(); err != nil {
return
}

// this blocks
if err := ctrl.Run(context.Background()); err != nil {
if err := ctrl.Run(); err != nil {
return
}
}()
Expand Down
4 changes: 2 additions & 2 deletions pkg/exporter/api/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ func TestNewExporter(t *testing.T) {
dir := t.TempDir()
serverController.Config.Storage.RootDirectory = dir
go func(ctrl *zotapi.Controller) {
if err := ctrl.Init(context.Background()); err != nil {
if err := ctrl.Init(); err != nil {
panic(err)
}

// this blocks
if err := ctrl.Run(context.Background()); !errors.Is(err, http.ErrServerClosed) {
if err := ctrl.Run(); !errors.Is(err, http.ErrServerClosed) {
panic(err)
}
}(serverController)
Expand Down
Loading

0 comments on commit 7d8179a

Please sign in to comment.