-
Notifications
You must be signed in to change notification settings - Fork 0
/
filter.go
142 lines (115 loc) · 2.99 KB
/
filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package bloomd
import (
"strings"
)
// Filter represents a single filter in the bloomd server
type Filter struct {
Name string
client *Client
}
var yes = []byte("Yes")
// BulkSet adds multiple keys to the filter
func (f Filter) BulkSet(reader KeyReader) (ResultReader, error) {
count, err := f.sendBatchOp("b", reader)
if err != nil {
return nil, f.client.handleWriteError(err)
}
return f.readerFor(count), nil
}
// MultiCheck checks multiple keys for the filter
func (f Filter) MultiCheck(reader KeyReader) (ResultReader, error) {
count, err := f.sendBatchOp("m", reader)
if err != nil {
return nil, f.client.handleWriteError(err)
}
return f.readerFor(count), nil
}
func (f Filter) sendBatchOp(op string, reader KeyReader) (int, error) {
count := 0
w := f.client.writer
w.WriteString(op)
w.WriteByte(itemDelimeter)
w.WriteString(f.Name)
for reader.Next() {
count++
w.WriteByte(itemDelimeter)
w.Write(reader.Current())
}
w.WriteByte(cmdDelimeter)
return count, w.Flush()
}
// Clear clears the filter
func (f Filter) Clear() error {
return checkResponse(f.client.sendAndReceive([]byte("clear " + f.Name)))
}
// Close closes the filter on the server
func (f Filter) Close() error {
return checkResponse(f.client.sendAndReceive([]byte("close " + f.Name)))
}
// Drop drops the filter on the server
func (f Filter) Drop() error {
return checkResponse(f.client.sendAndReceive([]byte("drop " + f.Name)))
}
// Flush force flushes the filter
func (f Filter) Flush() error {
return checkResponse(f.client.sendAndReceive([]byte("flush " + f.Name)))
}
// Info returns info map from the server
func (f Filter) Info() (map[string]string, error) {
if err := f.client.send([]byte("info " + f.Name)); err != nil {
return nil, err
}
lines, err := f.client.readList()
if err != nil {
return nil, err
}
resp := map[string]string{}
for _, line := range lines {
split := strings.SplitN(line, " ", 2)
resp[split[0]] = split[1]
}
return resp, nil
}
// Set sets a single key to the bloom
func (f Filter) Set(key Key) (bool, error) {
err := f.sendSingleOp("s", key)
if err != nil {
return false, f.client.handleWriteError(err)
}
return f.readSingle()
}
// Check gets a single key to the bloom
func (f Filter) Check(key Key) (bool, error) {
err := f.sendSingleOp("c", key)
if err != nil {
return false, f.client.handleWriteError(err)
}
return f.readSingle()
}
func (f Filter) sendSingleOp(op string, key Key) error {
w := f.client.writer
w.WriteString(op)
w.WriteByte(itemDelimeter)
w.WriteString(f.Name)
w.WriteByte(itemDelimeter)
w.Write(key)
w.WriteByte(cmdDelimeter)
return w.Flush()
}
func (f Filter) readerFor(resultLength int) ResultReader {
f.client.resultReader.resetLength(resultLength)
return f.client.resultReader
}
func (f Filter) readSingle() (bool, error) {
r := f.readerFor(1)
defer r.Close()
return r.Next()
}
func checkResponse(resp string, err error) error {
if resp != "Done" {
return Error{
Message: "invalid response from server: " + resp,
}
}
return err
}