forked from lytics/cloudstorage
-
Notifications
You must be signed in to change notification settings - Fork 0
/
store.go
357 lines (324 loc) · 11.8 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
package cloudstorage
import (
"encoding/base64"
"fmt"
"io"
"os"
"strings"
"time"
"github.com/araddon/gou"
"golang.org/x/net/context"
)
const (
// StoreCacheFileExt = ".cache"
StoreCacheFileExt = ".cache"
// ContentTypeKey
ContentTypeKey = "content_type"
// MaxResults default number of objects to retrieve during a list-objects request,
// if more objects exist, then they will need to be paged
MaxResults = 3000
)
// AccessLevel is the level of permissions on files
type AccessLevel int
const (
// ReadOnly File Permissions Levels
ReadOnly AccessLevel = 0
ReadWrite AccessLevel = 1
)
var (
// ErrObjectNotFound Error of not finding a file(object)
ErrObjectNotFound = fmt.Errorf("object not found")
// ErrObjectExists error trying to create an already existing file.
ErrObjectExists = fmt.Errorf("object already exists in backing store (use store.Get)")
// ErrNotImplemented this feature is not implemented for this store
ErrNotImplemented = fmt.Errorf("Not implemented")
)
type (
Opts struct {
IfNotExists bool
}
// StoreReader interface to define the Storage Interface abstracting
// the GCS, S3, LocalFile, etc interfaces
StoreReader interface {
// Type is he Store Type [google, s3, azure, localfs, etc]
Type() string
// Client gets access to the underlying native Client for Google, S3, etc
Client() interface{}
// Get returns an object (file) from the cloud store. The object
// isn't opened already, see Object.Open()
// ObjectNotFound will be returned if the object is not found.
Get(ctx context.Context, o string) (Object, error)
// Objects returns an object Iterator to allow paging through object
// which keeps track of page cursors. Query defines the specific set
// of filters to apply to request.
Objects(ctx context.Context, q Query) (ObjectIterator, error)
// List file/objects filter by given query. This just wraps the object-iterator
// returning full list of objects.
List(ctx context.Context, q Query) (*ObjectsResponse, error)
// Folders creates list of folders
Folders(ctx context.Context, q Query) ([]string, error)
// NewReader creates a new Reader to read the contents of the object.
// ErrObjectNotFound will be returned if the object is not found.
NewReader(o string) (io.ReadCloser, error)
// NewReader with context (for cancelation, etc)
NewReaderWithContext(ctx context.Context, o string) (io.ReadCloser, error)
// String default descriptor.
String() string
}
// StoreCopy Optional interface to fast path copy. Many of the cloud providers
// don't actually copy bytes. Rather they allow a "pointer" that is a fast copy.
StoreCopy interface {
// Copy from object, to object
Copy(ctx context.Context, src, dst Object) error
}
// StoreMove Optional interface to fast path move. Many of the cloud providers
// don't actually copy bytes.
StoreMove interface {
// Move from object location, to object location.
Move(ctx context.Context, src, dst Object) error
}
// Store interface to define the Storage Interface abstracting
// the GCS, S3, LocalFile interfaces
Store interface {
StoreReader
// NewWriter returns a io.Writer that writes to a Cloud object
// associated with this backing Store object.
//
// A new object will be created if an object with this name already exists.
// Otherwise any previous object with the same name will be replaced.
// The object will not be available (and any previous object will remain)
// until Close has been called
NewWriter(o string, metadata map[string]string) (io.WriteCloser, error)
// NewWriter but with context.
NewWriterWithContext(ctx context.Context, o string, metadata map[string]string, opts ...Opts) (io.WriteCloser, error)
// NewObject creates a new empty object backed by the cloud store
// This new object isn't' synced/created in the backing store
// until the object is Closed/Sync'ed.
NewObject(o string) (Object, error)
// Delete removes the object from the cloud store.
Delete(ctx context.Context, o string) error
}
// Object is a handle to a cloud stored file/object. Calling Open will pull the remote file onto
// your local filesystem for reading/writing. Calling Sync/Close will push the local copy
// backup to the cloud store.
Object interface {
// Name of object/file.
Name() string
// String is default descriptor.
String() string
// Updated timestamp.
Updated() time.Time
// MetaData is map of arbitrary name/value pairs about object.
MetaData() map[string]string
// SetMetaData allows you to set key/value pairs.
SetMetaData(meta map[string]string)
// StorageSource is the type of store.
StorageSource() string
// Open copies the remote file to a local cache and opens the cached version
// for read/writing. Calling Close/Sync will push the copy back to the
// backing store.
Open(readonly AccessLevel) (*os.File, error)
// Release will remove the locally cached copy of the file. You most call Close
// before releasing. Release will call os.Remove(local_copy_file) so opened
// filehandles need to be closed.
Release() error
// Implement io.ReadWriteCloser Open most be called before using these
// functions.
Read(p []byte) (n int, err error)
Write(p []byte) (n int, err error)
Sync() error
Close() error
// File returns the cached/local copy of the file
File() *os.File
// Delete removes the object from the cloud store and local cache.
Delete() error
}
// ObjectIterator interface to page through objects
// See go doc for examples https://github.com/GoogleCloudPlatform/google-cloud-go/wiki/Iterator-Guidelines
ObjectIterator interface {
// Next gets next object, returns google.golang.org/api/iterator iterator.Done error.
Next() (Object, error)
// Close this down (and or context.Close)
Close()
}
// ObjectsResponse for paged object apis.
ObjectsResponse struct {
Objects Objects
NextMarker string
}
// Objects are just a collection of Object(s).
// Used as the results for store.List commands.
Objects []Object
// AuthMethod Is the source/location/type of auth token
AuthMethod string
// Config the cloud store config settings.
Config struct {
// Type is StoreType [gcs,localfs,s3,azure]
Type string
// AuthMethod the methods of authenticating store. Ie, where/how to
// find auth tokens.
AuthMethod AuthMethod
// Cloud Bucket Project
Project string
// Region is the cloud region
Region string
// Endpoint is the api endpoint
Endpoint string
// Bucket is the "path" or named bucket in cloud
Bucket string
// the page size to use with api requests (default 1000)
PageSize int
// used by JWTKeySource
JwtConf *JwtConf
// JwtFile is the file-path to local auth-token file.
JwtFile string `json:"jwtfile,omitempty"`
// BaseUrl is the base-url path for customizing regions etc. IE
// AWS has different url paths per region on some situations.
BaseUrl string `json:"baseurl,omitempty"`
// Permissions scope
Scope string `json:"scope,omitempty"`
// LocalFS is filesystem path to use for the local files
// for Type=localfs
LocalFS string `json:"localfs,omitempty"`
// The filesystem path to save locally cached files as they are
// being read/written from cloud and need a staging area.
TmpDir string `json:"tmpdir,omitempty"`
// Settings are catch-all-bag to allow per-implementation over-rides
Settings gou.JsonHelper `json:"settings,omitempty"`
// LogPrefix Logging Prefix/Context message
LogPrefix string
// EnableCompression turns on transparent compression of objects
// Reading pre-existing non-compressed objects continues to work
EnableCompression bool `json:"enablecompression,omitempty"`
}
// JwtConf For use with google/google_jwttransporter.go
// Which can be used by the google go sdk's. This struct is based on the Google
// Jwt files json for service accounts.
JwtConf struct {
// Unfortuneately we departed from the standard jwt service account field-naming
// for reasons we forgot. So, during load, we convert from bad->correct format.
PrivateKeyDeprecated string `json:"private_keybase64,omitempty"`
KeyTypeDeprecated string `json:"keytype,omitempty"`
// Jwt Service Account Fields
ProjectID string `json:"project_id,omitempty"`
PrivateKeyID string `json:"private_key_id,omitempty"`
PrivateKey string `json:"private_key,omitempty"`
ClientEmail string `json:"client_email,omitempty"`
ClientID string `json:"client_id,omitempty"`
Type string `json:"type,omitempty"`
// Scopes is list of what scope to use when the token is created.
// for example https://github.com/google/google-api-go-client/blob/0d3983fb069cb6651353fc44c5cb604e263f2a93/storage/v1/storage-gen.go#L54
Scopes []string `json:"scopes,omitempty"`
}
)
// NewStore create new Store from Storage Config/Context.
func NewStore(conf *Config) (Store, error) {
if conf.Type == "" {
return nil, fmt.Errorf("Type is required on Config")
}
registryMu.RLock()
st, ok := storeProviders[conf.Type]
registryMu.RUnlock()
if !ok {
return nil, fmt.Errorf("config.Type=%q was not found", conf.Type)
}
if conf.PageSize == 0 {
conf.PageSize = MaxResults
}
if conf.TmpDir == "" {
conf.TmpDir = os.TempDir()
}
return st(conf)
}
// Copy source to destination.
func Copy(ctx context.Context, s Store, src, des Object) error {
// for Providers that offer fast path, and use the backend copier
if src.StorageSource() == des.StorageSource() {
if cp, ok := s.(StoreCopy); ok {
return cp.Copy(ctx, src, des)
}
}
// Slow path, open an io.Reader from the source and copy it to an
// io.Writer to the destination. This is considered a "slow path" because we
// have to act as a broker to relay bytes between the two objects. Some
// stores support moving data using an API call.
fout, err := s.NewWriterWithContext(ctx, des.Name(), src.MetaData())
if err != nil {
gou.Warnf("Move could not open destination %v", src.Name())
return err
}
fin, err := s.NewReaderWithContext(ctx, src.Name())
if err != nil {
gou.Warnf("Move could not open source %v err=%v", src.Name(), err)
return err
}
if _, err = io.Copy(fout, fin); err != nil {
return err
}
if err := fin.Close(); err != nil {
return err
}
if err := fout.Close(); err != nil { //this will flush and sync the file.
return err
}
return nil
}
// Move source object to destination.
func Move(ctx context.Context, s Store, src, des Object) error {
// take the fast path, and use the store provided mover if available
if src.StorageSource() == des.StorageSource() {
if sm, ok := s.(StoreMove); ok {
return sm.Move(ctx, src, des)
}
}
if err := Copy(ctx, s, src, des); err != nil { // use Copy() to copy the files
return err
}
if err := src.Delete(); err != nil { //delete the src, after des has been flushed/synced
return err
}
return nil
}
func NewObjectsResponse() *ObjectsResponse {
return &ObjectsResponse{
Objects: make(Objects, 0),
}
}
func (o Objects) Len() int { return len(o) }
func (o Objects) Less(i, j int) bool { return o[i].Name() < o[j].Name() }
func (o Objects) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
// Validate that this is a valid jwt conf set of tokens
func (j *JwtConf) Validate() error {
if j.PrivateKeyDeprecated != "" {
j.PrivateKey = j.PrivateKeyDeprecated
j.PrivateKeyDeprecated = ""
}
j.fixKey()
if j.KeyTypeDeprecated != "" {
j.Type = j.KeyTypeDeprecated
j.KeyTypeDeprecated = ""
}
_, err := j.KeyBytes()
if err != nil {
return fmt.Errorf("Invalid JwtConf.PrivateKeyBase64 (error trying to decode base64 err: %v", err)
}
return nil
}
func (j *JwtConf) fixKey() {
parts := strings.Split(j.PrivateKey, "\n")
if len(parts) > 1 {
for _, part := range parts {
if strings.HasPrefix(part, "---") {
continue
}
j.PrivateKey = part
break
}
}
}
func (j *JwtConf) KeyBytes() ([]byte, error) {
if j.PrivateKey == "" {
return nil, fmt.Errorf("invalid config, private key empty")
}
return base64.StdEncoding.DecodeString(j.PrivateKey)
}