-
Notifications
You must be signed in to change notification settings - Fork 8
/
iter.go
179 lines (149 loc) · 5.25 KB
/
iter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package sortedmap
import (
"errors"
"time"
)
// IterChCloser allows records to be read through a channel that is returned by the Records method.
// IterChCloser values should be closed after use using the Close method.
type IterChCloser struct {
	// ch carries the records; it is the channel handed out by Records.
	ch chan Record
	// canceled is closed by Close to signal that the iteration should stop.
	canceled chan struct{}
}
// Close cancels a channel-based iteration and causes the sending goroutine to exit.
// Close should be called once the IterChCloser is no longer being read from.
//
// Close is safe to call on a zero-value IterChCloser (for example, one that was
// returned alongside an error) and may be called repeatedly in sequence; calls
// after the first are no-ops. It is not synchronized, so concurrent calls to
// Close may still panic. Close always returns nil.
func (iterCh *IterChCloser) Close() error {
	// Closing a nil channel panics; a zero-value IterChCloser has nothing to cancel.
	if iterCh.canceled == nil {
		return nil
	}
	select {
	case <-iterCh.canceled:
		// Already closed by an earlier call; closing again would panic.
	default:
		close(iterCh.canceled)
	}
	return nil
}
// Records exposes the receive-only side of the channel on which records are delivered.
func (iterCh *IterChCloser) Records() <-chan Record {
	// Narrow the bidirectional channel to receive-only before handing it out.
	var records <-chan Record = iterCh.ch
	return records
}
// IterChParams holds the configurable settings accepted by CustomIterCh.
// The zero value is usable: forward order, no send timeout, a buffer size of 1,
// and no bounds set.
type IterChParams struct {
	// Reversed reverses the sort order of the iteration when true.
	Reversed bool

	// SendTimeout limits how long each channel send may block.
	// It is disabled by default (zero or negative), though it should be set to
	// allow channel send goroutines to time out.
	SendTimeout time.Duration

	// BufSize is the buffer size of the record channel.
	// Values lower than 1 are raised to 1.
	BufSize int

	// LowerBound and UpperBound delimit the iteration.
	// They default to regular iteration when left unset.
	LowerBound interface{}
	UpperBound interface{}
}
// IterCallbackFunc defines the type of function that is passed into an IterFunc method.
// The function is passed a record value argument.
// Iteration continues while the function returns true and stops as soon as it returns false.
type IterCallbackFunc func(rec Record) bool
// setBufSize normalizes a requested channel buffer size: any value below 1 is
// raised to 1, and every other value is returned unchanged.
func setBufSize(bufSize int) int {
	// At least one buffer slot is required, or a blocked channel send goroutine may not exit.
	// More info: https://github.com/golang/go/wiki/Timeouts
	const minBufSize = 1

	if bufSize >= minBufSize {
		return bufSize
	}
	return minBufSize
}
// recordFromIdx builds the Record for position i: the key is read from
// sm.sorted[i] and the value is looked up under that key in sm.idx.
func (sm *SortedMap) recordFromIdx(i int) Record {
	key := sm.sorted[i]
	return Record{
		Key: key,
		Val: sm.idx[key],
	}
}
// sendRecord attempts to send the record at index i on iterCh's record channel.
//
// It returns true when the record was sent. It returns false when the
// iteration was canceled (iterCh.canceled was closed) or, for a positive
// sendTimeout, when the timeout elapsed before the send could complete.
// A sendTimeout of zero or less disables the timeout, so the call blocks until
// the send completes or the iteration is canceled.
func (sm *SortedMap) sendRecord(iterCh IterChCloser, sendTimeout time.Duration, i int) bool {
	// A nil channel is never ready, so the timeout case below stays disabled
	// unless a positive timeout arms it.
	var timedOut <-chan time.Time
	if sendTimeout > 0 {
		// Unlike time.After, an explicit timer can be stopped as soon as this
		// call returns instead of lingering until sendTimeout elapses.
		timer := time.NewTimer(sendTimeout)
		defer timer.Stop()
		timedOut = timer.C
	}

	select {
	case <-iterCh.canceled:
		return false
	case iterCh.ch <- sm.recordFromIdx(i):
		return true
	case <-timedOut:
		return false
	}
}
// iterCh starts a goroutine that sends the records selected by params over a
// buffered channel and returns the IterChCloser wrapping that channel.
//
// The index range comes from boundsIdxSearch; when it yields nil, a zero-value
// IterChCloser and an error are returned and no goroutine is started.
// The goroutine stops at the first send that sendRecord reports as failed and
// closes the record channel before it exits.
func (sm *SortedMap) iterCh(params IterChParams) (IterChCloser, error) {
	bounds := sm.boundsIdxSearch(params.LowerBound, params.UpperBound)
	if bounds == nil {
		return IterChCloser{}, errors.New(noValuesErr)
	}

	out := IterChCloser{
		ch:       make(chan Record, setBufSize(params.BufSize)),
		canceled: make(chan struct{}),
	}

	go func() {
		// Walk the inclusive index range bounds[0]..bounds[1], starting from
		// the upper end and stepping backwards when reversed.
		idx, step := bounds[0], 1
		if params.Reversed {
			idx, step = bounds[1], -1
		}
		for left := bounds[1] - bounds[0] + 1; left > 0; left-- {
			if !sm.sendRecord(out, params.SendTimeout, idx) {
				break
			}
			idx += step
		}
		close(out.ch)
	}()

	return out, nil
}
// iterFunc calls f with each record in the index range that boundsIdxSearch
// reports for lowerBound and upperBound, in reverse order when reversed is true.
// Iteration stops early as soon as f returns false.
// An error is returned only when boundsIdxSearch yields nil; f is not called in that case.
func (sm *SortedMap) iterFunc(reversed bool, lowerBound, upperBound interface{}, f IterCallbackFunc) error {
	bounds := sm.boundsIdxSearch(lowerBound, upperBound)
	if bounds == nil {
		return errors.New(noValuesErr)
	}

	// Walk the inclusive index range bounds[0]..bounds[1], starting from the
	// upper end and stepping backwards when reversed.
	idx, step := bounds[0], 1
	if reversed {
		idx, step = bounds[1], -1
	}
	for left := bounds[1] - bounds[0] + 1; left > 0; left-- {
		if !f(sm.recordFromIdx(idx)) {
			break
		}
		idx += step
	}
	return nil
}
// IterCh returns an IterChCloser whose channel sorted records can be read from and processed.
// It iterates with the zero-value IterChParams: no bounds set, forward order,
// and no send timeout, so each send blocks until it is read or the iteration is closed.
func (sm *SortedMap) IterCh() (IterChCloser, error) {
	var defaults IterChParams
	return sm.iterCh(defaults)
}
// BoundedIterCh returns an IterChCloser whose channel sorted records can be read from and processed.
// Iteration starts at the lower bound value and sends all values in the
// collection until the upper bound value is reached.
// Sort order is reversed if the reversed argument is set to true.
// No send timeout is set, so each channel send blocks until it completes or the iteration is closed.
func (sm *SortedMap) BoundedIterCh(reversed bool, lowerBound, upperBound interface{}) (IterChCloser, error) {
	var params IterChParams
	params.Reversed = reversed
	params.LowerBound = lowerBound
	params.UpperBound = upperBound
	return sm.iterCh(params)
}
// CustomIterCh returns a channel that sorted records can be read from and processed.
// The iteration is configured through params: Reversed selects reverse sort order,
// LowerBound and UpperBound delimit the records that are sent, BufSize sets the
// channel buffer size, and SendTimeout, when positive, limits how long each
// channel send may block.
// Passing a zero-value IterChParams is equivalent to calling IterCh.
func (sm *SortedMap) CustomIterCh(params IterChParams) (IterChCloser, error) {
	return sm.iterCh(params)
}
// IterFunc passes each record to the specified callback function, with no bounds set.
// Sort order is reversed if the reversed argument is set to true.
func (sm *SortedMap) IterFunc(reversed bool, f IterCallbackFunc) {
	// iterFunc only fails when there are no values to visit, and this method
	// has no error result, so that outcome is deliberately discarded.
	_ = sm.iterFunc(reversed, nil, nil, f)
}
// BoundedIterFunc starts at the lower bound value and passes all values in the
// collection to the callback function until the upper bound value is reached.
// Sort order is reversed if the reversed argument is set to true.
// The error reported by the underlying iteration, if any, is returned unchanged.
func (sm *SortedMap) BoundedIterFunc(reversed bool, lowerBound, upperBound interface{}, f IterCallbackFunc) error {
	err := sm.iterFunc(reversed, lowerBound, upperBound, f)
	return err
}