From ff01475490fd5ff9243bd4e182d206fce84e7f39 Mon Sep 17 00:00:00 2001
From: Petu Eusebiu
Date: Wed, 29 Nov 2023 17:38:49 +0200
Subject: [PATCH 1/2] fix(scheduler): data race when pushing new tasks

the problem here is that the scheduler can be stopped in two ways:
- by canceling the context given as argument to scheduler.RunScheduler()
- by calling scheduler.Shutdown()

because of this, Shutdown() can trigger a data race between calling scheduler.inShutdown() and actually pushing tasks into the worker pool.

solved by keeping a quit channel and selecting on both the quit channel and ctx.Done(), then closing the worker channel and shutting down the scheduler afterwards.

Signed-off-by: Petu Eusebiu
--- Makefile | 2 +- pkg/api/authn_test.go | 88 ++++++++++++ pkg/api/cookiestore.go | 4 +- pkg/extensions/extension_image_trust_test.go | 10 +- pkg/extensions/scrub/scrub_test.go | 18 +-- pkg/extensions/sync/service.go | 2 +- pkg/extensions/sync/sync_internal_test.go | 23 ++++ pkg/extensions/sync/sync_test.go | 133 +++++++++---------- pkg/meta/meta_test.go | 4 +- pkg/scheduler/scheduler.go | 70 +++++++--- pkg/scheduler/scheduler_test.go | 119 ++++++++++++++++- pkg/test/mocks/sync_remote_mock.go | 67 ++++++++++ 12 files changed, 422 insertions(+), 118 deletions(-) create mode 100644 pkg/test/mocks/sync_remote_mock.go diff --git a/Makefile b/Makefile index a77e3357b..fccde17e6 100644 --- a/Makefile +++ b/Makefile @@ -195,7 +195,7 @@ test-prereq: check-skopeo $(TESTDATA) $(ORAS) .PHONY: test-extended test-extended: $(if $(findstring ui,$(BUILD_LABELS)), ui) test-extended: test-prereq - go test -failfast -tags $(BUILD_LABELS),containers_image_openpgp -trimpath -race -timeout 15m -cover -coverpkg ./... -coverprofile=coverage-extended.txt -covermode=atomic ./... + go test -failfast -tags $(BUILD_LABELS),containers_image_openpgp -trimpath -race -timeout 20m -cover -coverpkg ./... -coverprofile=coverage-extended.txt -covermode=atomic ./... 
rm -rf /tmp/getter*; rm -rf /tmp/trivy* .PHONY: test-minimal diff --git a/pkg/api/authn_test.go b/pkg/api/authn_test.go index e7e89a87e..4890c8a7f 100644 --- a/pkg/api/authn_test.go +++ b/pkg/api/authn_test.go @@ -11,6 +11,7 @@ import ( "net/http" "net/http/httptest" "os" + "path" "testing" "time" @@ -23,9 +24,14 @@ import ( "zotregistry.io/zot/pkg/api/config" "zotregistry.io/zot/pkg/api/constants" extconf "zotregistry.io/zot/pkg/extensions/config" + "zotregistry.io/zot/pkg/extensions/monitoring" "zotregistry.io/zot/pkg/log" mTypes "zotregistry.io/zot/pkg/meta/types" reqCtx "zotregistry.io/zot/pkg/requestcontext" + "zotregistry.io/zot/pkg/scheduler" + "zotregistry.io/zot/pkg/storage" + storageConstants "zotregistry.io/zot/pkg/storage/constants" + "zotregistry.io/zot/pkg/storage/local" authutils "zotregistry.io/zot/pkg/test/auth" test "zotregistry.io/zot/pkg/test/common" "zotregistry.io/zot/pkg/test/mocks" @@ -922,6 +928,88 @@ func TestAPIKeysGeneratorErrors(t *testing.T) { }) } +func TestCookiestoreCleanup(t *testing.T) { + log := log.Logger{} + metrics := monitoring.NewMetricsServer(true, log) + + Convey("Test cookiestore cleanup works", t, func() { + taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) + taskScheduler.RateLimit = 50 * time.Millisecond + taskScheduler.RunScheduler(context.Background()) + + rootDir := t.TempDir() + + err := os.MkdirAll(path.Join(rootDir, "_sessions"), storageConstants.DefaultDirPerms) + So(err, ShouldBeNil) + + sessionPath := path.Join(rootDir, "_sessions", "session_1234") + + err = os.WriteFile(sessionPath, []byte("session"), storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + + err = os.Chtimes(sessionPath, time.Time{}, time.Time{}) + So(err, ShouldBeNil) + + imgStore := local.NewImageStore(rootDir, false, false, log, metrics, nil, nil) + + storeController := storage.StoreController{ + DefaultStore: imgStore, + } + + cookieStore, err := api.NewCookieStore(storeController) + So(err, ShouldBeNil) + + cookieStore.RunSessionCleaner(taskScheduler) + + time.Sleep(2 * time.Second) + + taskScheduler.Shutdown() + + // make sure session is removed + _, err = os.Stat(sessionPath) + So(err, ShouldNotBeNil) + }) + + Convey("Test cookiestore cleanup without permissions on rootDir", t, func() { + taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) + taskScheduler.RateLimit = 50 * time.Millisecond + taskScheduler.RunScheduler(context.Background()) + + rootDir := t.TempDir() + + err := os.MkdirAll(path.Join(rootDir, "_sessions"), storageConstants.DefaultDirPerms) + So(err, ShouldBeNil) + + sessionPath := path.Join(rootDir, "_sessions", "session_1234") + + err = os.WriteFile(sessionPath, []byte("session"), storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + + imgStore := local.NewImageStore(rootDir, false, false, log, metrics, nil, nil) + + storeController := storage.StoreController{ + DefaultStore: imgStore, + } + + cookieStore, err := api.NewCookieStore(storeController) + So(err, ShouldBeNil) + + err = os.Chmod(rootDir, 0o000) + So(err, ShouldBeNil) + + defer func() { + err = os.Chmod(rootDir, storageConstants.DefaultDirPerms) + So(err, ShouldBeNil) + }() + + cookieStore.RunSessionCleaner(taskScheduler) + + time.Sleep(1 * time.Second) + + taskScheduler.Shutdown() + }) +} + type mockUUIDGenerator struct { guuid.Generator succeedAttempts int diff --git a/pkg/api/cookiestore.go b/pkg/api/cookiestore.go index d66b971a7..86c109e58 100644 --- a/pkg/api/cookiestore.go +++ b/pkg/api/cookiestore.go @@ -152,7 +152,9 @@ type CleanTask 
struct { func (cleanTask *CleanTask) DoWork(ctx context.Context) error { for _, session := range cleanTask.sessions { if err := os.Remove(session); err != nil { - return err + if !os.IsNotExist(err) { + return err + } } } diff --git a/pkg/extensions/extension_image_trust_test.go b/pkg/extensions/extension_image_trust_test.go index a0dc6d304..5db9a6068 100644 --- a/pkg/extensions/extension_image_trust_test.go +++ b/pkg/extensions/extension_image_trust_test.go @@ -257,7 +257,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[ []string{fmt.Sprintf("localhost:%s/%s@%s", port, repo, image.DigestStr())}) So(err, ShouldBeNil) - found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second) + found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second) So(err, ShouldBeNil) So(found, ShouldBeTrue) @@ -369,7 +369,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[ err = signature.SignWithNotation(certName, imageURL, rootDir, true) So(err, ShouldBeNil) - found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second) + found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second) So(err, ShouldBeNil) So(found, ShouldBeTrue) @@ -502,7 +502,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[ err = signature.SignWithNotation(certName, imageURL, rootDir, false) So(err, ShouldBeNil) - found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second) + found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second) So(err, ShouldBeNil) So(found, ShouldBeTrue) @@ -672,7 +672,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[ []string{fmt.Sprintf("localhost:%s/%s@%s", port, repo, image.DigestStr())}) So(err, ShouldBeNil) - found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second) + found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second) So(err, ShouldBeNil) So(found, ShouldBeTrue) @@ -883,7 +883,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[ So(err, ShouldBeNil) So(found, ShouldBeTrue) - found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second) + found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second) So(err, ShouldBeNil) So(found, ShouldBeTrue) diff --git a/pkg/extensions/scrub/scrub_test.go b/pkg/extensions/scrub/scrub_test.go index 69af3d521..2939722fe 100644 --- a/pkg/extensions/scrub/scrub_test.go +++ b/pkg/extensions/scrub/scrub_test.go @@ -70,13 +70,11 @@ func TestScrubExtension(t *testing.T) { cm := test.NewControllerManager(ctlr) cm.StartAndWait(port) - time.Sleep(6 * time.Second) - defer cm.StopServer() - data, err := os.ReadFile(logFile.Name()) + found, err := test.ReadLogFileAndSearchString(logFile.Name(), "blobs/manifest ok", 60*time.Second) + So(found, ShouldBeTrue) So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, "blobs/manifest ok") }) Convey("Blobs integrity affected", t, func(c C) { @@ -122,13 +120,11 @@ func TestScrubExtension(t *testing.T) { cm := test.NewControllerManager(ctlr) cm.StartAndWait(port) - time.Sleep(6 * time.Second) - defer 
cm.StopServer() - data, err := os.ReadFile(logFile.Name()) + found, err := test.ReadLogFileAndSearchString(logFile.Name(), "blobs/manifest affected", 60*time.Second) + So(found, ShouldBeTrue) So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, "blobs/manifest affected") }) Convey("Generator error - not enough permissions to access root directory", t, func(c C) { @@ -170,13 +166,11 @@ func TestScrubExtension(t *testing.T) { cm := test.NewControllerManager(ctlr) cm.StartAndWait(port) - time.Sleep(6 * time.Second) - defer cm.StopServer() - data, err := os.ReadFile(logFile.Name()) + found, err := test.ReadLogFileAndSearchString(logFile.Name(), "failed to execute generator", 60*time.Second) + So(found, ShouldBeTrue) So(err, ShouldBeNil) - So(string(data), ShouldContainSubstring, "failed to execute generator") So(os.Chmod(path.Join(dir, repoName), 0o755), ShouldBeNil) }) diff --git a/pkg/extensions/sync/service.go b/pkg/extensions/sync/service.go index afcd48908..2cecf3bbb 100644 --- a/pkg/extensions/sync/service.go +++ b/pkg/extensions/sync/service.go @@ -44,7 +44,7 @@ func New( storeController storage.StoreController, metadb mTypes.MetaDB, log log.Logger, -) (Service, error) { +) (*BaseService, error) { service := &BaseService{} service.config = opts diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index 50c572ca1..9e985180f 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -170,6 +170,29 @@ func TestService(t *testing.T) { }) } +func TestSyncRepo(t *testing.T) { + Convey("trigger context error", t, func() { + conf := syncconf.RegistryConfig{ + URLs: []string{"http://localhost"}, + } + + service, err := New(conf, "", os.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{}) + So(err, ShouldBeNil) + + service.remote = mocks.SyncRemote{ + GetRepoTagsFn: func(repo string) ([]string, error) { + return []string{"repo1", "repo2"}, nil + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err = service.SyncRepo(ctx, "repo") + So(err, ShouldEqual, ctx.Err()) + }) +} + func TestDestinationRegistry(t *testing.T) { Convey("make StoreController", t, func() { dir := t.TempDir() diff --git a/pkg/extensions/sync/sync_test.go b/pkg/extensions/sync/sync_test.go index 6574dea75..0a43e5ae7 100644 --- a/pkg/extensions/sync/sync_test.go +++ b/pkg/extensions/sync/sync_test.go @@ -3938,6 +3938,12 @@ func TestPeriodicallySignaturesErr(t *testing.T) { } } + // start downstream server + updateDuration, err = time.ParseDuration("1s") + So(err, ShouldBeNil) + + syncConfig.Registries[0].PollInterval = updateDuration + // start downstream server dctlr, destBaseURL, _, _ := makeDownstreamServer(t, false, syncConfig) @@ -4030,6 +4036,61 @@ func TestPeriodicallySignaturesErr(t *testing.T) { So(err, ShouldBeNil) So(len(index.Manifests), ShouldEqual, 0) }) + + Convey("of type OCI image, error on downstream in canSkipReference()", func() { //nolint: dupl + // start downstream server + updateDuration, err = time.ParseDuration("1s") + So(err, ShouldBeNil) + + syncConfig.Registries[0].PollInterval = updateDuration + dctlr, _, destDir, _ := makeDownstreamServer(t, false, syncConfig) + + dcm := test.NewControllerManager(dctlr) + dcm.StartAndWait(dctlr.Config.HTTP.Port) + defer dcm.StopServer() + + found, err := test.ReadLogFileAndSearchString(dctlr.Config.Log.Output, + "finished syncing all repos", 15*time.Second) + if err != nil { + panic(err) + } + + if !found { + data, 
err := os.ReadFile(dctlr.Config.Log.Output) + So(err, ShouldBeNil) + + t.Logf("downstream log: %s", string(data)) + } + + So(found, ShouldBeTrue) + + time.Sleep(time.Second) + + blob := referrers.Manifests[0] + blobsDir := path.Join(destDir, repoName, "blobs", string(blob.Digest.Algorithm())) + blobPath := path.Join(blobsDir, blob.Digest.Encoded()) + err = os.MkdirAll(blobsDir, storageConstants.DefaultDirPerms) + So(err, ShouldBeNil) + err = os.WriteFile(blobPath, []byte("blob"), storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + err = os.Chmod(blobPath, 0o000) + So(err, ShouldBeNil) + + found, err = test.ReadLogFileAndSearchString(dctlr.Config.Log.Output, + "couldn't check if the upstream oci references for image can be skipped", 30*time.Second) + if err != nil { + panic(err) + } + + if !found { + data, err := os.ReadFile(dctlr.Config.Log.Output) + So(err, ShouldBeNil) + + t.Logf("downstream log: %s", string(data)) + } + + So(found, ShouldBeTrue) + }) }) }) } @@ -5200,78 +5261,6 @@ func TestOnDemandPullsOnce(t *testing.T) { }) } -func TestError(t *testing.T) { - Convey("Verify periodically sync pushSyncedLocalImage() error", t, func() { - updateDuration, _ := time.ParseDuration("30m") - - sctlr, srcBaseURL, _, _, _ := makeUpstreamServer(t, false, false) - - scm := test.NewControllerManager(sctlr) - scm.StartAndWait(sctlr.Config.HTTP.Port) - defer scm.StopServer() - - regex := ".*" - semver := true - var tlsVerify bool - - syncRegistryConfig := syncconf.RegistryConfig{ - Content: []syncconf.Content{ - { - Prefix: testImage, - Tags: &syncconf.Tags{ - Regex: ®ex, - Semver: &semver, - }, - }, - }, - URLs: []string{srcBaseURL}, - PollInterval: updateDuration, - TLSVerify: &tlsVerify, - CertDir: "", - } - - defaultVal := true - syncConfig := &syncconf.Config{ - Enable: &defaultVal, - Registries: []syncconf.RegistryConfig{syncRegistryConfig}, - } - - dctlr, _, destDir, _ := makeDownstreamServer(t, false, syncConfig) - - dcm := test.NewControllerManager(dctlr) - dcm.StartAndWait(dctlr.Config.HTTP.Port) - defer dcm.StopServer() - - // give permission denied on pushSyncedLocalImage() - localRepoPath := path.Join(destDir, testImage, "blobs") - err := os.MkdirAll(localRepoPath, 0o755) - So(err, ShouldBeNil) - - err = os.Chmod(localRepoPath, 0o000) - So(err, ShouldBeNil) - - defer func() { - err = os.Chmod(localRepoPath, 0o755) - So(err, ShouldBeNil) - }() - - found, err := test.ReadLogFileAndSearchString(dctlr.Config.Log.Output, - "couldn't commit image to local image store", 30*time.Second) - if err != nil { - panic(err) - } - - if !found { - data, err := os.ReadFile(dctlr.Config.Log.Output) - So(err, ShouldBeNil) - - t.Logf("downstream log: %s", string(data)) - } - - So(found, ShouldBeTrue) - }) -} - func TestSignaturesOnDemand(t *testing.T) { Convey("Verify sync signatures on demand feature", t, func() { sctlr, srcBaseURL, srcDir, _, _ := makeUpstreamServer(t, false, false) diff --git a/pkg/meta/meta_test.go b/pkg/meta/meta_test.go index 59e2101ee..990f1df4c 100644 --- a/pkg/meta/meta_test.go +++ b/pkg/meta/meta_test.go @@ -413,7 +413,7 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func }) Convey("Test API keys with short expiration date", func() { - expirationDate := time.Now().Add(500 * time.Millisecond).Local().Round(time.Millisecond) + expirationDate := time.Now().Add(1 * time.Second) apiKeyDetails.ExpirationDate = expirationDate userAc := reqCtx.NewUserAccessControl() @@ -435,7 +435,7 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, 
preparationFuncs ...func So(isExpired, ShouldBeFalse) So(err, ShouldBeNil) - time.Sleep(600 * time.Millisecond) + time.Sleep(1 * time.Second) Convey("GetUserAPIKeys detects api key expired", func() { storedAPIKeys, err = metaDB.GetUserAPIKeys(ctx) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 8ea89d548..23e894c82 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -83,6 +83,12 @@ type Scheduler struct { workerWg *sync.WaitGroup isShuttingDown atomic.Bool metricServer monitoring.MetricServer + + // ensure the scheduler can only be stopped once + stop sync.Once + + // close to signal the workers to stop working + quit chan struct{} } func NewScheduler(cfg *config.Config, ms monitoring.MetricServer, logC log.Logger) *Scheduler { //nolint: varnamelen @@ -107,12 +113,14 @@ func NewScheduler(cfg *config.Config, ms monitoring.MetricServer, logC log.Logge generatorsLock: new(sync.Mutex), log: log.Logger{Logger: sublogger}, // default value + metricServer: ms, RateLimit: rateLimit, NumWorkers: numWorkers, workerChan: make(chan Task, numWorkers), metricsChan: make(chan struct{}, 1), workerWg: new(sync.WaitGroup), - metricServer: ms, + stop: sync.Once{}, + quit: make(chan struct{}), } } @@ -210,12 +218,19 @@ func (scheduler *Scheduler) metricsWorker() { } } +/* +Scheduler can be stopped either by stopping the context provided in scheduler.RunScheduler(ctx context.Context) + + or by calling this function or both. + +Shutdown() will wait for all tasks being run to finish their work before exiting. +*/ func (scheduler *Scheduler) Shutdown() { + defer scheduler.workerWg.Wait() + if !scheduler.inShutdown() { scheduler.shutdown() } - - scheduler.workerWg.Wait() } func (scheduler *Scheduler) inShutdown() bool { @@ -223,11 +238,19 @@ func (scheduler *Scheduler) inShutdown() bool { } func (scheduler *Scheduler) shutdown() { - close(scheduler.workerChan) - close(scheduler.metricsChan) - scheduler.isShuttingDown.Store(true) + scheduler.stop.Do(func() { + scheduler.isShuttingDown.Store(true) + + close(scheduler.metricsChan) + close(scheduler.quit) + }) } +/* + This context is passed to all task generators + +canceling the context will stop scheduler and also it will notify tasks to finish their work gracefully. +*/ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { throttle := time.NewTicker(rateLimit).C @@ -243,8 +266,12 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { go scheduler.metricsWorker() go func() { + // will close workers chan when either ctx is canceled or scheduler.Shutdown() + defer close(scheduler.workerChan) + for { select { + // can be stopped either by closing from outside the ctx given to RunScheduler() case <-ctx.Done(): if !scheduler.inShutdown() { scheduler.shutdown() @@ -252,24 +279,27 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { scheduler.log.Debug().Msg("received stop signal, gracefully shutting down...") + return + // or by running scheduler.Shutdown() + case <-scheduler.quit: + scheduler.log.Debug().Msg("scheduler: received stop signal, gracefully shutting down...") + return default: - i := 0 - for i < numWorkers { - task := scheduler.getTask() - - if task != nil { - // push tasks into worker pool - if !scheduler.inShutdown() { - scheduler.log.Debug().Str("task", task.String()).Msg("pushing task into worker pool") - scheduler.workerChan <- task - } - } - i++ + // we don't want to block on sending task in workerChan. 
+ if len(scheduler.workerChan) == scheduler.NumWorkers { + <-throttle + + continue } - } - <-throttle + task := scheduler.getTask() + + if task != nil { + // push tasks into worker pool until workerChan is full. + scheduler.workerChan <- task + } + } } }() } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 89a39246e..3418ae16b 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -30,6 +30,14 @@ func (t *task) DoWork(ctx context.Context) error { return errInternal } + for idx := 0; idx < 5; idx++ { + if ctx.Err() != nil { + return ctx.Err() + } + + time.Sleep(100 * time.Millisecond) + } + t.log.Info().Msg(t.msg) return nil @@ -52,12 +60,20 @@ type generator struct { } func (g *generator) Next() (scheduler.Task, error) { - if g.step > 1 { + if g.step > 100 { g.done = true } g.step++ g.index++ + if g.step%11 == 0 { + return nil, nil + } + + if g.step%13 == 0 { + return nil, errInternal + } + return &task{log: g.log, msg: fmt.Sprintf("executing %s task; index: %d", g.priority, g.index), err: false}, nil } @@ -114,12 +130,12 @@ func TestScheduler(t *testing.T) { genH := &shortGenerator{log: logger, priority: "high priority"} // interval has to be higher than throttle value to simulate - sch.SubmitGenerator(genH, 12000*time.Millisecond, scheduler.HighPriority) + sch.SubmitGenerator(genH, 6*time.Second, scheduler.HighPriority) ctx, cancel := context.WithCancel(context.Background()) sch.RunScheduler(ctx) - time.Sleep(16 * time.Second) + time.Sleep(7 * time.Second) cancel() data, err := os.ReadFile(logFile.Name()) @@ -205,7 +221,7 @@ func TestScheduler(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) sch.RunScheduler(ctx) - time.Sleep(6 * time.Second) + time.Sleep(4 * time.Second) cancel() data, err := os.ReadFile(logFile.Name()) @@ -256,6 +272,101 @@ func TestScheduler(t *testing.T) { So(string(data), ShouldNotContainSubstring, "adding a new task") }) + Convey("Test stopping scheduler by cancelling the context", t, func() { + logFile, err := os.CreateTemp("", "zot-log*.txt") + So(err, ShouldBeNil) + + defer os.Remove(logFile.Name()) // clean up + + logger := log.NewLogger("debug", logFile.Name()) + metrics := monitoring.NewMetricsServer(true, logger) + sch := scheduler.NewScheduler(config.New(), metrics, logger) + + genL := &generator{log: logger, priority: "medium priority"} + sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.MediumPriority) + + ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler(ctx) + + time.Sleep(1 * time.Second) + cancel() + + time.Sleep(4 * time.Second) + + data, err := os.ReadFile(logFile.Name()) + So(err, ShouldBeNil) + So(string(data), ShouldContainSubstring, "executing medium priority task; index: 1") + So(string(data), ShouldContainSubstring, "executing medium priority task; index: 2") + So(string(data), ShouldContainSubstring, "scheduler: received stop signal, gracefully shutting down...") + }) + + Convey("Test stopping scheduler by calling scheduler.Shutdown()", t, func() { + logFile, err := os.CreateTemp("", "zot-log*.txt") + So(err, ShouldBeNil) + + defer os.Remove(logFile.Name()) // clean up + + logger := log.NewLogger("debug", logFile.Name()) + metrics := monitoring.NewMetricsServer(true, logger) + sch := scheduler.NewScheduler(config.New(), metrics, logger) + + genL := &generator{log: logger, priority: "high priority"} + sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.HighPriority) + + sch.RunScheduler(context.Background()) + + 
time.Sleep(4 * time.Second) + + sch.Shutdown() + + data, err := os.ReadFile(logFile.Name()) + t.Log(string(data)) + So(err, ShouldBeNil) + So(string(data), ShouldContainSubstring, "executing high priority task; index: 1") + So(string(data), ShouldContainSubstring, "executing high priority task; index: 2") + So(string(data), ShouldContainSubstring, "scheduler: received stop signal, gracefully shutting down...") + }) + + Convey("Test stopping scheduler by both calling scheduler.Shutdown() and cancelling context", t, func() { + logFile, err := os.CreateTemp("", "zot-log*.txt") + So(err, ShouldBeNil) + + defer os.Remove(logFile.Name()) // clean up + + logger := log.NewLogger("debug", logFile.Name()) + metrics := monitoring.NewMetricsServer(true, logger) + sch := scheduler.NewScheduler(config.New(), metrics, logger) + + genL := &generator{log: logger, priority: "high priority"} + sch.SubmitGenerator(genL, 1*time.Millisecond, scheduler.HighPriority) + + ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler(ctx) + + time.Sleep(4 * time.Second) + + go func() { + cancel() + sch.Shutdown() + }() + + go func() { + sch.Shutdown() + }() + + // will wait for scheduler to finish all tasks + sch.Shutdown() + + sch.Shutdown() + + data, err := os.ReadFile(logFile.Name()) + t.Log(string(data)) + So(err, ShouldBeNil) + So(string(data), ShouldContainSubstring, "executing high priority task; index: 1") + So(string(data), ShouldContainSubstring, "executing high priority task; index: 2") + So(string(data), ShouldContainSubstring, "scheduler: received stop signal, gracefully shutting down...") + }) + Convey("Test scheduler Priority.String() method", t, func() { var p scheduler.Priority //nolint: varnamelen // test invalid priority diff --git a/pkg/test/mocks/sync_remote_mock.go b/pkg/test/mocks/sync_remote_mock.go new file mode 100644 index 000000000..d75000296 --- /dev/null +++ b/pkg/test/mocks/sync_remote_mock.go @@ -0,0 +1,67 @@ +package mocks + +import ( + "context" + + "github.com/containers/image/v5/types" + "github.com/opencontainers/go-digest" +) + +type SyncRemote struct { + // Get temporary ImageReference, is used by functions in containers/image package + GetImageReferenceFn func(repo string, tag string) (types.ImageReference, error) + + // Get local oci layout context, is used by functions in containers/image package + GetContextFn func() *types.SystemContext + + // Get a list of repos (catalog) + GetRepositoriesFn func(ctx context.Context) ([]string, error) + + // Get a list of tags given a repo + GetRepoTagsFn func(repo string) ([]string, error) + + // Get manifest content, mediaType, digest given an ImageReference + GetManifestContentFn func(imageReference types.ImageReference) ([]byte, string, digest.Digest, error) +} + +func (remote SyncRemote) GetImageReference(repo string, tag string) (types.ImageReference, error) { + if remote.GetImageReferenceFn != nil { + return remote.GetImageReferenceFn(repo, tag) + } + + return nil, nil +} + +func (remote SyncRemote) GetContext() *types.SystemContext { + if remote.GetContextFn != nil { + return remote.GetContextFn() + } + + return nil +} + +func (remote SyncRemote) GetRepositories(ctx context.Context) ([]string, error) { + if remote.GetRepositoriesFn != nil { + return remote.GetRepositoriesFn(ctx) + } + + return []string{}, nil +} + +func (remote SyncRemote) GetRepoTags(repo string) ([]string, error) { + if remote.GetRepoTagsFn != nil { + return remote.GetRepoTagsFn(repo) + } + + return []string{}, nil +} + +func (remote SyncRemote) 
GetManifestContent(imageReference types.ImageReference) ( + []byte, string, digest.Digest, error, +) { + if remote.GetManifestContentFn != nil { + return remote.GetManifestContentFn(imageReference) + } + + return nil, "", "", nil +} From 8b0b567b8fea20e105e4370ed9365ffd38c0ab3f Mon Sep 17 00:00:00 2001 From: Petu Eusebiu Date: Thu, 7 Dec 2023 15:25:48 +0200 Subject: [PATCH 2/2] refactor(scheduler): refactor into a single shutdown before this we could stop scheduler either by closing the context provided to RunScheduler(ctx) or by running Shutdown(). simplify things by getting rid of the external context in RunScheduler(). keep an internal context in the scheduler itself and pass it down to all tasks. Signed-off-by: Petu Eusebiu --- pkg/api/authn_test.go | 4 +- pkg/api/controller.go | 25 +++-- pkg/api/controller_test.go | 22 ++-- pkg/cli/client/cve_cmd_test.go | 18 +-- pkg/cli/client/image_cmd_test.go | 2 +- pkg/cli/server/config_reloader.go | 36 ++---- pkg/cli/server/root.go | 10 +- pkg/compliance/v1_0_0/check_test.go | 4 +- pkg/exporter/api/controller_test.go | 4 +- pkg/extensions/monitoring/monitoring_test.go | 6 +- pkg/extensions/search/cve/scan_test.go | 12 +- pkg/extensions/search/cve/update_test.go | 6 +- pkg/extensions/search/search_test.go | 18 +-- pkg/extensions/sync/sync_test.go | 12 +- pkg/scheduler/scheduler.go | 41 ++----- pkg/scheduler/scheduler_test.go | 110 +++---------------- pkg/storage/local/local_test.go | 38 ++++--- pkg/storage/s3/s3_test.go | 84 +++++++------- pkg/test/common/utils.go | 23 +--- pkg/test/common/utils_test.go | 7 +- test/blackbox/sync.bats | 2 +- test/blackbox/sync_cloud.bats | 6 +- test/blackbox/sync_replica_cluster.bats | 4 +- 23 files changed, 179 insertions(+), 315 deletions(-) diff --git a/pkg/api/authn_test.go b/pkg/api/authn_test.go index 4890c8a7f..ac8145bdf 100644 --- a/pkg/api/authn_test.go +++ b/pkg/api/authn_test.go @@ -935,7 +935,7 @@ func TestCookiestoreCleanup(t *testing.T) { Convey("Test cookiestore cleanup works", t, func() { taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) taskScheduler.RateLimit = 50 * time.Millisecond - taskScheduler.RunScheduler(context.Background()) + taskScheduler.RunScheduler() rootDir := t.TempDir() @@ -973,7 +973,7 @@ func TestCookiestoreCleanup(t *testing.T) { Convey("Test cookiestore cleanup without permissions on rootDir", t, func() { taskScheduler := scheduler.NewScheduler(config.New(), metrics, log) taskScheduler.RateLimit = 50 * time.Millisecond - taskScheduler.RunScheduler(context.Background()) + taskScheduler.RunScheduler() rootDir := t.TempDir() diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 36cbd1048..fda220f5b 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -94,12 +94,12 @@ func (c *Controller) GetPort() int { return c.chosenPort } -func (c *Controller) Run(reloadCtx context.Context) error { +func (c *Controller) Run() error { if err := c.initCookieStore(); err != nil { return err } - c.StartBackgroundTasks(reloadCtx) + c.StartBackgroundTasks() // setup HTTP API router engine := mux.NewRouter() @@ -216,7 +216,7 @@ func (c *Controller) Run(reloadCtx context.Context) error { return server.Serve(listener) } -func (c *Controller) Init(reloadCtx context.Context) error { +func (c *Controller) Init() error { // print the current configuration, but strip secrets c.Log.Info().Interface("params", c.Config.Sanitize()).Msg("configuration settings") @@ -237,7 +237,7 @@ func (c *Controller) Init(reloadCtx context.Context) error { return err } - if err := 
c.InitMetaDB(reloadCtx); err != nil { + if err := c.InitMetaDB(); err != nil { return err } @@ -280,7 +280,7 @@ func (c *Controller) initCookieStore() error { return nil } -func (c *Controller) InitMetaDB(reloadCtx context.Context) error { +func (c *Controller) InitMetaDB() error { // init metaDB if search is enabled or we need to store user profiles, api keys or signatures if c.Config.IsSearchEnabled() || c.Config.IsBasicAuthnEnabled() || c.Config.IsImageTrustEnabled() || c.Config.IsRetentionEnabled() { @@ -310,7 +310,7 @@ func (c *Controller) InitMetaDB(reloadCtx context.Context) error { return nil } -func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config.Config) { +func (c *Controller) LoadNewConfig(newConfig *config.Config) { // reload access control config c.Config.HTTP.AccessControl = newConfig.HTTP.AccessControl @@ -364,21 +364,24 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, newConfig *config. c.InitCVEInfo() - c.StartBackgroundTasks(reloadCtx) - c.Log.Info().Interface("reloaded params", c.Config.Sanitize()). Msg("loaded new configuration settings") } func (c *Controller) Shutdown() { - c.taskScheduler.Shutdown() + c.StopBackgroundTasks() ctx := context.Background() _ = c.Server.Shutdown(ctx) } -func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) { +// Will stop scheduler and wait for all tasks to finish their work. +func (c *Controller) StopBackgroundTasks() { + c.taskScheduler.Shutdown() +} + +func (c *Controller) StartBackgroundTasks() { c.taskScheduler = scheduler.NewScheduler(c.Config, c.Metrics, c.Log) - c.taskScheduler.RunScheduler(reloadCtx) + c.taskScheduler.RunScheduler() // Enable running garbage-collect periodically for DefaultStore if c.Config.Storage.GC { diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index 87c0c0846..ca63430da 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -300,10 +300,10 @@ func TestRunAlreadyRunningServer(t *testing.T) { cm.StartAndWait(port) defer cm.StopServer() - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldNotBeNil) - err = ctlr.Run(context.Background()) + err = ctlr.Run() So(err, ShouldNotBeNil) }) } @@ -377,7 +377,7 @@ func TestObjectStorageController(t *testing.T) { ctlr := makeController(conf, tmp) So(ctlr, ShouldNotBeNil) - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldNotBeNil) }) @@ -1218,7 +1218,7 @@ func TestMultipleInstance(t *testing.T) { } ctlr := api.NewController(conf) ctlr.Log.Info().Int64("seedUser", seedUser).Int64("seedPass", seedPass).Msg("random seed for username & password") - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldEqual, errors.ErrImgStoreNotFound) globalDir := t.TempDir() @@ -1311,7 +1311,7 @@ func TestMultipleInstance(t *testing.T) { ctlr.Config.Storage.SubPaths = subPathMap - err := ctlr.Init(context.Background()) + err := ctlr.Init() So(err, ShouldNotBeNil) // subpath root directory does not exist. 
@@ -1320,7 +1320,7 @@ func TestMultipleInstance(t *testing.T) { ctlr.Config.Storage.SubPaths = subPathMap - err = ctlr.Init(context.Background()) + err = ctlr.Init() So(err, ShouldNotBeNil) subPathMap["/a"] = config.StorageConfig{RootDirectory: subDir, Dedupe: true, GC: true} @@ -1328,7 +1328,7 @@ func TestMultipleInstance(t *testing.T) { ctlr.Config.Storage.SubPaths = subPathMap - err = ctlr.Init(context.Background()) + err = ctlr.Init() So(err, ShouldNotBeNil) }) } @@ -1826,12 +1826,12 @@ func TestTSLFailedReadingOfCACert(t *testing.T) { defer cancel() ctlr := makeController(conf, t.TempDir()) - err = ctlr.Init(ctx) + err = ctlr.Init() So(err, ShouldBeNil) errChan := make(chan error, 1) go func() { - err = ctlr.Run(ctx) + err = ctlr.Run() errChan <- err }() @@ -1866,12 +1866,12 @@ func TestTSLFailedReadingOfCACert(t *testing.T) { defer cancel() ctlr := makeController(conf, t.TempDir()) - err = ctlr.Init(ctx) + err = ctlr.Init() So(err, ShouldBeNil) errChan := make(chan error, 1) go func() { - err = ctlr.Run(ctx) + err = ctlr.Run() errChan <- err }() diff --git a/pkg/cli/client/cve_cmd_test.go b/pkg/cli/client/cve_cmd_test.go index 91a922bd5..d0876b28b 100644 --- a/pkg/cli/client/cve_cmd_test.go +++ b/pkg/cli/client/cve_cmd_test.go @@ -163,16 +163,14 @@ func TestNegativeServerResponse(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -239,16 +237,14 @@ func TestServerCVEResponse(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -578,9 +574,7 @@ func TestCVESort(t *testing.T) { t.FailNow() } - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } @@ -617,7 +611,7 @@ func TestCVESort(t *testing.T) { } go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() diff --git a/pkg/cli/client/image_cmd_test.go b/pkg/cli/client/image_cmd_test.go index df56f1afd..496ff0260 100644 --- a/pkg/cli/client/image_cmd_test.go +++ b/pkg/cli/client/image_cmd_test.go @@ -866,7 +866,7 @@ func TestServerResponseGQLWithoutPermissions(t *testing.T) { } ctlr := api.NewController(conf) - if err := ctlr.Init(context.Background()); err != nil { + if err := ctlr.Init(); err != nil { So(err, ShouldNotBeNil) } }) diff --git a/pkg/cli/server/config_reloader.go b/pkg/cli/server/config_reloader.go index f81f76331..43967a275 100644 --- a/pkg/cli/server/config_reloader.go +++ b/pkg/cli/server/config_reloader.go @@ -1,7 +1,6 @@ package server import ( - "context" "os" "os/signal" "syscall" @@ -35,28 +34,22 @@ func NewHotReloader(ctlr *api.Controller, filePath string) (*HotReloader, error) return hotReloader, nil } -func signalHandler(ctlr *api.Controller, sigCh chan os.Signal, ctx context.Context, cancel 
context.CancelFunc) { - select { +func signalHandler(ctlr *api.Controller, sigCh chan os.Signal) { // if signal then shutdown - case sig := <-sigCh: - defer cancel() - + if sig, ok := <-sigCh; ok { ctlr.Log.Info().Interface("signal", sig).Msg("received signal") // gracefully shutdown http server ctlr.Shutdown() //nolint: contextcheck close(sigCh) - // if reload then return - case <-ctx.Done(): - return } } -func initShutDownRoutine(ctlr *api.Controller, ctx context.Context, cancel context.CancelFunc) { +func initShutDownRoutine(ctlr *api.Controller) { sigCh := make(chan os.Signal, 1) - go signalHandler(ctlr, sigCh, ctx, cancel) + go signalHandler(ctlr, sigCh) // block all async signals to this server signal.Ignore() @@ -65,12 +58,10 @@ func initShutDownRoutine(ctlr *api.Controller, ctx context.Context, cancel conte signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP) } -func (hr *HotReloader) Start() context.Context { - reloadCtx, cancelFunc := context.WithCancel(context.Background()) - +func (hr *HotReloader) Start() { done := make(chan bool) - initShutDownRoutine(hr.ctlr, reloadCtx, cancelFunc) + initShutDownRoutine(hr.ctlr) // run watcher go func() { @@ -92,16 +83,15 @@ func (hr *HotReloader) Start() context.Context { continue } - // if valid config then reload - cancelFunc() - // create new context - reloadCtx, cancelFunc = context.WithCancel(context.Background()) + // stop background tasks gracefully + hr.ctlr.StopBackgroundTasks() - // init shutdown routine - initShutDownRoutine(hr.ctlr, reloadCtx, cancelFunc) + // load new config + hr.ctlr.LoadNewConfig(newConfig) - hr.ctlr.LoadNewConfig(reloadCtx, newConfig) + // start background tasks based on new loaded config + hr.ctlr.StartBackgroundTasks() } // watch for errors case err := <-hr.watcher.Errors: @@ -116,6 +106,4 @@ func (hr *HotReloader) Start() context.Context { <-done }() - - return reloadCtx } diff --git a/pkg/cli/server/root.go b/pkg/cli/server/root.go index d3e638453..afdc1d156 100644 --- a/pkg/cli/server/root.go +++ b/pkg/cli/server/root.go @@ -61,20 +61,16 @@ func newServeCmd(conf *config.Config) *cobra.Command { return err } - /* context used to cancel go routines so that - we can change their config on the fly (restart routines with different config) */ - reloaderCtx := hotReloader.Start() + hotReloader.Start() - if err := ctlr.Init(reloaderCtx); err != nil { + if err := ctlr.Init(); err != nil { ctlr.Log.Error().Err(err).Msg("failed to init controller") return err } - if err := ctlr.Run(reloaderCtx); err != nil { + if err := ctlr.Run(); err != nil { log.Error().Err(err).Msg("failed to start controller, exiting") - - return err } return nil diff --git a/pkg/compliance/v1_0_0/check_test.go b/pkg/compliance/v1_0_0/check_test.go index 3b104f3a3..1ed06f74e 100644 --- a/pkg/compliance/v1_0_0/check_test.go +++ b/pkg/compliance/v1_0_0/check_test.go @@ -81,12 +81,12 @@ func startServer(t *testing.T) (*api.Controller, string) { ctrl.Config.Storage.SubPaths = subPaths go func() { - if err := ctrl.Init(context.Background()); err != nil { + if err := ctrl.Init(); err != nil { return } // this blocks - if err := ctrl.Run(context.Background()); err != nil { + if err := ctrl.Run(); err != nil { return } }() diff --git a/pkg/exporter/api/controller_test.go b/pkg/exporter/api/controller_test.go index 387470b81..f9bd08521 100644 --- a/pkg/exporter/api/controller_test.go +++ b/pkg/exporter/api/controller_test.go @@ -140,12 +140,12 @@ func TestNewExporter(t *testing.T) { dir := t.TempDir() 
serverController.Config.Storage.RootDirectory = dir go func(ctrl *zotapi.Controller) { - if err := ctrl.Init(context.Background()); err != nil { + if err := ctrl.Init(); err != nil { panic(err) } // this blocks - if err := ctrl.Run(context.Background()); !errors.Is(err, http.ErrServerClosed) { + if err := ctrl.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }(serverController) diff --git a/pkg/extensions/monitoring/monitoring_test.go b/pkg/extensions/monitoring/monitoring_test.go index 90f966815..e5ad13955 100644 --- a/pkg/extensions/monitoring/monitoring_test.go +++ b/pkg/extensions/monitoring/monitoring_test.go @@ -4,7 +4,6 @@ package monitoring_test import ( - "context" "fmt" "io" "math/rand" @@ -463,8 +462,7 @@ func TestPopulateStorageMetrics(t *testing.T) { metrics := monitoring.NewMetricsServer(true, ctlr.Log) sch := scheduler.NewScheduler(conf, metrics, ctlr.Log) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) + sch.RunScheduler() generator := &common.StorageMetricsInitGenerator{ ImgStore: ctlr.StoreController.DefaultStore, @@ -485,7 +483,7 @@ func TestPopulateStorageMetrics(t *testing.T) { So(err, ShouldBeNil) So(found, ShouldBeTrue) - cancel() + sch.Shutdown() alpineSize, err := monitoring.GetDirSize(path.Join(rootDir, "alpine")) So(err, ShouldBeNil) busyboxSize, err := monitoring.GetDirSize(path.Join(rootDir, "busybox")) diff --git a/pkg/extensions/search/cve/scan_test.go b/pkg/extensions/search/cve/scan_test.go index ac05e7252..13af44a83 100644 --- a/pkg/extensions/search/cve/scan_test.go +++ b/pkg/extensions/search/cve/scan_test.go @@ -432,11 +432,9 @@ func TestScanGeneratorWithMockedData(t *testing.T) { //nolint: gocyclo sch.SubmitGenerator(generator, 10*time.Second, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler() - sch.RunScheduler(ctx) - - defer cancel() + defer sch.Shutdown() // Make sure the scanner generator has completed despite errors found, err := test.ReadLogFileAndSearchString(logPath, @@ -529,11 +527,9 @@ func TestScanGeneratorWithRealData(t *testing.T) { // Start the generator sch.SubmitGenerator(generator, 120*time.Second, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) - - sch.RunScheduler(ctx) + sch.RunScheduler() - defer cancel() + defer sch.Shutdown() // Make sure the scanner generator has completed found, err := test.ReadLogFileAndSearchString(logPath, diff --git a/pkg/extensions/search/cve/update_test.go b/pkg/extensions/search/cve/update_test.go index 9efbdee74..7545ae6d5 100644 --- a/pkg/extensions/search/cve/update_test.go +++ b/pkg/extensions/search/cve/update_test.go @@ -63,11 +63,9 @@ func TestCVEDBGenerator(t *testing.T) { sch.SubmitGenerator(generator, 12000*time.Millisecond, scheduler.HighPriority) - ctx, cancel := context.WithCancel(context.Background()) + sch.RunScheduler() - sch.RunScheduler(ctx) - - defer cancel() + defer sch.Shutdown() // Wait for trivy db to download found, err := test.ReadLogFileAndCountStringOccurence(logPath, diff --git a/pkg/extensions/search/search_test.go b/pkg/extensions/search/search_test.go index 4fac01d3d..b13dd8934 100644 --- a/pkg/extensions/search/search_test.go +++ b/pkg/extensions/search/search_test.go @@ -681,16 +681,14 @@ func TestRepoListWithNewestImage(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } 
ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -3373,16 +3371,14 @@ func TestGlobalSearch(t *testing.T) { ctlr := api.NewController(conf) ctlr.Log.Logger = ctlr.Log.Output(writers) - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() @@ -6205,16 +6201,14 @@ func TestImageSummary(t *testing.T) { configDigest := godigest.FromBytes(configBlob) So(errConfig, ShouldBeNil) // marshall success, config is valid JSON - ctx := context.Background() - - if err := ctlr.Init(ctx); err != nil { + if err := ctlr.Init(); err != nil { panic(err) } ctlr.CveScanner = getMockCveScanner(ctlr.MetaDB) go func() { - if err := ctlr.Run(ctx); !errors.Is(err, http.ErrServerClosed) { + if err := ctlr.Run(); !errors.Is(err, http.ErrServerClosed) { panic(err) } }() diff --git a/pkg/extensions/sync/sync_test.go b/pkg/extensions/sync/sync_test.go index 0a43e5ae7..344ad7395 100644 --- a/pkg/extensions/sync/sync_test.go +++ b/pkg/extensions/sync/sync_test.go @@ -1901,15 +1901,15 @@ func TestConfigReloader(t *testing.T) { hotReloader, err := cli.NewHotReloader(dctlr, cfgfile.Name()) So(err, ShouldBeNil) - reloadCtx := hotReloader.Start() + hotReloader.Start() go func() { // this blocks - if err := dctlr.Init(reloadCtx); err != nil { + if err := dctlr.Init(); err != nil { return } - if err := dctlr.Run(reloadCtx); err != nil { + if err := dctlr.Run(); err != nil { return } }() @@ -2051,15 +2051,15 @@ func TestConfigReloader(t *testing.T) { hotReloader, err := cli.NewHotReloader(dctlr, cfgfile.Name()) So(err, ShouldBeNil) - reloadCtx := hotReloader.Start() + hotReloader.Start() go func() { // this blocks - if err := dctlr.Init(reloadCtx); err != nil { + if err := dctlr.Init(); err != nil { return } - if err := dctlr.Run(reloadCtx); err != nil { + if err := dctlr.Run(); err != nil { return } }() diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 23e894c82..174559d45 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -83,12 +83,7 @@ type Scheduler struct { workerWg *sync.WaitGroup isShuttingDown atomic.Bool metricServer monitoring.MetricServer - - // ensure the scheduler can only be stopped once - stop sync.Once - - // close to signal the workers to stop working - quit chan struct{} + cancelFunc context.CancelFunc } func NewScheduler(cfg *config.Config, ms monitoring.MetricServer, logC log.Logger) *Scheduler { //nolint: varnamelen @@ -119,8 +114,6 @@ func NewScheduler(cfg *config.Config, ms monitoring.MetricServer, logC log.Logge workerChan: make(chan Task, numWorkers), metricsChan: make(chan struct{}, 1), workerWg: new(sync.WaitGroup), - stop: sync.Once{}, - quit: make(chan struct{}), } } @@ -219,11 +212,8 @@ func (scheduler *Scheduler) metricsWorker() { } /* -Scheduler can be stopped either by stopping the context provided in scheduler.RunScheduler(ctx context.Context) - - or by calling this function or both. - -Shutdown() will wait for all tasks being run to finish their work before exiting. +Scheduler can be stopped by calling Shutdown(). +it will wait for all tasks being run to finish their work before exiting. 
*/ func (scheduler *Scheduler) Shutdown() { defer scheduler.workerWg.Wait() @@ -238,20 +228,19 @@ func (scheduler *Scheduler) inShutdown() bool { } func (scheduler *Scheduler) shutdown() { - scheduler.stop.Do(func() { - scheduler.isShuttingDown.Store(true) + scheduler.isShuttingDown.Store(true) - close(scheduler.metricsChan) - close(scheduler.quit) - }) + scheduler.cancelFunc() + close(scheduler.metricsChan) } -/* - This context is passed to all task generators +func (scheduler *Scheduler) RunScheduler() { + /*This context is passed to all task generators + calling scheduler.Shutdown() will cancel this context and will wait for all tasks + to finish their work gracefully.*/ + ctx, cancel := context.WithCancel(context.Background()) + scheduler.cancelFunc = cancel -canceling the context will stop scheduler and also it will notify tasks to finish their work gracefully. -*/ -func (scheduler *Scheduler) RunScheduler(ctx context.Context) { throttle := time.NewTicker(rateLimit).C numWorkers := scheduler.NumWorkers @@ -271,7 +260,6 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { for { select { - // can be stopped either by closing from outside the ctx given to RunScheduler() case <-ctx.Done(): if !scheduler.inShutdown() { scheduler.shutdown() @@ -279,11 +267,6 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { scheduler.log.Debug().Msg("received stop signal, gracefully shutting down...") - return - // or by running scheduler.Shutdown() - case <-scheduler.quit: - scheduler.log.Debug().Msg("scheduler: received stop signal, gracefully shutting down...") - return default: // we don't want to block on sending task in workerChan. diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 3418ae16b..9980cd310 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -132,11 +132,9 @@ func TestScheduler(t *testing.T) { // interval has to be higher than throttle value to simulate sch.SubmitGenerator(genH, 6*time.Second, scheduler.HighPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - + sch.RunScheduler() time.Sleep(7 * time.Second) - cancel() + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -164,12 +162,9 @@ func TestScheduler(t *testing.T) { genH := &generator{log: logger, priority: "high priority"} sch.SubmitGenerator(genH, time.Duration(0), scheduler.HighPriority) - ctx, cancel := context.WithCancel(context.Background()) - - sch.RunScheduler(ctx) - + sch.RunScheduler() time.Sleep(4 * time.Second) - cancel() + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -193,11 +188,9 @@ func TestScheduler(t *testing.T) { t := &task{log: logger, msg: "", err: true} sch.SubmitTask(t, scheduler.MediumPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - + sch.RunScheduler() time.Sleep(500 * time.Millisecond) - cancel() + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -218,11 +211,9 @@ func TestScheduler(t *testing.T) { genL := &generator{log: logger, priority: "low priority"} sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.LowPriority) - ctx, cancel := context.WithCancel(context.Background()) - sch.RunScheduler(ctx) - + sch.RunScheduler() time.Sleep(4 * time.Second) - cancel() + sch.Shutdown() data, err := os.ReadFile(logFile.Name()) So(err, ShouldBeNil) @@ -258,10 +249,8 @@ func TestScheduler(t *testing.T) { metrics := 
monitoring.NewMetricsServer(true, logger)
 	sch := scheduler.NewScheduler(config.New(), metrics, logger)

-	ctx, cancel := context.WithCancel(context.Background())
-
-	sch.RunScheduler(ctx)
-	cancel()
+	sch.RunScheduler()
+	sch.Shutdown()

 	time.Sleep(500 * time.Millisecond)

 	t := &task{log: logger, msg: "", err: false}
@@ -272,7 +261,7 @@ func TestScheduler(t *testing.T) {
 		So(string(data), ShouldNotContainSubstring, "adding a new task")
 	})

-	Convey("Test stopping scheduler by cancelling the context", t, func() {
+	Convey("Test stopping scheduler by calling Shutdown()", t, func() {
 		logFile, err := os.CreateTemp("", "zot-log*.txt")
 		So(err, ShouldBeNil)
@@ -285,86 +274,15 @@ func TestScheduler(t *testing.T) {
 		genL := &generator{log: logger, priority: "medium priority"}
 		sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.MediumPriority)

-		ctx, cancel := context.WithCancel(context.Background())
-		sch.RunScheduler(ctx)
-
-		time.Sleep(1 * time.Second)
-		cancel()
-
+		sch.RunScheduler()
 		time.Sleep(4 * time.Second)
+		sch.Shutdown()

 		data, err := os.ReadFile(logFile.Name())
 		So(err, ShouldBeNil)
 		So(string(data), ShouldContainSubstring, "executing medium priority task; index: 1")
 		So(string(data), ShouldContainSubstring, "executing medium priority task; index: 2")
-		So(string(data), ShouldContainSubstring, "scheduler: received stop signal, gracefully shutting down...")
-	})
-
-	Convey("Test stopping scheduler by calling scheduler.Shutdown()", t, func() {
-		logFile, err := os.CreateTemp("", "zot-log*.txt")
-		So(err, ShouldBeNil)
-
-		defer os.Remove(logFile.Name()) // clean up
-
-		logger := log.NewLogger("debug", logFile.Name())
-		metrics := monitoring.NewMetricsServer(true, logger)
-		sch := scheduler.NewScheduler(config.New(), metrics, logger)
-
-		genL := &generator{log: logger, priority: "high priority"}
-		sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.HighPriority)
-
-		sch.RunScheduler(context.Background())
-
-		time.Sleep(4 * time.Second)
-
-		sch.Shutdown()
-
-		data, err := os.ReadFile(logFile.Name())
-		t.Log(string(data))
-		So(err, ShouldBeNil)
-		So(string(data), ShouldContainSubstring, "executing high priority task; index: 1")
-		So(string(data), ShouldContainSubstring, "executing high priority task; index: 2")
-		So(string(data), ShouldContainSubstring, "scheduler: received stop signal, gracefully shutting down...")
-	})
-
-	Convey("Test stopping scheduler by both calling scheduler.Shutdown() and cancelling context", t, func() {
-		logFile, err := os.CreateTemp("", "zot-log*.txt")
-		So(err, ShouldBeNil)
-
-		defer os.Remove(logFile.Name()) // clean up
-
-		logger := log.NewLogger("debug", logFile.Name())
-		metrics := monitoring.NewMetricsServer(true, logger)
-		sch := scheduler.NewScheduler(config.New(), metrics, logger)
-
-		genL := &generator{log: logger, priority: "high priority"}
-		sch.SubmitGenerator(genL, 1*time.Millisecond, scheduler.HighPriority)
-
-		ctx, cancel := context.WithCancel(context.Background())
-		sch.RunScheduler(ctx)
-
-		time.Sleep(4 * time.Second)
-
-		go func() {
-			cancel()
-			sch.Shutdown()
-		}()
-
-		go func() {
-			sch.Shutdown()
-		}()
-
-		// will wait for scheduler to finish all tasks
-		sch.Shutdown()
-
-		sch.Shutdown()
-
-		data, err := os.ReadFile(logFile.Name())
-		t.Log(string(data))
-		So(err, ShouldBeNil)
-		So(string(data), ShouldContainSubstring, "executing high priority task; index: 1")
-		So(string(data), ShouldContainSubstring, "executing high priority task; index: 2")
-		So(string(data), ShouldContainSubstring, "scheduler: received stop signal, gracefully shutting down...")
+		So(string(data), ShouldContainSubstring, "received stop signal, gracefully shutting down...")
 	})

 	Convey("Test scheduler Priority.String() method", t, func() {
diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go
index 0409b68a0..c825eb687 100644
--- a/pkg/storage/local/local_test.go
+++ b/pkg/storage/local/local_test.go
@@ -62,16 +62,15 @@ var DeleteReferrers = config.ImageRetention{ //nolint: gochecknoglobals

 var errCache = errors.New("new cache error")

-func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) {
+func runAndGetScheduler() *scheduler.Scheduler {
 	log := zlog.Logger{}
 	metrics := monitoring.NewMetricsServer(true, log)
 	taskScheduler := scheduler.NewScheduler(config.New(), metrics, log)
 	taskScheduler.RateLimit = 50 * time.Millisecond

-	ctx, cancel := context.WithCancel(context.Background())
-	taskScheduler.RunScheduler(ctx)
+	taskScheduler.RunScheduler()

-	return taskScheduler, cancel
+	return taskScheduler
 }

 func TestStorageFSAPIs(t *testing.T) {
@@ -1195,14 +1194,15 @@ func TestDedupeLinks(t *testing.T) {
 		// run on empty image store
 		// switch dedupe to true from false
-		taskScheduler, cancel := runAndGetScheduler()
+		taskScheduler := runAndGetScheduler()
+		defer taskScheduler.Shutdown()

 		// rebuild with dedupe true
 		imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)

 		// wait until rebuild finishes
 		time.Sleep(1 * time.Second)

-		cancel()
+		taskScheduler.Shutdown()

 		// manifest1
 		upload, err := imgStore.NewBlobUpload("dedupe1")
@@ -1367,7 +1367,9 @@ func TestDedupeLinks(t *testing.T) {
 		Convey("Intrerrupt rebuilding and restart, checking idempotency", func() {
 			for i := 0; i < 10; i++ {
-				taskScheduler, cancel := runAndGetScheduler()
+				taskScheduler := runAndGetScheduler()
+				defer taskScheduler.Shutdown()
+
 				// rebuild with dedupe true
 				imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver)
@@ -1375,10 +1377,11 @@ func TestDedupeLinks(t *testing.T) {
 				sleepValue := i * 5
 				time.Sleep(time.Duration(sleepValue) * time.Millisecond)

-				cancel()
+				taskScheduler.Shutdown()
 			}

-			taskScheduler, cancel := runAndGetScheduler()
+			taskScheduler := runAndGetScheduler()
+			defer taskScheduler.Shutdown()

 			// rebuild with dedupe true
 			imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver)
@@ -1387,7 +1390,7 @@ func TestDedupeLinks(t *testing.T) {
 			// wait until rebuild finishes
 			time.Sleep(10 * time.Second)

-			cancel()
+			taskScheduler.Shutdown()

 			fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1))
 			So(err, ShouldBeNil)
@@ -1398,7 +1401,8 @@ func TestDedupeLinks(t *testing.T) {
 		Convey("rebuild dedupe index error cache nil", func() {
 			// switch dedupe to true from false
-			taskScheduler, cancel := runAndGetScheduler()
+			taskScheduler := runAndGetScheduler()
+			defer taskScheduler.Shutdown()

 			imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, nil)
@@ -1408,7 +1412,7 @@ func TestDedupeLinks(t *testing.T) {
 			time.Sleep(3 * time.Second)

-			cancel()
+			taskScheduler.Shutdown()

 			fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1))
 			So(err, ShouldBeNil)
@@ -1420,7 +1424,8 @@ func TestDedupeLinks(t *testing.T) {
 		Convey("rebuild dedupe index cache error on original blob", func() {
 			// switch dedupe to true from false
-			taskScheduler, cancel := runAndGetScheduler()
+			taskScheduler := runAndGetScheduler()
+			defer taskScheduler.Shutdown()

 			imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, &mocks.CacheMock{
 				HasBlobFn: func(digest godigest.Digest, path string) bool {
@@ -1436,7 +1441,7 @@ func TestDedupeLinks(t *testing.T) {
 			time.Sleep(10 * time.Second)

-			cancel()
+			taskScheduler.Shutdown()

 			fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1))
 			So(err, ShouldBeNil)
@@ -1448,7 +1453,8 @@ func TestDedupeLinks(t *testing.T) {
 		Convey("rebuild dedupe index cache error on duplicate blob", func() {
 			// switch dedupe to true from false
-			taskScheduler, cancel := runAndGetScheduler()
+			taskScheduler := runAndGetScheduler()
+			defer taskScheduler.Shutdown()

 			imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, &mocks.CacheMock{
 				HasBlobFn: func(digest godigest.Digest, path string) bool {
@@ -1468,7 +1474,7 @@ func TestDedupeLinks(t *testing.T) {
 			time.Sleep(15 * time.Second)

-			cancel()
+			taskScheduler.Shutdown()

 			fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1))
 			So(err, ShouldBeNil)
diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go
index d9e3b9079..67b8b6869 100644
--- a/pkg/storage/s3/s3_test.go
+++ b/pkg/storage/s3/s3_test.go
@@ -186,16 +186,15 @@ func createObjectsStoreDynamo(rootDir string, cacheDir string, dedupe bool, tabl
 	return store, il, err
 }

-func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) {
+func runAndGetScheduler() *scheduler.Scheduler {
 	logger := log.Logger{}
 	metrics := monitoring.NewMetricsServer(false, logger)
 	taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger)
 	taskScheduler.RateLimit = 50 * time.Millisecond

-	ctx, cancel := context.WithCancel(context.Background())
-	taskScheduler.RunScheduler(ctx)
+	taskScheduler.RunScheduler()

-	return taskScheduler, cancel
+	return taskScheduler
 }

 type FileInfoMock struct {
@@ -1587,7 +1586,8 @@ func TestS3Dedupe(t *testing.T) {
 		})

 		Convey("rebuild s3 dedupe index from true to false", func() { //nolint: dupl
-			taskScheduler, cancel := runAndGetScheduler()
+			taskScheduler := runAndGetScheduler()
+			defer taskScheduler.Shutdown()

 			storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false)
 			defer cleanupStorage(storeDriver, testDir)
@@ -1598,7 +1598,7 @@ func TestS3Dedupe(t *testing.T) {
 			time.Sleep(10 * time.Second)

-			cancel()
+			taskScheduler.Shutdown()

 			fi1, err := storeDriver.Stat(context.Background(),
 				path.Join(testDir, "dedupe1", "blobs", "sha256", blobDigest1.Encoded()))
@@ -1615,9 +1615,8 @@ func TestS3Dedupe(t *testing.T) {
 			So(len(blobContent), ShouldEqual, fi1.Size())

 			Convey("rebuild s3 dedupe index from false to true", func() {
-				taskScheduler, cancel := runAndGetScheduler()
-
-				defer cancel()
+				taskScheduler := runAndGetScheduler()
+				defer taskScheduler.Shutdown()

 				storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true)
 				defer cleanupStorage(storeDriver, testDir)
@@ -1628,6 +1627,8 @@ func TestS3Dedupe(t *testing.T) {
 				time.Sleep(10 * time.Second)

+				taskScheduler.Shutdown()
+
 				fi2, err := storeDriver.Stat(context.Background(),
 					path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded()))
 				So(err, ShouldBeNil)
@@ -1816,7 +1817,8 @@ func TestS3Dedupe(t *testing.T) {
 		})

 		Convey("rebuild s3 dedupe index from true to false", func() { //nolint: dupl
-			taskScheduler, cancel := runAndGetScheduler()
+			taskScheduler := runAndGetScheduler()
+			defer taskScheduler.Shutdown()

 			storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false)
 			defer cleanupStorage(storeDriver, testDir)
@@ -1827,7 +1829,7 @@ func TestS3Dedupe(t *testing.T) {
 			time.Sleep(10 * time.Second)

-			cancel()
+			taskScheduler.Shutdown()

 			fi1, err := storeDriver.Stat(context.Background(),
 				path.Join(testDir, "dedupe1", "blobs", "sha256", blobDigest1.Encoded()))
@@ -1861,7 +1863,8 @@ func TestS3Dedupe(t *testing.T) {
 			})

 			Convey("rebuild s3 dedupe index from false to true", func() {
-				taskScheduler, cancel := runAndGetScheduler()
+				taskScheduler := runAndGetScheduler()
+				defer taskScheduler.Shutdown()

 				storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true)
 				defer cleanupStorage(storeDriver, testDir)
@@ -1872,7 +1875,7 @@ func TestS3Dedupe(t *testing.T) {
 				time.Sleep(10 * time.Second)

-				cancel()
+				taskScheduler.Shutdown()

 				fi2, err := storeDriver.Stat(context.Background(),
 					path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded()))
@@ -2055,9 +2058,8 @@ func TestRebuildDedupeIndex(t *testing.T) {
 			taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger)
 			taskScheduler.RateLimit = 1 * time.Millisecond

-			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
-
-			taskScheduler.RunScheduler(ctx)
+			taskScheduler.RunScheduler()
+			defer taskScheduler.Shutdown()

 			storeDriver, imgStore, _ = createObjectsStore(testDir, t.TempDir(), false)
 			defer cleanupStorage(storeDriver, testDir)
@@ -2067,17 +2069,18 @@ func TestRebuildDedupeIndex(t *testing.T) {
 				sleepValue := i * 5
 				time.Sleep(time.Duration(sleepValue) * time.Millisecond)

-				cancel()
+				taskScheduler.Shutdown()
 			}

-			taskScheduler, cancel := runAndGetScheduler()
+			taskScheduler := runAndGetScheduler()
+			defer taskScheduler.Shutdown()

 			imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)

 			// wait until rebuild finishes
 			time.Sleep(10 * time.Second)

-			cancel()
+			taskScheduler.Shutdown()

 			fi2, err := storeDriver.Stat(context.Background(),
 				path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded()))
@@ -2096,10 +2099,8 @@ func TestRebuildDedupeIndex(t *testing.T) {
 			taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger)
 			taskScheduler.RateLimit = 1 * time.Millisecond

-			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
-			defer cancel()
-
-			taskScheduler.RunScheduler(ctx)
+			taskScheduler.RunScheduler()
+			defer taskScheduler.Shutdown()

 			storeDriver, imgStore, _ = createObjectsStore(testDir, t.TempDir(), true)
 			defer cleanupStorage(storeDriver, testDir)
@@ -2110,10 +2111,11 @@ func TestRebuildDedupeIndex(t *testing.T) {
 				sleepValue := i * 5
 				time.Sleep(time.Duration(sleepValue) * time.Millisecond)

-				cancel()
+				taskScheduler.Shutdown()
 			}

-			taskScheduler, cancel = runAndGetScheduler()
+			taskScheduler = runAndGetScheduler()
+			defer taskScheduler.Shutdown()

 			// rebuild with dedupe false, should have all blobs with content
 			imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
@@ -2121,7 +2123,7 @@ func TestRebuildDedupeIndex(t *testing.T) {
 			// wait until rebuild finishes
 			time.Sleep(10 * time.Second)

-			cancel()
+			taskScheduler.Shutdown()

 			fi2, err = storeDriver.Stat(context.Background(),
 				path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded()))
@@ -2140,8 +2142,8 @@ func TestRebuildDedupeIndex(t *testing.T) {
 			storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, true)
 			defer cleanupStorage(storeDriver, testDir)

-			taskScheduler, cancel := runAndGetScheduler()
-			defer cancel()
+			taskScheduler := runAndGetScheduler()
+			defer taskScheduler.Shutdown()

 			imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
@@ -2150,8 +2152,8 @@ func TestRebuildDedupeIndex(t *testing.T) {
 		})

 		Convey("Rebuild dedupe index already rebuilt", func() {
-			taskScheduler, cancel := runAndGetScheduler()
-			defer cancel()
+			taskScheduler := runAndGetScheduler()
+			defer taskScheduler.Shutdown()

 			storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true)
 			defer cleanupStorage(storeDriver, testDir)
@@ -2171,8 +2173,8 @@ func TestRebuildDedupeIndex(t *testing.T) {
 			err := storeDriver.PutContent(context.Background(), fi1.Path(), []byte{})
 			So(err, ShouldBeNil)

-			taskScheduler, cancel := runAndGetScheduler()
-			defer cancel()
+			taskScheduler := runAndGetScheduler()
+			defer taskScheduler.Shutdown()

 			imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
@@ -2185,8 +2187,8 @@ func TestRebuildDedupeIndex(t *testing.T) {
 			err := storeDriver.Delete(context.Background(), fi1.Path())
 			So(err, ShouldBeNil)

-			taskScheduler, cancel := runAndGetScheduler()
-			defer cancel()
+			taskScheduler := runAndGetScheduler()
+			defer taskScheduler.Shutdown()

 			storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true)
 			defer cleanupStorage(storeDriver, testDir)
@@ -2202,8 +2204,8 @@ func TestRebuildDedupeIndex(t *testing.T) {
 			err := storeDriver.PutContent(context.Background(), fi1.Path(), []byte{})
 			So(err, ShouldBeNil)

-			taskScheduler, cancel := runAndGetScheduler()
-			defer cancel()
+			taskScheduler := runAndGetScheduler()
+			defer taskScheduler.Shutdown()

 			storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true)
 			defer cleanupStorage(storeDriver, testDir)
@@ -2224,8 +2226,8 @@ func TestRebuildDedupeIndex(t *testing.T) {
 			err := storeDriver.Delete(context.Background(), imgStore.RootDir())
 			So(err, ShouldBeNil)

-			taskScheduler, cancel := runAndGetScheduler()
-			defer cancel()
+			taskScheduler := runAndGetScheduler()
+			defer taskScheduler.Shutdown()

 			// rebuild with dedupe false, should have all blobs with content
 			imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
@@ -2235,8 +2237,8 @@ func TestRebuildDedupeIndex(t *testing.T) {
 		})

 		Convey("Rebuild from true to false", func() {
-			taskScheduler, cancel := runAndGetScheduler()
-			defer cancel()
+			taskScheduler := runAndGetScheduler()
+			defer taskScheduler.Shutdown()

 			storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), false)
 			defer cleanupStorage(storeDriver, testDir)
@@ -2247,6 +2249,8 @@ func TestRebuildDedupeIndex(t *testing.T) {
 			// wait until rebuild finishes
 			time.Sleep(10 * time.Second)

+			taskScheduler.Shutdown()
+
 			fi2, err := storeDriver.Stat(context.Background(),
 				path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2.Encoded()))
 			So(err, ShouldBeNil)
diff --git a/pkg/test/common/utils.go b/pkg/test/common/utils.go
index 51e17b721..d2f1d57c0 100644
--- a/pkg/test/common/utils.go
+++ b/pkg/test/common/utils.go
@@ -1,7 +1,6 @@
 package common

 import (
-	"context"
 	"errors"
 	"fmt"
 	"math/rand"
@@ -62,44 +61,34 @@ func Location(baseURL string, resp *resty.Response) string {
 }

 type Controller interface {
-	Init(ctx context.Context) error
-	Run(ctx context.Context) error
+	Init() error
+	Run() error
 	Shutdown()
 	GetPort() int
 }

 type ControllerManager struct {
 	controller Controller
-	// used to stop background tasks(goroutines)
-	cancelRoutinesFunc context.CancelFunc
 }

-func (cm *ControllerManager) RunServer(ctx context.Context) {
+func (cm *ControllerManager) RunServer() {
 	// Useful to be able to call in the same goroutine for testing purposes
-	if err := cm.controller.Run(ctx); !errors.Is(err, http.ErrServerClosed) {
+	if err := cm.controller.Run(); !errors.Is(err, http.ErrServerClosed) {
 		panic(err)
 	}
 }

 func (cm *ControllerManager) StartServer() {
-	ctx, cancel := context.WithCancel(context.Background())
-	cm.cancelRoutinesFunc = cancel
-
-	if err := cm.controller.Init(ctx); err != nil {
+	if err := cm.controller.Init(); err != nil {
 		panic(err)
 	}

 	go func() {
-		cm.RunServer(ctx)
+		cm.RunServer()
 	}()
 }

 func (cm *ControllerManager) StopServer() {
-	// stop background tasks
-	if cm.cancelRoutinesFunc != nil {
-		cm.cancelRoutinesFunc()
-	}
-
 	cm.controller.Shutdown()
 }
diff --git a/pkg/test/common/utils_test.go b/pkg/test/common/utils_test.go
index f9e485b85..9c547d102 100644
--- a/pkg/test/common/utils_test.go
+++ b/pkg/test/common/utils_test.go
@@ -1,7 +1,6 @@
 package common_test

 import (
-	"context"
 	"os"
 	"path"
 	"testing"
@@ -53,11 +52,9 @@ func TestControllerManager(t *testing.T) {
 		ctlr := api.NewController(conf)
 		ctlrManager := tcommon.NewControllerManager(ctlr)

-		ctx := context.Background()
-
-		err := ctlr.Init(ctx)
+		err := ctlr.Init()
 		So(err, ShouldBeNil)

-		So(func() { ctlrManager.RunServer(ctx) }, ShouldPanic)
+		So(func() { ctlrManager.RunServer() }, ShouldPanic)
 	})
 }
diff --git a/test/blackbox/sync.bats b/test/blackbox/sync.bats
index 6ec24f70c..218e21f19 100644
--- a/test/blackbox/sync.bats
+++ b/test/blackbox/sync.bats
@@ -76,7 +76,7 @@ function setup_file() {
         ],
         "onDemand": false,
         "tlsVerify": false,
-        "PollInterval": "1s",
+        "PollInterval": "10s",
         "content": [
           {
             "prefix": "**"
diff --git a/test/blackbox/sync_cloud.bats b/test/blackbox/sync_cloud.bats
index a35e76700..0a78e4902 100644
--- a/test/blackbox/sync_cloud.bats
+++ b/test/blackbox/sync_cloud.bats
@@ -30,7 +30,7 @@ function setup_file() {
         exit 1
     fi

-    # Download test data to folder common for the entire suite, not just this file
+    # Download test data to folder common for the entire suite, not just this file
     skopeo --insecure-policy copy --format=oci docker://ghcr.io/project-zot/golang:1.20 oci:${TEST_DATA_DIR}/golang:1.20
     # Setup zot server
     local zot_sync_per_root_dir=${BATS_FILE_TMPDIR}/zot-per
@@ -88,7 +88,7 @@ function setup_file() {
         ],
         "onDemand": false,
         "tlsVerify": false,
-        "PollInterval": "1s",
+        "PollInterval": "10s",
         "content": [
           {
             "prefix": "**"
@@ -197,7 +197,7 @@ function teardown_file() {
    [ "$status" -eq 0 ]
    [ $(echo "${lines[-1]}" | jq '.tags[]') = '"1.20"' ]

-    run sleep 20s
+    run sleep 30s

    run curl http://127.0.0.1:${zot_port1}/v2/_catalog
    [ "$status" -eq 0 ]
diff --git a/test/blackbox/sync_replica_cluster.bats b/test/blackbox/sync_replica_cluster.bats
index 37291d044..1032f11a2 100644
--- a/test/blackbox/sync_replica_cluster.bats
+++ b/test/blackbox/sync_replica_cluster.bats
@@ -69,7 +69,7 @@ function setup_file() {
         ],
         "onDemand": false,
         "tlsVerify": false,
-        "PollInterval": "1s",
+        "PollInterval": "10s",
         "content": [
           {
             "prefix": "**"
@@ -105,7 +105,7 @@ EOF
         ],
         "onDemand": false,
         "tlsVerify": false,
-        "PollInterval": "1s",
+        "PollInterval": "10s",
         "content": [
           {
             "prefix": "**"