-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue_test.go
152 lines (146 loc) · 4.44 KB
/
queue_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
//go:build linux
package gosh_test
import (
"context"
"path/filepath"
"time"
. "github.com/meln5674/gosh/pkg/gomega"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"fmt"
"os"
"os/exec"
"github.com/meln5674/gosh"
)
// Specs for gosh.Queue, which runs the Commanders it receives on a channel.
// Most specs build shell commands that each create a sentinel file in a
// temporary directory and then wait for the other sentinels, using helpers
// defined elsewhere in this package.
var _ = Describe("Queue", func() {
	When("running invalid max concurrency", func() {
		// These specs never enqueue a command; they only check that a max
		// concurrency of zero is rejected by Start and by Run.
		It("should fail to start", func() {
			shells := make(chan gosh.Commander, 3)
			queue := gosh.Queue(shells).WithMaxConcurrency(0).WithResultBufferSize(3)
			Expect(queue.Start()).NotTo(Succeed())
		})
		It("should fail to run", func() {
			shells := make(chan gosh.Commander, 3)
			queue := gosh.Queue(shells).WithMaxConcurrency(0).WithResultBufferSize(3)
			Expect(queue.Run()).NotTo(Succeed())
		})
	})
	When("running without a max concurrency", func() {
		It("should run all processes at once", func() {
			dir, err := os.MkdirTemp("", "*")
			Expect(err).ToNot(HaveOccurred())
			defer os.RemoveAll(dir)
			names := []string{"a", "b", "c"}
			shells := make(chan gosh.Commander)
			// The channel is unbuffered, so it is fed from a separate
			// goroutine while Run consumes it on the spec goroutine.
			go func() {
				defer GinkgoRecover()
				// Each command creates its own sentinel and then waits for all
				// of them; presumably this only succeeds if every command is
				// running at the same time.
				for _, name := range names {
					shells <- gosh.Shell(allOf(
						makeSentinel(dir, name),
						waitForSentinels(dir, names...),
					))
				}
				close(shells)
			}()
			Expect(gosh.Queue(shells).Run()).To(Succeed())
		})
	})
	When("running with a max concurrency", func() {
		It("should not run all processes at once", func() {
			dir, err := os.MkdirTemp("", "*")
			Expect(err).ToNot(HaveOccurred())
			defer os.RemoveAll(dir)
			names := []string{"a", "b", "c"}
			shells := make(chan gosh.Commander, len(names))
			queue := gosh.Queue(shells).WithMaxConcurrency(2).WithResultBufferSize(3)
			Expect(queue.Start()).To(Succeed())
			// The channel has room for every command, so these sends cannot
			// block. Enqueueing on the spec goroutine (rather than a separate
			// one) keeps cmds free of data races with the assertions below.
			cmds := make([]*gosh.Cmd, 0, len(names))
			for _, name := range names {
				shell := gosh.Shell(allOf(
					makeSentinel(dir, name),
					waitForSentinels(dir, names...),
				)).UsingProcessGroup()
				cmds = append(cmds, shell)
				shells <- shell
			}
			close(shells)
			// NOTE(review): this helper command is only started, never waited
			// on, so it does not block the spec until the "a" and "b"
			// sentinels exist - confirm whether Run was intended here.
			Expect(gosh.Shell(waitForSentinels(dir, "a", "b")).Start()).To(Succeed())
			// If we don't sleep here, there is a chance that the processes don't have time to register their state
			time.Sleep(1 * time.Second)
			Expect(queue.Kill()).To(Succeed())
			// With a limit of two, the third command must never have created
			// its sentinel.
			Expect(fmt.Sprintf("%s/%s", dir, "c")).ToNot(BeARegularFile())
			Expect(queue.Wait()).To(Or(MatchMultiProcessError(gosh.ErrKilled), MatchMultiProcessErrorType(&exec.ExitError{})))
			Expect(cmds[0].ProcessState).ToNot(BeNil())
			Expect(cmds[1].ProcessState).ToNot(BeNil())
			Expect(cmds[2].ProcessState).To(BeNil())
		})
	})
	When("Any process fails", func() {
		It("should fail", func() {
			dir, err := os.MkdirTemp("", "*")
			Expect(err).ToNot(HaveOccurred())
			defer os.RemoveAll(dir)
			names := []string{"a", "b", "c"}
			shells := make(chan gosh.Commander)
			go func() {
				defer GinkgoRecover()
				for _, name := range names {
					shells <- gosh.Shell(allOf(
						makeSentinel(dir, name),
						waitForSentinels(dir, names...),
					))
				}
				// One extra command built from the fail() helper is queued
				// after the sentinel commands.
				shells <- gosh.Shell(fail())
				close(shells)
			}()
			Expect(gosh.Queue(shells).Run()).ToNot(Succeed())
		})
	})
	When("Any process fails to start", func() {
		It("should fail", func(ctx context.Context) {
			dir, err := os.MkdirTemp("", "*")
			Expect(err).ToNot(HaveOccurred())
			defer os.RemoveAll(dir)
			names := []string{"a", "b", "c"}
			shells := make(chan gosh.Commander)
			go func() {
				defer GinkgoRecover()
				for _, name := range names {
					shells <- gosh.Shell(allOf(
						makeSentinel(dir, name),
						waitForSentinels(dir, names...),
					))
				}
				// One extra Commander from the failToStart helper is queued
				// after the sentinel commands.
				shells <- failToStart(ctx)
				close(shells)
			}()
			Expect(gosh.Queue(shells).Run()).ToNot(Succeed())
		})
	})
	It("Should not complete until all processes are done", func(ctx context.Context) {
		dir, err := os.MkdirTemp("", "*")
		Expect(err).ToNot(HaveOccurred())
		defer os.RemoveAll(dir)
		shells := make(chan gosh.Commander, 1)
		shells <- gosh.Shell("sleep 10")
		sentinel := filepath.Join(dir, "a")
		// The sentinel command is chained after the queue with gosh.And, so
		// the sentinel file should not appear while the queue is still busy.
		// NOTE(review): shells is never closed and nothing waits for this
		// goroutine, so it may outlive the spec - confirm this is intended.
		go func() {
			defer GinkgoRecover()
			Expect(gosh.And(gosh.Queue(shells), gosh.Shell(makeSentinel(dir, "a"))).Run()).To(Succeed())
		}()
		// Consistently takes the total duration first and the polling interval
		// second: keep checking for 8s (shorter than the 10s sleep), every 200ms.
		Consistently(func() error {
			_, err := os.Stat(sentinel)
			return err
		}, "8s", "200ms").Should(MatchError(os.ErrNotExist))
	})
})