-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathelasticsearch.go
215 lines (192 loc) · 6.56 KB
/
elasticsearch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
package kasper
import (
	"bytes"
	"encoding/json"
	"fmt"
	"strings"

	"golang.org/x/net/context"
	elastic "gopkg.in/olivere/elastic.v5"
)
// maxBulkErrorReasons caps how many failed bulk items createBulkError lists in the
// error it builds; failures beyond the cap are only reported as an omitted count.
const maxBulkErrorReasons = 5
// Elasticsearch is an implementation of Store that uses Elasticsearch.
// Each instance provides key-value access to a given index and a given document type:
// the key is used as the document _id and the value is the document's JSON _source.
// This implementation supports Elasticsearch 5.x and uses Oliver Eilhard's Go Elasticsearch client.
// See https://github.com/olivere/elastic
type Elasticsearch struct {
// client issues every request made by this store.
client *elastic.Client
// context is passed to the Do call of every request.
// NOTE(review): NewElasticsearch sets this to context.Background(), so requests can
// never be cancelled or given a deadline; the context package docs discourage storing
// a Context in a struct — consider threading one through the call sites instead.
context context.Context
// indexName and typeName identify the index and document type used for document
// reads, writes and deletes.
indexName string
typeName string
logger Logger
// labelValues holds the metric label values (topic processor name, index, type)
// passed with every metric update below.
labelValues []string
// getCounter is incremented once per Get call.
getCounter Counter
// getAllSummary observes the number of keys requested by each GetAll call.
getAllSummary Summary
// putCounter is incremented once per Put call.
putCounter Counter
// putAllSummary observes the number of documents passed to each PutAll call.
putAllSummary Summary
// deleteCounter is incremented once per Delete call.
deleteCounter Counter
// flushCounter is incremented once per Flush call.
flushCounter Counter
}
// NewElasticsearch creates an Elasticsearch store bound to one index and document type.
// All documents read and written correspond to the URL
//
//	{cluster URL}/{indexName}/{typeName}/{key}
//
// where {cluster URL} is whatever endpoint the supplied client is configured for.
//
// Metrics are created through config.MetricsProvider and labelled with the topic
// processor name, the index name and the type name. Requests are issued with
// context.Background().
func NewElasticsearch(config *Config, client *elastic.Client, indexName, typeName string) *Elasticsearch {
	metrics := config.MetricsProvider
	labelNames := []string{"topicProcessor", "index", "type"}
	// Keyed fields keep this literal correct even if the struct's fields are reordered.
	return &Elasticsearch{
		client:        client,
		context:       context.Background(),
		indexName:     indexName,
		typeName:      typeName,
		logger:        config.Logger,
		labelValues:   []string{config.TopicProcessorName, indexName, typeName},
		getCounter:    metrics.NewCounter("Elasticsearch_Get", "Number of Get() calls", labelNames...),
		getAllSummary: metrics.NewSummary("Elasticsearch_GetAll", "Summary of GetAll() calls", labelNames...),
		putCounter:    metrics.NewCounter("Elasticsearch_Put", "Number of Put() calls", labelNames...),
		putAllSummary: metrics.NewSummary("Elasticsearch_PutAll", "Summary of PutAll() calls", labelNames...),
		deleteCounter: metrics.NewCounter("Elasticsearch_Delete", "Number of Delete() calls", labelNames...),
		// This is a counter, so its help text says "Number of", like the other counters.
		flushCounter: metrics.NewCounter("Elasticsearch_Flush", "Number of Flush() calls", labelNames...),
	}
}
// Get returns the document whose _id is key, using the Elasticsearch Get API.
// The returned byte slice contains the UTF8-encoded JSON document (i.e., _source).
//
// It returns (nil, nil) when the document does not exist: either Elasticsearch reports
// found=false, or it answers with HTTP 404 (this also covers a missing index).
// It returns an error if the document is found but the response carries no _source
// (e.g. _source disabled in the mapping) instead of dereferencing a nil pointer.
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html
func (s *Elasticsearch) Get(key string) ([]byte, error) {
	s.logger.Debugf("Elasticsearch Get: %s/%s/%s", s.indexName, s.typeName, key)
	s.getCounter.Inc(s.labelValues...)
	result, err := s.client.Get().
		Index(s.indexName).
		Type(s.typeName).
		Id(key).
		Do(s.context)
	if err != nil {
		// Inspect the status code rather than the error text, which varies with the
		// details Elasticsearch includes in the 404 body.
		if elastic.IsNotFound(err) {
			return nil, nil
		}
		return nil, err
	}
	if !result.Found {
		return nil, nil
	}
	if result.Source == nil {
		return nil, fmt.Errorf("elasticsearch: document %s/%s/%s was found but has no _source", s.indexName, s.typeName, key)
	}
	return *result.Source, nil
}
// GetAll gets multiple documents from the store in a single request, using the
// Elasticsearch MultiGet API.
// The returned map has an entry only for keys whose document was found; each value is
// the document's UTF8-encoded JSON _source. An empty keys slice yields an empty,
// non-nil map without contacting Elasticsearch.
// It returns an error if a found document carries no _source, rather than panicking.
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-multi-get.html
func (s *Elasticsearch) GetAll(keys []string) (map[string][]byte, error) {
	s.getAllSummary.Observe(float64(len(keys)), s.labelValues...)
	if len(keys) == 0 {
		return map[string][]byte{}, nil
	}
	s.logger.Debug("Elasticsearch GetAll: ", keys)
	multiGet := s.client.MultiGet()
	for _, key := range keys {
		multiGet.Add(elastic.NewMultiGetItem().
			Index(s.indexName).
			Type(s.typeName).
			Id(key))
	}
	response, err := multiGet.Do(s.context)
	if err != nil {
		return nil, err
	}
	// MultiGet answers in request order, so response.Docs[i] corresponds to keys[i].
	kvs := make(map[string][]byte, len(keys))
	for i, doc := range response.Docs {
		if !doc.Found {
			continue
		}
		if doc.Source == nil {
			return nil, fmt.Errorf("elasticsearch: document %s/%s/%s was found but has no _source", s.indexName, s.typeName, keys[i])
		}
		kvs[keys[i]] = *doc.Source
	}
	return kvs, nil
}
// Put inserts or updates a document in the store (key is used as the document _id).
// It is implemented using the Elasticsearch Index API.
// The value byte slice must contain the UTF8-encoded JSON document (i.e., _source);
// it is sent as the request body unchanged.
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
func (s *Elasticsearch) Put(key string, value []byte) error {
	// %s renders the JSON document as text; %#v would dump it as a list of hex bytes.
	s.logger.Debugf("Elasticsearch Put: %s/%s/%s %s", s.indexName, s.typeName, key, value)
	s.putCounter.Inc(s.labelValues...)
	_, err := s.client.Index().
		Index(s.indexName).
		Type(s.typeName).
		Id(key).
		BodyString(string(value)).
		Do(s.context)
	return err
}
// PutAll inserts or updates a number of documents in the store, one per map entry
// (the map key is used as the document _id, the value as its JSON _source).
// It is implemented using the Elasticsearch Bulk and Index APIs.
// It returns an error if a document that spans several lines is not valid JSON
// (nothing is sent in that case), if the bulk request fails, or if any item fails.
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
func (s *Elasticsearch) PutAll(kvs map[string][]byte) error {
	s.logger.Debugf("Elasticsearch PutAll of %d keys", len(kvs))
	s.putAllSummary.Observe(float64(len(kvs)), s.labelValues...)
	if len(kvs) == 0 {
		return nil
	}
	bulk := s.client.Bulk()
	for key, value := range kvs {
		doc, err := bulkDocumentLine(value)
		if err != nil {
			return fmt.Errorf("PutAll: document %q is not valid JSON: %v", key, err)
		}
		bulk.Add(elastic.NewBulkIndexRequest().
			Index(s.indexName).
			Type(s.typeName).
			Id(key).
			Doc(doc),
		)
	}
	response, err := bulk.Do(s.context)
	if err != nil {
		return err
	}
	if response.Errors {
		return createBulkError(response)
	}
	return nil
}

// bulkDocumentLine returns value as a string that fits on a single line of a Bulk
// request body. The Bulk API is newline-delimited, so a document containing line
// breaks (e.g. pretty-printed JSON) would corrupt the request. Such documents are
// compacted, which also validates them; everything else is passed through unchanged.
func bulkDocumentLine(value []byte) (string, error) {
	if !bytes.ContainsAny(value, "\r\n") {
		return string(value), nil
	}
	var buf bytes.Buffer
	if err := json.Compact(&buf, value); err != nil {
		return "", err
	}
	return buf.String(), nil
}
// Delete removes the document whose _id is key from the store.
// It does not return an error if the document was not present: any HTTP 404 from
// Elasticsearch is treated as success. Every other failure is returned unchanged,
// including errors that are not *elastic.Error (e.g. transport failures), which
// previously caused a panic in an unchecked type assertion.
// It is implemented using the Elasticsearch Delete API.
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html
func (s *Elasticsearch) Delete(key string) error {
	s.logger.Debugf("Elasticsearch Delete: %s/%s/%s", s.indexName, s.typeName, key)
	s.deleteCounter.Inc(s.labelValues...)
	_, err := s.client.Delete().
		Index(s.indexName).
		Type(s.typeName).
		Id(key).
		Do(s.context)
	if err == nil || elastic.IsNotFound(err) {
		return nil
	}
	return err
}
// Flush flushes the translog of this store's index to disk. It waits for any flush
// already in progress on that index to finish instead of skipping.
// It is scoped to s.indexName so that one store does not flush every index in the
// cluster. "Flush complete" is logged only when the flush actually succeeded; a
// failure is returned to the caller.
// It is implemented using the Elasticsearch Flush API.
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html
func (s *Elasticsearch) Flush() error {
	s.logger.Info("Elasticsearch Flush...")
	s.flushCounter.Inc(s.labelValues...)
	_, err := s.client.Flush(s.indexName).
		WaitIfOngoing(true).
		Do(s.context)
	if err != nil {
		return err
	}
	s.logger.Info("Elasticsearch Flush complete")
	return nil
}
// GetClient exposes the *elastic.Client this store sends its requests through, for
// callers that need Elasticsearch APIs the store does not wrap.
func (s *Elasticsearch) GetClient() *elastic.Client {
	client := s.client
	return client
}
// createBulkError builds one error summarizing the failed items of a Bulk response.
// It inspects at most maxBulkErrorReasons failed items; each inspected item that
// carries error details contributes an "id = ..., error = ..." line. If more items
// failed than were inspected, it appends an "(omitted N more errors)" note. The note
// is no longer emitted with N = 0 when exactly maxBulkErrorReasons items failed.
func createBulkError(response *elastic.BulkResponse) error {
	failed := response.Failed()
	shown := failed
	if len(shown) > maxBulkErrorReasons {
		shown = shown[:maxBulkErrorReasons]
	}
	reasons := make([]string, 0, len(shown)+1)
	for _, item := range shown {
		if item.Error != nil {
			reasons = append(reasons, fmt.Sprintf("id = %s, error = %s\n", item.Id, item.Error.Reason))
		}
	}
	if omitted := len(failed) - len(shown); omitted > 0 {
		reasons = append(reasons, fmt.Sprintf("(omitted %d more errors)", omitted))
	}
	return fmt.Errorf("PutAll failed for some requests:\n%s", strings.Join(reasons, ""))
}