Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Support custom analysers #111

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Next release
============

- Support custom analyzers in Elasticsearch
- Fix bug '_csv_split not found'

0.13.2
Expand Down
1 change: 0 additions & 1 deletion annotator/annotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
}
}


class Annotation(es.Model):

__type__ = TYPE
Expand Down
100 changes: 76 additions & 24 deletions annotator/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,67 @@ def conn(self):
self._connection = self._connect()
return self._connection

def drop_all(self):
    """Delete the index and everything stored in it.

    Does nothing when the index does not exist. The index is closed
    before deletion, mirroring the shutdown order Elasticsearch expects.
    """
    if not self.conn.indices.exists(self.index):
        return
    self.conn.indices.close(self.index)
    self.conn.indices.delete(self.index)

def create_all(self, models, analysis_settings):
    """Create the index, or bring an existing one up to date.

    :param models: model classes whose mappings are compiled into the index
    :param analysis_settings: analyzer/filter/tokenizer settings for the index
    :raises elasticsearch.exceptions.ConnectionError: when Elasticsearch
        cannot be reached at ``self.host``.
    """
    mappings = _compile_mappings(models)

    # Probing for the index doubles as a connectivity check.
    try:
        index_exists = self.conn.indices.exists(self.index)
    except elasticsearch.exceptions.ConnectionError as e:
        msg = ('Can not access ElasticSearch at {0}! '
               'Check to ensure it is running.').format(self.host)
        raise elasticsearch.exceptions.ConnectionError('N/A', msg, e)

    if index_exists:
        # Index already present: sync analysis settings, then mappings.
        self._update_analysis(analysis_settings)
        self._update_mappings(mappings)
    else:
        # Fresh index: create it with mappings and analysis in one call.
        body = {
            'mappings': mappings,
            'settings': {'analysis': analysis_settings},
        }
        self.conn.indices.create(self.index, body=body)

def _update_analysis(self, analysis):
    """Update the index's analyzers, filters and tokenizers.

    Elasticsearch requires the index to be closed while analysis settings
    change; it is reopened afterwards even if the update fails.
    """
    # NOTE: dict.values() returns a non-subscriptable view on Python 3,
    # so `.values()[0]` would raise a TypeError there; the codebase
    # targets 2 and 3 (it uses six), hence next(iter(...)).
    settings = next(iter(
        self.conn.indices.get_settings(index=self.index).values()))
    existing = settings['settings']['index'].get('analysis', {})
    # Only bother if new settings would differ from existing settings
    if not self._analysis_up_to_date(existing, analysis):
        try:
            self.conn.indices.close(index=self.index)
            self.conn.indices.put_settings(index=self.index,
                                           body={'analysis': analysis})
        finally:
            # Always reopen the index, even when the update blew up.
            self.conn.indices.open(index=self.index)

def _update_mappings(self, mappings):
    """Push each document type's mapping to the existing index.

    Warning: can explode because of a MergeMappingError when mappings are
    incompatible with what the index already stores.
    """
    for doc_type in mappings:
        self.conn.indices.put_mapping(index=self.index,
                                      doc_type=doc_type,
                                      body=mappings[doc_type])

@staticmethod
def _analysis_up_to_date(existing, analysis):
"""Tell whether existing analysis settings are up to date"""
new_analysis = existing.copy()
for section, items in analysis.items():
new_analysis.setdefault(section,{}).update(items)
return new_analysis == existing


class _Model(dict):
"""Base class that represents a document type in an ElasticSearch index.
Expand All @@ -74,7 +135,7 @@ class _Model(dict):
__type__ -- The name of the document type
__mapping__ -- A mapping of the document's fields

Mapping: Calling create_all() will create the mapping in the index.
Mapping:
One field, 'id', is treated specially. Its value will not be stored,
but be used as the _id identifier of the document in Elasticsearch. If
an item is indexed without providing an id, the _id is automatically
Expand All @@ -87,25 +148,6 @@ class _Model(dict):
with 'analyzer':'standard'.
"""

@classmethod
def create_all(cls):
log.info("Creating index '%s'." % cls.es.index)
conn = cls.es.conn
try:
conn.indices.create(cls.es.index)
except elasticsearch.exceptions.RequestError as e:
# Reraise anything that isn't just a notification that the index
# already exists (either as index or as an alias).
if not (e.error.startswith('IndexAlreadyExistsException')
or e.error.startswith('InvalidIndexNameException')):
log.fatal("Failed to create an Elasticsearch index")
raise
log.warn("Index creation failed as index appears to already exist.")
mapping = cls.get_mapping()
conn.indices.put_mapping(index=cls.es.index,
doc_type=cls.__type__,
body=mapping)

@classmethod
def get_mapping(cls):
return {
Expand All @@ -122,10 +164,8 @@ def get_mapping(cls):
}

@classmethod
def drop_all(cls):
if cls.es.conn.indices.exists(cls.es.index):
cls.es.conn.indices.close(cls.es.index)
cls.es.conn.indices.delete(cls.es.index)
def get_analysis(cls):
    """Return the model's custom analysis settings (``__analysis__``), or {}."""
    return getattr(cls, '__analysis__', {})

# It would be lovely if this were called 'get', but the dict semantics
# already define that method name.
Expand Down Expand Up @@ -215,6 +255,18 @@ def make_model(es):
return type('Model', (_Model,), {'es': es})


def _compile_mappings(models):
"""Collect the mappings from the models"""
mappings = {}
for model in models:
mappings.update(model.get_mapping())
return mappings


def _csv_split(s, delimiter=','):
return [r for r in csv.reader([s], delimiter=delimiter)][0]


def _build_query(query, offset, limit):
# Create a match query for each keyword
match_clauses = [{'match': {k: v}} for k, v in iteritems(query)]
Expand Down
9 changes: 9 additions & 0 deletions annotator/elasticsearch_analyzers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Custom Elasticsearch analyzers that can be used for indexing fields in
models (Annotation, Document).
"""

# Index-level analysis settings handed to Elasticsearch at index creation
# (see es.create_all / Reindexer.get_index_config). Populate these dicts
# with named custom analyzers, token filters and tokenizers.
ANALYSIS = {
    'analyzer': {},
    'filter': {},
    'tokenizer': {},
}
6 changes: 5 additions & 1 deletion annotator/reindexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

from .annotation import Annotation
from .document import Document
from .elasticsearch_analyzers import ANALYSIS


class Reindexer(object):

es_models = Annotation, Document
analysis_settings = ANALYSIS

def __init__(self, conn, interactive=False):
self.conn = conn
Expand Down Expand Up @@ -60,7 +62,9 @@ def alias(self, index, alias):

def get_index_config(self):
    """Build the index-creation body: model mappings plus analysis settings."""
    mappings = {}
    for model in self.es_models:
        mappings.update(model.get_mapping())
    return {
        'mappings': mappings,
        'settings': {'analysis': self.analysis_settings},
    }
30 changes: 15 additions & 15 deletions run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

from flask import Flask, g, current_app
import elasticsearch
from annotator import es, annotation, auth, authz, document, store
from annotator import es, annotation, auth, authz, document, \
elasticsearch_analyzers, store
from tests.helpers import MockUser, MockConsumer, MockAuthenticator
from tests.helpers import mock_authorizer

Expand Down Expand Up @@ -60,20 +61,19 @@ def main():
if app.config.get('AUTHZ_ON') is not None:
es.authorization_enabled = app.config['AUTHZ_ON']

with app.test_request_context():
try:
annotation.Annotation.create_all()
document.Document.create_all()
except elasticsearch.exceptions.RequestError as e:
if e.error.startswith('MergeMappingException'):
date = time.strftime('%Y-%m-%d')
log.fatal("Elasticsearch index mapping is incorrect! Please "
"reindex it. You can use reindex.py for this, e.g. "
"python reindex.py --host {0} {1} {1}-{2}".format(
es.host,
es.index,
date))
raise
try:
es.create_all(models=[annotation.Annotation, document.Document],
analysis_settings=elasticsearch_analyzers.ANALYSIS)
except elasticsearch.exceptions.RequestError as e:
if e.error.startswith('MergeMappingException'):
date = time.strftime('%Y-%m-%d')
log.fatal("Elasticsearch index mapping is incorrect! Please "
"reindex it. You can use reindex.py for this, e.g. "
"python reindex.py --host {0} {1} {1}-{2}".format(
es.host,
es.index,
date))
raise

@app.before_request
def before_request():
Expand Down
13 changes: 6 additions & 7 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
from flask import Flask, g, request

from annotator import es, auth, authz, annotation, store, document
from annotator import es, auth, authz, annotation, document, \
elasticsearch_analyzers, store

from .helpers import MockUser, MockConsumer

Expand Down Expand Up @@ -30,15 +31,13 @@ class TestCase(object):
@classmethod
def setup_class(cls):
    """Build the Flask app once per test class and start from a clean slate.

    (The rendered diff span mixed pre- and post-change lines; this is the
    post-change code.)
    """
    cls.app = create_app()
    # Single index-level drop replaces the old per-model drop_all calls.
    es.drop_all()

def setup(self):
    """Recreate the index before each test and prepare a test client.

    (The rendered diff span mixed pre- and post-change lines; this is the
    post-change code.)
    """
    es.create_all(models=[annotation.Annotation, document.Document],
                  analysis_settings=elasticsearch_analyzers.ANALYSIS)
    # Wait until the index is at least usable before tests hit it.
    es.conn.cluster.health(wait_for_status='yellow')
    self.cli = self.app.test_client()

def teardown(self):
    """Drop the index after each test so tests stay isolated.

    (The rendered diff span mixed pre- and post-change lines; this is the
    post-change code.)
    """
    es.drop_all()