-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindexer.py
47 lines (36 loc) · 1.38 KB
/
indexer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import urllib3
# Silence urllib3's InsecureRequestWarning for the whole process.
# NOTE(review): presumably the cluster is reached over HTTPS without
# certificate verification -- confirm that hiding this warning is intended.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class Indexer:
    """Thin wrapper around an Elasticsearch client bound to one target index.

    The target index name is read from ``config["index"]``.
    """

    def __init__(self, config):
        """Store *config* and create the Elasticsearch client.

        Args:
            config: Mapping that must contain an ``"index"`` key (the target
                index name). The same object is also passed to
                ``Elasticsearch`` as its single host entry.
        """
        self.config = config
        # NOTE(review): the whole config, including "index", is handed to the
        # client as a host definition -- confirm the client accepts that key.
        self.es = Elasticsearch([config])

    def add_to_index(self, data):
        """Insert a single document, using ``data["id"]`` as the document id.

        Args:
            data: Mapping holding the document; must contain an ``"id"`` key.

        Returns:
            The response returned by the client's ``index`` call.

        Raises:
            KeyError: If *data* has no ``"id"`` key.
        """
        return self.es.index(
            index=self.config["index"],
            id=data["id"],
            document=data,
        )

    def add_to_index_bulk(self, data_gen):
        """Insert many documents through ``elasticsearch.helpers.bulk``.

        Each item of *data_gen* is turned into one bulk action targeting the
        configured index, with ``item["id"]`` as the document id. Items are
        consumed lazily, so *data_gen* may be a generator.

        Args:
            data_gen: Iterable of mappings, each containing an ``"id"`` key.

        Returns:
            Whatever ``elasticsearch.helpers.bulk`` returns.
        """
        index_name = self.config["index"]

        def actions():
            for item in data_gen:
                action = {"_index": index_name, "_id": item["id"]}
                # Keys of the item win over the metadata defaults above, so an
                # item may carry its own "_index" / "_id".
                action.update(item)
                yield action

        return bulk(self.es, actions())

    def bulk_to_index(self, data):
        """Send a pre-built bulk request body to the configured index.

        The call passes ``refresh='wait_for'`` to the client.

        Args:
            data: Bulk request body, forwarded unchanged as ``body``.

        Returns:
            The response returned by the client's ``bulk`` call.
        """
        return self.es.bulk(
            index=self.config["index"],
            body=data,
            refresh='wait_for',
        )

    def add_to_index_with_id(self, data, data_id):
        """Insert a single document under an explicitly supplied id.

        Args:
            data: The document to store.
            data_id: Document id to use instead of one taken from *data*.

        Returns:
            The response returned by the client's ``index`` call.
        """
        # Uses ``document=`` like add_to_index, rather than ``body=``, so both
        # single-document paths call the client the same way.
        return self.es.index(
            index=self.config["index"],
            id=data_id,
            document=data,
        )