-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathstart.go
145 lines (121 loc) · 3.67 KB
/
start.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package xtractr
import (
"errors"
"os"
)
// Sane defaults.
const (
DefaultDirMode = 0o755
DefaultFileMode = 0o644
DefaultSuffix = "_xtractr"
// DefaultBufferSize is the size of the extraction buffer.
// ie. How many jobs can be queued before things get slow.
DefaultBufferSize = 1000
)
// Config is the input data to configure the Xtract queue. Fill this out and
// pass it into NewQueue() to create a queue for archive extractions.
type Config struct {
// Size of the extraction channel buffer. Default=1000.
// Use -1 for unbuffered channel. Not recommend.
BuffSize int
// Number of concurrent extractions allowed.
Parallel int
// Filemode used when writing files, tar ignores this, so does Windows.
FileMode os.FileMode
// Filemode used when writing folders, tar ignores this.
DirMode os.FileMode
// The suffix used for temporary folders.
Suffix string
// Logs are sent to this Logger.
Logger
}
// Logger allows this library to write logs.
// Use this to capture them in your own flow.
type Logger interface {
Printf(format string, v ...interface{})
Debugf(format string, v ...interface{})
}
// Xtractr is what you get from NewQueue(). This is the main app struct.
// Use this struct to call Xtractr.Extract() to queue an extraction.
type Xtractr struct {
config *Config
queue chan *Xtract
done chan struct{}
}
// Custom errors returned by this module.
var (
ErrQueueStopped = errors.New("extractor queue stopped, cannot extract")
ErrNoCompressedFiles = errors.New("no compressed files found")
ErrUnknownArchiveType = errors.New("unknown archive file type")
ErrInvalidPath = errors.New("archived file contains invalid path")
ErrInvalidHead = errors.New("archived file contains invalid header file")
ErrQueueRunning = errors.New("extractor queue running, cannot start")
ErrNoConfig = errors.New("call NewQueue() to initialize a queue")
ErrNoLogger = errors.New("xtractr.Config.Logger must be non-nil")
)
// NewQueue returns a new Xtractr Queue you can send Xtract jobs into.
// This is where to start if you're creating an extractor queue.
// You must provide a Logger in the config, everything else is optional.
func NewQueue(config *Config) *Xtractr {
x := parseConfig(config)
if err := x.Start(); err != nil {
panic(err)
}
return x
}
// Start restarts the queue. This can be called only after you call Stop().
func (x *Xtractr) Start() error {
if x.queue != nil {
// This happens if you call Start() without calling Stop() first.
return ErrQueueRunning
}
if x.config == nil {
// This happens if you call Start() on an *Xtractr without NewQueue().
return ErrNoConfig
}
if x.config.Logger == nil {
// This happens if you forget a *Logger.
return ErrNoLogger
}
x.queue = make(chan *Xtract, x.config.BuffSize)
for i := 0; i < x.config.Parallel; i++ {
go x.processQueue()
}
return nil
}
// parseConfig verifies sane config data and returns the Xtractr struct.
func parseConfig(config *Config) *Xtractr {
if config.FileMode == 0 {
config.FileMode = DefaultFileMode
}
if config.DirMode == 0 {
config.DirMode = DefaultDirMode
}
if config.Parallel < 1 {
config.Parallel = 1
}
if config.BuffSize == 0 {
config.BuffSize = DefaultBufferSize
} else if config.BuffSize < 0 {
config.BuffSize = 0
}
if config.Suffix == "" {
config.Suffix = DefaultSuffix
}
return &Xtractr{
config: config,
done: make(chan struct{}),
}
}
// Stop shuts down the extractor routines. Call this to shut things down.
func (x *Xtractr) Stop() {
if x.queue == nil {
return
}
close(x.queue)
// Wait until all running extractions are done.
for i := 0; i < x.config.Parallel; i++ {
<-x.done
}
x.queue = nil
}