Skip to content

Commit

Permalink
Sync s3 (#2073)
Browse files Browse the repository at this point in the history
* feat(sync): local tmp store

Signed-off-by: a <[email protected]>

* fix(sync): various fixes for s3+remote storage feature

Signed-off-by: Petu Eusebiu <[email protected]>

---------

Signed-off-by: a <[email protected]>
Signed-off-by: Petu Eusebiu <[email protected]>
Co-authored-by: a <[email protected]>
  • Loading branch information
eusebiu-constantin-petu-dbk and elee1766 authored Nov 28, 2023
1 parent 0de2210 commit 3c8da6e
Show file tree
Hide file tree
Showing 12 changed files with 934 additions and 61 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ run-blackbox-ci: check-blackbox-prerequisites binary binary-minimal cli
run-blackbox-cloud-ci: check-blackbox-prerequisites check-awslocal binary $(BATS)
echo running cloud CI bats tests; \
$(BATS) $(BATS_FLAGS) test/blackbox/cloud_only.bats
$(BATS) $(BATS_FLAGS) test/blackbox/sync_cloud.bats

.PHONY: run-blackbox-dedupe-nightly
run-blackbox-dedupe-nightly: check-blackbox-prerequisites check-awslocal binary binary-minimal
Expand Down
49 changes: 49 additions & 0 deletions examples/config-sync-cloud-storage.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{
"distSpecVersion": "1.1.0-dev",
"storage": {
"rootDirectory": "/tmp/zot",
"dedupe": true,
"gc": true,
"remoteCache": true,
"storageDriver": {
"name": "s3",
"rootdirectory": "/zot",
"region": "us-east-2",
"bucket": "zot-storage",
"secure": true,
"skipverify": false
},
"cacheDriver": {
"name": "dynamodb",
"region": "us-east-2",
"cacheTablename": "BlobTable"
}
},
"http": {
"address": "0.0.0.0",
"port": "8080"
},
"log": {
"level": "debug"
},
"extensions": {
"sync": {
"downloadDir": "/tmp/sync",
"registries": [
{
"urls": [
"http://localhost:5000"
],
"onDemand": false,
"tlsVerify": false,
"PollInterval": "30m",
"content": [
{
"prefix": "**"
}
]
}
]
}
}
}
204 changes: 204 additions & 0 deletions pkg/cli/server/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1653,3 +1653,207 @@ func TestOverlappingSyncRetentionConfig(t *testing.T) {
So(string(data), ShouldContainSubstring, "overlapping sync content\":{\"Prefix\":\"prod/*")
})
}

func TestSyncWithRemoteStorageConfig(t *testing.T) {
oldArgs := os.Args

defer func() { os.Args = oldArgs }()

Convey("Test verify sync with remote storage works if sync.tmpdir is provided", t, func(c C) {
tmpfile, err := os.CreateTemp("", "zot-test*.json")
So(err, ShouldBeNil)
defer os.Remove(tmpfile.Name()) // clean up

content := `{
"distSpecVersion": "1.1.0-dev",
"storage": {
"rootDirectory": "%s",
"dedupe": false,
"remoteCache": false,
"storageDriver": {
"name": "s3",
"rootdirectory": "/zot",
"region": "us-east-2",
"regionendpoint": "localhost:4566",
"bucket": "zot-storage",
"secure": false,
"skipverify": false
}
},
"http": {
"address": "0.0.0.0",
"port": "%s"
},
"log": {
"level": "debug",
"output": "%s"
},
"extensions": {
"sync": {
"downloadDir": "/tmp/sync",
"registries": [
{
"urls": [
"http://localhost:9000"
],
"onDemand": true,
"tlsVerify": false,
"content": [
{
"prefix": "**"
}
]
}
]
}
}
}`

logPath, err := runCLIWithConfig(t.TempDir(), content)
So(err, ShouldBeNil)

data, err := os.ReadFile(logPath)
So(err, ShouldBeNil)
defer os.Remove(logPath) // clean up
So(string(data), ShouldNotContainSubstring,
"using both sync and remote storage features needs config.Extensions.Sync.DownloadDir to be specified")
})

Convey("Test verify sync with remote storage panics if sync.tmpdir is not provided", t, func(c C) {
port := GetFreePort()
logFile, err := os.CreateTemp("", "zot-log*.txt")
So(err, ShouldBeNil)
defer os.Remove(logFile.Name()) // clean up

tmpfile, err := os.CreateTemp("", "zot-test*.json")
So(err, ShouldBeNil)
defer os.Remove(tmpfile.Name()) // clean up
content := fmt.Sprintf(`{
"distSpecVersion": "1.1.0-dev",
"storage": {
"rootDirectory": "%s",
"dedupe": false,
"remoteCache": false,
"storageDriver": {
"name": "s3",
"rootdirectory": "/zot",
"region": "us-east-2",
"regionendpoint": "localhost:4566",
"bucket": "zot-storage",
"secure": false,
"skipverify": false
}
},
"http": {
"address": "0.0.0.0",
"port": "%s"
},
"log": {
"level": "debug",
"output": "%s"
},
"extensions": {
"sync": {
"registries": [
{
"urls": [
"http://localhost:9000"
],
"onDemand": true,
"tlsVerify": false,
"content": [
{
"prefix": "**"
}
]
}
]
}
}
}`, t.TempDir(), port, logFile.Name())

err = os.WriteFile(tmpfile.Name(), []byte(content), 0o0600)
So(err, ShouldBeNil)

os.Args = []string{"cli_test", "serve", tmpfile.Name()}
err = cli.NewServerRootCmd().Execute()
So(err, ShouldNotBeNil)

data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
defer os.Remove(logFile.Name()) // clean up
So(string(data), ShouldContainSubstring,
"using both sync and remote storage features needs config.Extensions.Sync.DownloadDir to be specified")
})

Convey("Test verify sync with remote storage on subpath panics if sync.tmpdir is not provided", t, func(c C) {
port := GetFreePort()
logFile, err := os.CreateTemp("", "zot-log*.txt")
So(err, ShouldBeNil)
defer os.Remove(logFile.Name()) // clean up

tmpfile, err := os.CreateTemp("", "zot-test*.json")
So(err, ShouldBeNil)
defer os.Remove(tmpfile.Name()) // clean up
content := fmt.Sprintf(`{
"distSpecVersion": "1.1.0-dev",
"storage": {
"rootDirectory": "%s",
"subPaths":{
"/a": {
"rootDirectory": "%s",
"dedupe": false,
"remoteCache": false,
"storageDriver":{
"name":"s3",
"rootdirectory":"/zot-a",
"region":"us-east-2",
"bucket":"zot-storage",
"secure":true,
"skipverify":true
}
}
}
},
"http": {
"address": "0.0.0.0",
"port": "%s"
},
"log": {
"level": "debug",
"output": "%s"
},
"extensions": {
"sync": {
"registries": [
{
"urls": [
"http://localhost:9000"
],
"onDemand": true,
"tlsVerify": false,
"content": [
{
"prefix": "**"
}
]
}
]
}
}
}`, t.TempDir(), t.TempDir(), port, logFile.Name())

err = os.WriteFile(tmpfile.Name(), []byte(content), 0o0600)
So(err, ShouldBeNil)

os.Args = []string{"cli_test", "serve", tmpfile.Name()}
err = cli.NewServerRootCmd().Execute()
So(err, ShouldNotBeNil)

data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
defer os.Remove(logFile.Name()) // clean up
So(string(data), ShouldContainSubstring,
"using both sync and remote storage features needs config.Extensions.Sync.DownloadDir to be specified")
})
}
15 changes: 12 additions & 3 deletions pkg/cli/server/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,10 @@ func validateConfiguration(config *config.Config, log zlog.Logger) error {
return zerr.ErrBadConfig
}

// enforce filesystem storage in case sync feature is enabled
if config.Extensions != nil && config.Extensions.Sync != nil {
log.Error().Err(zerr.ErrBadConfig).Msg("sync supports only filesystem storage")
// enforce tmpDir in case sync + s3
if config.Extensions != nil && config.Extensions.Sync != nil && config.Extensions.Sync.DownloadDir == "" {
log.Error().Err(zerr.ErrBadConfig).
Msg("using both sync and remote storage features needs config.Extensions.Sync.DownloadDir to be specified")

return zerr.ErrBadConfig
}
Expand All @@ -413,6 +414,14 @@ func validateConfiguration(config *config.Config, log zlog.Logger) error {

return zerr.ErrBadConfig
}

// enforce tmpDir in case sync + s3
if config.Extensions != nil && config.Extensions.Sync != nil && config.Extensions.Sync.DownloadDir == "" {
log.Error().Err(zerr.ErrBadConfig).
Msg("using both sync and remote storage features needs config.Extensions.Sync.DownloadDir to be specified")

return zerr.ErrBadConfig
}
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/extensions/config/sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ type Credentials struct {
type Config struct {
Enable *bool
CredentialsFile string
Registries []RegistryConfig
/* DownloadDir is needed only in case of using cloud based storages
it uses regclient to first copy images into this dir (as oci layout)
and then move them into storage. */
DownloadDir string
Registries []RegistryConfig
}

type RegistryConfig struct {
Expand Down
34 changes: 19 additions & 15 deletions pkg/extensions/extension_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,27 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB,
isPeriodical := len(registryConfig.Content) != 0 && registryConfig.PollInterval != 0
isOnDemand := registryConfig.OnDemand

if isPeriodical || isOnDemand {
service, err := sync.New(registryConfig, config.Extensions.Sync.CredentialsFile,
storeController, metaDB, log)
if err != nil {
return nil, err
}
if !(isPeriodical || isOnDemand) {
continue
}

if isPeriodical {
// add to task scheduler periodic sync
gen := sync.NewTaskGenerator(service, log)
sch.SubmitGenerator(gen, registryConfig.PollInterval, scheduler.MediumPriority)
}
tmpDir := config.Extensions.Sync.DownloadDir
credsPath := config.Extensions.Sync.CredentialsFile

if isOnDemand {
// onDemand services used in routes.go
onDemand.Add(service)
}
service, err := sync.New(registryConfig, credsPath, tmpDir, storeController, metaDB, log)
if err != nil {
return nil, err
}

if isPeriodical {
// add to task scheduler periodic sync
gen := sync.NewTaskGenerator(service, log)
sch.SubmitGenerator(gen, registryConfig.PollInterval, scheduler.MediumPriority)
}

if isOnDemand {
// onDemand services used in routes.go
onDemand.Add(service)
}
}

Expand Down
Loading

0 comments on commit 3c8da6e

Please sign in to comment.