Skip to content

Commit

Permalink
fix(scheduler): data race when pushing new tasks
Browse files Browse the repository at this point in the history
the problem here is that scheduler can be closed in two ways:
- canceling the context given as argument to scheduler.RunScheduler()
- running scheduler.Shutdown()

because of this shutdown can trigger a data race between calling scheduler.inShutdown()
and actually pushing tasks into the pool workers

solved that by keeping a quit channel and listening on both quit channel and ctx.Done()
and closing the worker chan and scheduler afterwards.

Signed-off-by: Petu Eusebiu <[email protected]>
  • Loading branch information
eusebiu-constantin-petu-dbk committed Dec 5, 2023
1 parent e3bd9a8 commit 16096f1
Show file tree
Hide file tree
Showing 11 changed files with 329 additions and 118 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ test-prereq: check-skopeo $(TESTDATA) $(ORAS)
.PHONY: test-extended
test-extended: $(if $(findstring ui,$(BUILD_LABELS)), ui)
test-extended: test-prereq
go test -failfast -tags $(BUILD_LABELS),containers_image_openpgp -trimpath -race -timeout 15m -cover -coverpkg ./... -coverprofile=coverage-extended.txt -covermode=atomic ./...
go test -failfast -tags $(BUILD_LABELS),containers_image_openpgp -trimpath -race -timeout 20m -cover -coverpkg ./... -coverprofile=coverage-extended.txt -covermode=atomic ./...
rm -rf /tmp/getter*; rm -rf /tmp/trivy*

.PHONY: test-minimal
Expand Down
10 changes: 5 additions & 5 deletions pkg/extensions/extension_image_trust_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[
[]string{fmt.Sprintf("localhost:%s/%s@%s", port, repo, image.DigestStr())})
So(err, ShouldBeNil)

found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second)
found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second)
So(err, ShouldBeNil)
So(found, ShouldBeTrue)

Expand Down Expand Up @@ -369,7 +369,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[
err = signature.SignWithNotation(certName, imageURL, rootDir, true)
So(err, ShouldBeNil)

found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second)
found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second)
So(err, ShouldBeNil)
So(found, ShouldBeTrue)

Expand Down Expand Up @@ -502,7 +502,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[
err = signature.SignWithNotation(certName, imageURL, rootDir, false)
So(err, ShouldBeNil)

found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second)
found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second)
So(err, ShouldBeNil)
So(found, ShouldBeTrue)

Expand Down Expand Up @@ -672,7 +672,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[
[]string{fmt.Sprintf("localhost:%s/%s@%s", port, repo, image.DigestStr())})
So(err, ShouldBeNil)

found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second)
found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second)
So(err, ShouldBeNil)
So(found, ShouldBeTrue)

Expand Down Expand Up @@ -883,7 +883,7 @@ func RunSignatureUploadAndVerificationTests(t *testing.T, cacheDriverParams map[
So(err, ShouldBeNil)
So(found, ShouldBeTrue)

found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 10*time.Second)
found, err = test.ReadLogFileAndSearchString(logFile.Name(), "update signatures validity", 30*time.Second)
So(err, ShouldBeNil)
So(found, ShouldBeTrue)

Expand Down
18 changes: 6 additions & 12 deletions pkg/extensions/scrub/scrub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,11 @@ func TestScrubExtension(t *testing.T) {

cm := test.NewControllerManager(ctlr)
cm.StartAndWait(port)
time.Sleep(6 * time.Second)

defer cm.StopServer()

data, err := os.ReadFile(logFile.Name())
found, err := test.ReadLogFileAndSearchString(logFile.Name(), "scrub: blobs/manifest ok", 60*time.Second)
So(found, ShouldBeTrue)
So(err, ShouldBeNil)
So(string(data), ShouldContainSubstring, "scrub: blobs/manifest ok")
})

Convey("Blobs integrity affected", t, func(c C) {
Expand Down Expand Up @@ -122,13 +120,11 @@ func TestScrubExtension(t *testing.T) {

cm := test.NewControllerManager(ctlr)
cm.StartAndWait(port)
time.Sleep(6 * time.Second)

defer cm.StopServer()

data, err := os.ReadFile(logFile.Name())
found, err := test.ReadLogFileAndSearchString(logFile.Name(), "scrub: blobs/manifest affected", 60*time.Second)
So(found, ShouldBeTrue)
So(err, ShouldBeNil)
So(string(data), ShouldContainSubstring, "scrub: blobs/manifest affected")
})

Convey("Generator error - not enough permissions to access root directory", t, func(c C) {
Expand Down Expand Up @@ -170,13 +166,11 @@ func TestScrubExtension(t *testing.T) {

cm := test.NewControllerManager(ctlr)
cm.StartAndWait(port)
time.Sleep(6 * time.Second)

defer cm.StopServer()

data, err := os.ReadFile(logFile.Name())
found, err := test.ReadLogFileAndSearchString(logFile.Name(), "error while executing generator", 60*time.Second)
So(found, ShouldBeTrue)
So(err, ShouldBeNil)
So(string(data), ShouldContainSubstring, "error while executing generator")

So(os.Chmod(path.Join(dir, repoName), 0o755), ShouldBeNil)
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/extensions/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func New(
storeController storage.StoreController,
metadb mTypes.MetaDB,
log log.Logger,
) (Service, error) {
) (*BaseService, error) {
service := &BaseService{}

service.config = opts
Expand Down
23 changes: 23 additions & 0 deletions pkg/extensions/sync/sync_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,29 @@ func TestService(t *testing.T) {
})
}

func TestSyncRepo(t *testing.T) {
Convey("trigger context error", t, func() {
conf := syncconf.RegistryConfig{
URLs: []string{"http://localhost"},
}

service, err := New(conf, "", os.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{})
So(err, ShouldBeNil)

service.remote = mocks.SyncRemote{
GetRepoTagsFn: func(repo string) ([]string, error) {
return []string{"repo1", "repo2"}, nil
},
}

ctx, cancel := context.WithCancel(context.Background())
cancel()

err = service.SyncRepo(ctx, "repo")
So(err, ShouldEqual, ctx.Err())
})
}

func TestDestinationRegistry(t *testing.T) {
Convey("make StoreController", t, func() {
dir := t.TempDir()
Expand Down
136 changes: 64 additions & 72 deletions pkg/extensions/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4030,6 +4030,70 @@ func TestPeriodicallySignaturesErr(t *testing.T) {
So(err, ShouldBeNil)
So(len(index.Manifests), ShouldEqual, 0)
})

Convey("of type OCI image, error on downstream in canSkipReference()", func() { //nolint: dupl
// start downstream server
updateDuration, err = time.ParseDuration("1s")
So(err, ShouldBeNil)

syncConfig.Registries[0].PollInterval = updateDuration
dctlr, destBaseURL, destDir, _ := makeDownstreamServer(t, false, syncConfig)

dcm := test.NewControllerManager(dctlr)
dcm.StartAndWait(dctlr.Config.HTTP.Port)
defer dcm.StopServer()

found, err := test.ReadLogFileAndSearchString(dctlr.Config.Log.Output,
"finished syncing all repos", 15*time.Second)
if err != nil {
panic(err)
}

if !found {
data, err := os.ReadFile(dctlr.Config.Log.Output)
So(err, ShouldBeNil)

t.Logf("downstream log: %s", string(data))
}

So(found, ShouldBeTrue)

blob := referrers.Manifests[0]
blobsDir := path.Join(destDir, repoName, "blobs", string(blob.Digest.Algorithm()))
blobPath := path.Join(blobsDir, blob.Digest.Encoded())
err = os.MkdirAll(blobsDir, storageConstants.DefaultDirPerms)
So(err, ShouldBeNil)
err = os.WriteFile(blobPath, []byte("blob"), storageConstants.DefaultFilePerms)
So(err, ShouldBeNil)
err = os.Chmod(blobPath, 0o000)
So(err, ShouldBeNil)

// should not be synced nor sync on demand
resp, err = resty.R().
SetHeader("Content-Type", "application/json").
Get(destBaseURL + artifactURLPath)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusOK)

var index ispec.Index

err = json.Unmarshal(resp.Body(), &index)
So(err, ShouldBeNil)
So(len(index.Manifests), ShouldEqual, 0)

found, err = test.ReadLogFileAndSearchString(dctlr.Config.Log.Output,
"couldn't check if the upstream oci references for image can be skipped", 30*time.Second)
if err != nil {
panic(err)
}

if !found {
data, err := os.ReadFile(dctlr.Config.Log.Output)
So(err, ShouldBeNil)

t.Logf("downstream log: %s", string(data))
}
})
})
})
}
Expand Down Expand Up @@ -5200,78 +5264,6 @@ func TestOnDemandPullsOnce(t *testing.T) {
})
}

func TestError(t *testing.T) {
Convey("Verify periodically sync pushSyncedLocalImage() error", t, func() {
updateDuration, _ := time.ParseDuration("30m")

sctlr, srcBaseURL, _, _, _ := makeUpstreamServer(t, false, false)

scm := test.NewControllerManager(sctlr)
scm.StartAndWait(sctlr.Config.HTTP.Port)
defer scm.StopServer()

regex := ".*"
semver := true
var tlsVerify bool

syncRegistryConfig := syncconf.RegistryConfig{
Content: []syncconf.Content{
{
Prefix: testImage,
Tags: &syncconf.Tags{
Regex: &regex,
Semver: &semver,
},
},
},
URLs: []string{srcBaseURL},
PollInterval: updateDuration,
TLSVerify: &tlsVerify,
CertDir: "",
}

defaultVal := true
syncConfig := &syncconf.Config{
Enable: &defaultVal,
Registries: []syncconf.RegistryConfig{syncRegistryConfig},
}

dctlr, _, destDir, _ := makeDownstreamServer(t, false, syncConfig)

dcm := test.NewControllerManager(dctlr)
dcm.StartAndWait(dctlr.Config.HTTP.Port)
defer dcm.StopServer()

// give permission denied on pushSyncedLocalImage()
localRepoPath := path.Join(destDir, testImage, "blobs")
err := os.MkdirAll(localRepoPath, 0o755)
So(err, ShouldBeNil)

err = os.Chmod(localRepoPath, 0o000)
So(err, ShouldBeNil)

defer func() {
err = os.Chmod(localRepoPath, 0o755)
So(err, ShouldBeNil)
}()

found, err := test.ReadLogFileAndSearchString(dctlr.Config.Log.Output,
"couldn't commit image to local image store", 30*time.Second)
if err != nil {
panic(err)
}

if !found {
data, err := os.ReadFile(dctlr.Config.Log.Output)
So(err, ShouldBeNil)

t.Logf("downstream log: %s", string(data))
}

So(found, ShouldBeTrue)
})
}

func TestSignaturesOnDemand(t *testing.T) {
Convey("Verify sync signatures on demand feature", t, func() {
sctlr, srcBaseURL, srcDir, _, _ := makeUpstreamServer(t, false, false)
Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func
})

Convey("Test API keys with short expiration date", func() {
expirationDate := time.Now().Add(500 * time.Millisecond).Local().Round(time.Millisecond)
expirationDate := time.Now().Add(1 * time.Second)
apiKeyDetails.ExpirationDate = expirationDate

userAc := reqCtx.NewUserAccessControl()
Expand All @@ -435,7 +435,7 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func
So(isExpired, ShouldBeFalse)
So(err, ShouldBeNil)

time.Sleep(600 * time.Millisecond)
time.Sleep(1 * time.Second)

Convey("GetUserAPIKeys detects api key expired", func() {
storedAPIKeys, err = metaDB.GetUserAPIKeys(ctx)
Expand Down
Loading

0 comments on commit 16096f1

Please sign in to comment.