forked from weaveworks/libgitops
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgitdir.go
474 lines (407 loc) · 14.2 KB
/
gitdir.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
package gitdir
import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"sync"
"time"
"github.com/fluxcd/go-git-providers/gitprovider"
git "github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/object"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
)
var (
// ErrNotStarted is returned when an operation is attempted on the gitDirectory before it has
// been started (and hence cloned) with StartCheckoutLoop. It may be wrapped with additional
// context, so match it with errors.Is rather than ==.
ErrNotStarted = errors.New("the gitDirectory hasn't been started (and hence, cloned) yet")
// ErrCannotWriteToReadOnly is returned when a write operation is attempted on a gitDirectory
// that was created without an AuthMethod, i.e. one that runs in read-only mode.
ErrCannotWriteToReadOnly = errors.New("the gitDirectory is read-only, cannot write")
)
// Fallback values applied by GitDirectoryOptions.Default() and used when cloning.
const (
// defaultBranch is the branch tracked when GitDirectoryOptions.Branch is empty.
defaultBranch = "master"
// defaultRemote is the remote name given to the clone.
defaultRemote = "origin"
// defaultInterval is the period of the background checkout loop when none is configured.
defaultInterval = 30 * time.Second
// defaultTimeout bounds a single git operation (clone, pull, push) when none is configured.
defaultTimeout = 1 * time.Minute
)
// GitDirectoryOptions provides options for the gitDirectory.
// Fields left at their zero value are replaced with defaults by Default().
// TODO: Refactor this into the controller-runtime Options factory pattern.
type GitDirectoryOptions struct {
// Options
// Branch is the branch that is cloned and reported by MainBranch().
Branch string // default "master"
// Interval is the period between two pulls of the background checkout loop.
Interval time.Duration // default 30s
// Timeout is the deadline applied to each individual clone, pull and push operation.
Timeout time.Duration // default 1m
// TODO: Support folder prefixes
// Authentication
// AuthMethod is handed to go-git as the Auth option for clone, pull and push.
// When it is nil the gitDirectory is considered read-only.
AuthMethod AuthMethod
}
// Default fills in the default value for every option that has been left unset.
//
// An empty Branch becomes "master". Non-positive Interval and Timeout values are
// treated as unset and become 30s and 1m respectively: a negative interval would
// make the checkout loop fire without pause, and a negative timeout would make
// every git operation expire before it starts.
func (o *GitDirectoryOptions) Default() {
	if o.Branch == "" {
		o.Branch = defaultBranch
	}
	if o.Interval <= 0 {
		o.Interval = defaultInterval
	}
	if o.Timeout <= 0 {
		o.Timeout = defaultTimeout
	}
}
// GitDirectory is an abstraction layer for a temporary Git clone. It pulls
// and checks out new changes periodically in the background. It also allows
// high-level access to write operations, like creating a new branch, committing,
// and pushing.
type GitDirectory interface {
// Dir returns the backing temporary directory of the git clone.
Dir() string
// MainBranch returns the configured main branch.
MainBranch() string
// RepositoryRef returns the repository reference.
RepositoryRef() gitprovider.RepositoryRef
// StartCheckoutLoop clones the repo synchronously, and then starts the checkout loop non-blocking.
// If the checkout loop has been started already, this is a no-op.
StartCheckoutLoop() error
// Suspend waits for any pending transactions or operations, and then locks the internal mutex so that
// no other operations can start. This means the periodic background checkout loop will momentarily stop.
// Every call to Suspend must be paired with exactly one later call to Resume.
Suspend()
// Resume unlocks the mutex locked in Suspend(), so that other Git operations, like the background checkout
// loop can resume its operation.
Resume()
// Pull performs a pull & checkout to the latest revision.
// ErrNotStarted is returned if the repo hasn't been cloned yet.
// If the context is cancelled during the pull, nil is returned.
Pull(ctx context.Context) error
// CheckoutNewBranch creates a new branch and checks out to it.
// ErrNotStarted is returned if the repo hasn't been cloned yet.
// ErrCannotWriteToReadOnly is returned if opts.AuthMethod wasn't provided.
CheckoutNewBranch(branchName string) error
// CheckoutMainBranch goes back to the main branch, discarding local changes.
// ErrNotStarted is returned if the repo hasn't been cloned yet.
// ErrCannotWriteToReadOnly is returned if opts.AuthMethod wasn't provided.
CheckoutMainBranch() error
// Commit creates a commit of all changes in the current worktree with the given parameters.
// It also automatically pushes the branch after the commit.
// If the worktree is clean, nothing is committed and nil is returned.
// ErrNotStarted is returned if the repo hasn't been cloned yet.
// ErrCannotWriteToReadOnly is returned if opts.AuthMethod wasn't provided.
Commit(ctx context.Context, authorName, authorEmail, msg string) error
// CommitChannel is a channel to where new observed Git SHAs are written.
// The SHAs are written as hexadecimal strings.
CommitChannel() chan string
// Cleanup terminates any pending operations, and removes the temporary directory.
Cleanup() error
}
// NewGitDirectory creates a GitDirectory implementation backed by a fresh temporary
// directory. Nothing is cloned at this point; run StartCheckoutLoop() to clone the
// repository and start syncing. An error is returned only if the temporary directory
// cannot be created.
func NewGitDirectory(repoRef gitprovider.RepositoryRef, opts GitDirectoryOptions) (GitDirectory, error) {
	log.Info("Initializing the Git repo...")

	// Fill in defaults for every option the caller left unset.
	opts.Default()

	// The clone gets a temporary directory of its own.
	cloneDir, err := ioutil.TempDir("", "libgitops")
	if err != nil {
		return nil, err
	}
	log.Debugf("Created temporary directory for the git clone at %q", cloneDir)

	// Parent context for everything this object does; cancel is invoked only from Cleanup().
	ctx, cancel := context.WithCancel(context.Background())

	d := &gitDirectory{
		repoRef:             repoRef,
		GitDirectoryOptions: opts,
		cloneDir:            cloneDir,
		// TODO: This needs to be large, otherwise it can start blocking unnecessarily if nobody reads it
		commitChan: make(chan string, 1024),
		ctx:        ctx,
		cancel:     cancel,
		lock:       &sync.Mutex{},
	}
	log.Trace("URL endpoint parsed and authentication method chosen")

	// Tell the user which mode the presence (or absence) of an AuthMethod put us in.
	if !d.canWrite() {
		log.Infof("Running in read-only mode, won't write status back to the repo")
	} else {
		log.Infof("Running in read-write mode, will commit back current status to the repo")
	}
	return d, nil
}
// gitDirectory is the GitDirectory implementation returned by NewGitDirectory. It keeps
// a clone of the repository in a temporary directory on disk.
type gitDirectory struct {
// user-specified options
repoRef gitprovider.RepositoryRef
GitDirectoryOptions
// the temporary directory used for the clone
cloneDir string
// go-git objects. wt is the worktree of the repo, persistent during the lifetime of repo.
// Both are nil until clone() has succeeded; wt != nil is used as the "started" marker.
repo *git.Repository
wt *git.Worktree
// latest known commit to the system, as a hexadecimal SHA string
lastCommit string
// events channel from new commits; every observed SHA is written here
commitChan chan string
// the context and its cancel function for the lifetime of this struct (until Cleanup())
ctx context.Context
cancel context.CancelFunc
// the lock for git operations; held by clone() and Pull() while they run, and by the
// caller between Suspend() and Resume()
lock *sync.Mutex
}
// Dir returns the path of the temporary directory that holds the clone.
func (d *gitDirectory) Dir() string { return d.cloneDir }
// MainBranch returns the branch configured in the options.
func (d *gitDirectory) MainBranch() string { return d.Branch }
// RepositoryRef returns the repository reference this directory was created for.
func (d *gitDirectory) RepositoryRef() gitprovider.RepositoryRef { return d.repoRef }
// StartCheckoutLoop performs the initial clone synchronously and then launches the
// periodic checkout loop in its own goroutine. Calling it again once the worktree
// has been populated does nothing. A clone failure is returned and no loop is started.
func (d *gitDirectory) StartCheckoutLoop() error {
	// A populated worktree means the clone has already happened.
	if d.wt == nil {
		if err := d.clone(); err != nil {
			return err
		}
		go d.checkoutLoop()
	}
	return nil
}
// Suspend acquires the git operation lock, blocking until any operation that
// currently holds it has finished. It stays locked until Resume() is called.
func (d *gitDirectory) Suspend() { d.lock.Lock() }
// Resume releases the git operation lock taken by Suspend().
func (d *gitDirectory) Resume() { d.lock.Unlock() }
// CommitChannel returns the channel on which newly observed commit SHAs are published.
func (d *gitDirectory) CommitChannel() chan string { return d.commitChan }
// checkoutLoop pulls the repository once every d.Interval until the lifetime
// context d.ctx is cancelled. Pull failures are logged and do not stop the loop.
func (d *gitDirectory) checkoutLoop() {
	log.Info("Starting the checkout loop...")

	// One iteration: pull & checkout the newest revision, logging any failure.
	pullOnce := func(_ context.Context) {
		log.Trace("checkoutLoop: Will perform pull operation")
		if err := d.Pull(d.ctx); err != nil {
			log.Errorf("checkoutLoop: git pull failed with error: %v", err)
		}
	}
	wait.NonSlidingUntilWithContext(d.ctx, pullOnce, d.Interval)

	log.Info("Exiting the checkout loop...")
}
// cloneURL returns the URL to clone the repository from, using the transport
// that matches the configured authentication method.
//
// AuthMethod is nil in read-only mode. Calling TransportType() on it would
// panic, so an unauthenticated clone falls back to the HTTPS clone URL.
func (d *gitDirectory) cloneURL() string {
	if d.AuthMethod == nil {
		return d.repoRef.GetCloneURL(gitprovider.TransportTypeHTTPS)
	}
	return d.repoRef.GetCloneURL(d.AuthMethod.TransportType())
}
// canWrite reports whether an authentication method was configured, which is
// what decides between read-write and read-only mode.
func (d *gitDirectory) canWrite() bool { return d.AuthMethod != nil }
// verifyRead checks that a read-from-git operation may start, i.e. that the
// repository has been cloned. It returns an error wrapping ErrNotStarted otherwise.
func (d *gitDirectory) verifyRead() error {
	// The worktree is only populated once the clone has succeeded.
	if d.wt != nil {
		return nil
	}
	return fmt.Errorf("cannot pull: %w", ErrNotStarted)
}
// verifyWrite checks that a write-to-git operation may start. Everything required
// for reading is required here too; on top of that the directory must not be
// read-only, otherwise ErrCannotWriteToReadOnly is returned.
func (d *gitDirectory) verifyWrite() error {
	err := d.verifyRead()
	// Only look at the write permission once reading is known to be fine.
	if err == nil && !d.canWrite() {
		err = ErrCannotWriteToReadOnly
	}
	return err
}
// clone clones the configured branch of the repository into the temporary
// directory, bounded by d.Timeout, and records the resulting repository,
// worktree and HEAD commit.
//
// The git operation lock is held for the whole call. If the lifetime context
// is cancelled during the clone, nil is returned and the worktree stays unset.
// Underlying go-git errors are wrapped with %w so callers can inspect them
// with errors.Is / errors.As.
func (d *gitDirectory) clone() error {
	// Lock the mutex now that we're starting, and unlock it when exiting
	d.lock.Lock()
	defer d.lock.Unlock()

	log.Infof("Starting to clone the repository %s with timeout %s", d.repoRef, d.Timeout)

	// Do a clone operation to the temporary directory, with a timeout
	err := d.contextWithTimeout(d.ctx, func(ctx context.Context) error {
		var err error
		d.repo, err = git.PlainCloneContext(ctx, d.Dir(), false, &git.CloneOptions{
			URL:           d.cloneURL(),
			Auth:          d.AuthMethod,
			RemoteName:    defaultRemote,
			ReferenceName: plumbing.NewBranchReferenceName(d.Branch),
			SingleBranch:  true,
			NoCheckout:    false,
			//Depth: 1, // ref: https://github.com/src-d/go-git/issues/1143
			RecurseSubmodules: 0,
			Progress:          nil,
			Tags:              git.NoTags,
		})
		return err
	})

	// Handle errors. errors.Is also matches context errors that arrive wrapped.
	switch {
	case err == nil:
		// no-op, just continue.
	case errors.Is(err, context.DeadlineExceeded):
		return fmt.Errorf("git clone operation took longer than deadline %s", d.Timeout)
	case errors.Is(err, context.Canceled):
		log.Tracef("context was cancelled")
		return nil // if Cleanup() was called, just exit the goroutine
	default:
		return fmt.Errorf("git clone error: %w", err)
	}

	// Populate the worktree pointer
	d.wt, err = d.repo.Worktree()
	if err != nil {
		return fmt.Errorf("git get worktree error: %w", err)
	}

	// Get the latest HEAD commit and report it to the user
	ref, err := d.repo.Head()
	if err != nil {
		return err
	}
	d.observeCommit(ref.Hash())
	return nil
}
// Pull performs a pull & checkout to the latest revision, bounded by d.Timeout,
// and publishes the new HEAD on the commit channel if it differs from the last
// observed commit.
//
// The git operation lock is held for the whole call. An error wrapping
// ErrNotStarted is returned if the repo hasn't been cloned yet. An already
// up-to-date repository is not an error. If the context is cancelled during
// the pull, nil is returned. Underlying go-git errors are wrapped with %w so
// callers can inspect them with errors.Is / errors.As.
func (d *gitDirectory) Pull(ctx context.Context) error {
	// Lock the mutex now that we're starting, and unlock it when exiting
	d.lock.Lock()
	defer d.lock.Unlock()

	// Make sure it's okay to read
	if err := d.verifyRead(); err != nil {
		return err
	}

	// Perform the git pull operation using the timeout
	err := d.contextWithTimeout(ctx, func(innerCtx context.Context) error {
		log.Trace("checkoutLoop: Starting pull operation")
		return d.wt.PullContext(innerCtx, &git.PullOptions{
			Auth:         d.AuthMethod,
			SingleBranch: true,
		})
	})

	// Handle errors. errors.Is also matches errors that arrive wrapped.
	switch {
	case err == nil, errors.Is(err, git.NoErrAlreadyUpToDate):
		// no-op, just continue. Allow the git.NoErrAlreadyUpToDate error
	case errors.Is(err, context.DeadlineExceeded):
		return fmt.Errorf("git pull operation took longer than deadline %s", d.Timeout)
	case errors.Is(err, context.Canceled):
		log.Tracef("context was cancelled")
		return nil // if Cleanup() was called, just exit the goroutine
	default:
		return fmt.Errorf("failed to pull: %w", err)
	}

	log.Trace("checkoutLoop: Pulled successfully")

	// get current head
	ref, err := d.repo.Head()
	if err != nil {
		return err
	}

	// Only notify upstream when HEAD actually moved since the last observation
	if d.lastCommit != ref.Hash().String() {
		d.observeCommit(ref.Hash())
	}
	return nil
}
// CheckoutNewBranch creates the branch branchName and switches the worktree to it.
// It fails with an error wrapping ErrNotStarted if the repo hasn't been cloned yet,
// and with ErrCannotWriteToReadOnly if no authentication method was configured.
func (d *gitDirectory) CheckoutNewBranch(branchName string) error {
	// Branching is treated as a write operation.
	err := d.verifyWrite()
	if err != nil {
		return err
	}

	checkout := &git.CheckoutOptions{
		Branch: plumbing.NewBranchReferenceName(branchName),
		Create: true,
	}
	return d.wt.Checkout(checkout)
}
// CheckoutMainBranch switches the worktree back to the configured main branch,
// forcing the checkout. Before that it cleans the worktree (directories included)
// on a best-effort basis. It fails with an error wrapping ErrNotStarted if the
// repo hasn't been cloned yet, and with ErrCannotWriteToReadOnly if no
// authentication method was configured.
func (d *gitDirectory) CheckoutMainBranch() error {
	// Going back to the main branch is treated as a write operation.
	err := d.verifyWrite()
	if err != nil {
		return err
	}

	// Cleaning is best-effort only, so a failure here is deliberately ignored.
	cleanOpts := &git.CleanOptions{Dir: true}
	_ = d.wt.Clean(cleanOpts)

	// Force the checkout of the main branch.
	mainBranch := &git.CheckoutOptions{
		Branch: plumbing.NewBranchReferenceName(d.Branch),
		Force:  true,
	}
	return d.wt.Checkout(mainBranch)
}
// observeCommit records commit as the latest known state and publishes its SHA
// on the commit channel.
//
// The send gives up as soon as the lifetime context is cancelled. Without that,
// a full channel that nobody reads would block the caller forever, possibly
// while it holds the git operation lock, and Cleanup() could not release it.
func (d *gitDirectory) observeCommit(commit plumbing.Hash) {
	sha := commit.String()
	d.lastCommit = sha

	select {
	case d.commitChan <- sha:
	case <-d.ctx.Done():
		// Cleanup() was called; nobody is expected to consume the event anymore.
	}

	log.Infof("New commit observed on branch %q: %s", d.Branch, commit)
}
// Commit creates a commit of all changes in the current worktree with the given parameters.
// It also automatically pushes the branch after the commit, bounded by d.Timeout, and
// publishes the new commit on the commit channel once the push went through.
//
// If the worktree is clean, nothing is committed and nil is returned.
// An error wrapping ErrNotStarted is returned if the repo hasn't been cloned yet.
// ErrCannotWriteToReadOnly is returned if opts.AuthMethod wasn't provided.
// If the context is cancelled during the push, nil is returned.
// Underlying go-git errors are wrapped with %w so callers can inspect them with
// errors.Is / errors.As.
//
// Commit does not take the git operation lock itself.
// NOTE(review): presumably callers are expected to bracket it with Suspend()/Resume() - confirm.
func (d *gitDirectory) Commit(ctx context.Context, authorName, authorEmail, msg string) error {
	// Make sure it's okay to write
	if err := d.verifyWrite(); err != nil {
		return err
	}

	s, err := d.wt.Status()
	if err != nil {
		return fmt.Errorf("git status failed: %w", err)
	}
	if s.IsClean() {
		log.Debugf("No changed files in git repo, nothing to commit...")
		return nil
	}

	// Do a commit and push
	log.Debug("commitLoop: Committing all local changes")
	hash, err := d.wt.Commit(msg, &git.CommitOptions{
		All: true,
		Author: &object.Signature{
			Name:  authorName,
			Email: authorEmail,
			When:  time.Now(),
		},
	})
	if err != nil {
		return fmt.Errorf("git commit error: %w", err)
	}

	// Perform the git push operation using the timeout
	err = d.contextWithTimeout(ctx, func(innerCtx context.Context) error {
		log.Debug("commitLoop: Will push with timeout")
		return d.repo.PushContext(innerCtx, &git.PushOptions{
			Auth: d.AuthMethod,
		})
	})

	// Handle errors. errors.Is also matches errors that arrive wrapped.
	switch {
	case err == nil, errors.Is(err, git.NoErrAlreadyUpToDate):
		// no-op, just continue. Allow the git.NoErrAlreadyUpToDate error
	case errors.Is(err, context.DeadlineExceeded):
		return fmt.Errorf("git push operation took longer than deadline %s", d.Timeout)
	case errors.Is(err, context.Canceled):
		log.Tracef("context was cancelled")
		return nil // if Cleanup() was called, just exit the goroutine
	default:
		return fmt.Errorf("failed to push: %w", err)
	}

	// Notify upstream that we now have a new commit
	log.Infof("A new commit with the actual state has been created and pushed to the origin: %q", hash)
	d.observeCommit(hash)
	return nil
}
// contextWithTimeout runs fn with a context derived from ctx that expires after
// d.Timeout. The derived context is always cancelled before returning.
//
// Return value:
//   - nil if fn succeeded. The context is deliberately not consulted in that case:
//     an operation that completes just as the deadline or a cancel fires has still
//     completed, and must not be reported as failed.
//   - the context error (context.DeadlineExceeded or context.Canceled) if fn failed
//     and the context is done, so that deadline/cancel signals reach the caller in a
//     uniform shape regardless of how fn reported them.
//   - the error returned by fn otherwise.
func (d *gitDirectory) contextWithTimeout(ctx context.Context, fn func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, d.Timeout)
	defer cancel()

	fnErr := fn(ctx)
	if fnErr == nil {
		return nil
	}

	// fn failed; if that is because of a deadline or cancel, report it as such.
	if ctxErr := ctx.Err(); ctxErr != nil {
		log.Debugf("operation context yielded error %v to be returned. Function error was: %v", ctxErr, fnErr)
		return ctxErr
	}
	return fnErr
}
// Cleanup cancels the lifetime context, which stops the checkout loop and any
// operation bound to that context, and then deletes the temporary clone directory.
// A failure to delete the directory is logged and returned.
func (d *gitDirectory) Cleanup() error {
	// Signal the background loop and any in-flight operation to stop.
	d.cancel()

	// Remove the temporary directory.
	err := os.RemoveAll(d.Dir())
	if err != nil {
		log.Errorf("Failed to clean up temp git directory: %v", err)
	}
	return err
}