Merge pull request #29 from HBS-HBX/#25_evaluate_queryset_inside_workers
closes #25 use pks for queryset inside workers
codekiln authored Aug 13, 2018
2 parents 8dedb2b + 20a222d commit 0252c5b
Showing 15 changed files with 2,201 additions and 58 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
@@ -1,6 +1,12 @@
Changelog
---------

0.7.2 (2018-08-13)
~~~~~~~~~~~~~~~~~~
* fix #21 wrong batch update total using multiprocessing in 0.7.1
* fix #23 KeyError _index_version_name in es_update --newer
* address #25 use pks for queryset inside workers #29

0.7.1 (2018-08-07)
~~~~~~~~~~~~~~~~~~
* fixed gh #8 es_dangerous_reset --es-only to sync database to ES
82 changes: 80 additions & 2 deletions README.md
@@ -265,6 +265,9 @@ params = {
call_command('dumpdata', **params)
```

An example of this is included with the
[moviegen management command](./management/commands/moviegen.py).

### Tuning
By default, `./manage.py es_update` will divide the result of
`DEMDocType.get_queryset()` into batches of size `DocType.BATCH_SIZE`.
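
For illustration, a minimal sketch of tuning this batch size (assuming
`DEMDocType` is importable from `django_elastic_migrations.indexes`, as the
diff below suggests; `MovieSearchDoc` and the `Movie` model are hypothetical):

```python
# Hypothetical sketch: raise the batch size for a model that is cheap to serialize.
from django_elastic_migrations.indexes import DEMDocType

from tests.models import Movie  # illustrative model


class MovieSearchDoc(DEMDocType):
    BATCH_SIZE = 10000  # es_update slices get_queryset() into chunks of this size

    @classmethod
    def get_queryset(cls):
        # the queryset whose primary keys are divided among update batches
        return Movie.objects.all()
```
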
@@ -288,17 +291,92 @@ to see the available `make` targets.


### Requirements
* run `make requirements`* to run the pip install.

*`make upgrade`* upgrades the pinned requirements to their latest
versions. This process also excludes `django` and `elasticsearch-dsl`
from `requirements/test.txt` so that tox can inject different
versions of them during matrix testing.

*`make requirements`* runs the pip install.

This project also uses [`pip-tools`](https://github.com/jazzband/pip-tools).
The `requirements.txt` files are generated and pinned to the latest
versions with `make upgrade`.
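
A typical flow with these targets (a sketch; both targets as described above):

```
make upgrade       # re-pin the requirements files to the latest versions via pip-tools
make requirements  # pip install the pinned requirements
```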

### Populating Local `tests_movies` Database Table With Data

It may be helpful to populate a local database with movie test
data to experiment with `django-elastic-migrations`. First,
migrate the database:

`./manage.py migrate --run-syncdb --settings=test_settings`

Next, load the basic fixtures:

`./manage.py loaddata tests/100films.json`

You may wish to add more movies to the database. A management command
has been created for this purpose. Get a [Free OMDB API key here](https://www.omdbapi.com/apikey.aspx),
then run a query like this (replace `MYAPIKEY` with yours):

```
$ ./manage.py moviegen --title="Inception" --api-key="MYAPIKEY"
{'actors': 'Leonardo DiCaprio, Joseph Gordon-Levitt, Ellen Page, Tom Hardy',
'awards': 'Won 4 Oscars. Another 152 wins & 204 nominations.',
'boxoffice': '$292,568,851',
'country': 'USA, UK',
'director': 'Christopher Nolan',
'dvd': '07 Dec 2010',
'genre': 'Action, Adventure, Sci-Fi',
'imdbid': 'tt1375666',
'imdbrating': '8.8',
'imdbvotes': '1,721,888',
'language': 'English, Japanese, French',
'metascore': '74',
'plot': 'A thief, who steals corporate secrets through the use of '
'dream-sharing technology, is given the inverse task of planting an '
'idea into the mind of a CEO.',
'poster': 'https://m.media-amazon.com/images/M/MV5BMjAxMzY3NjcxNF5BMl5BanBnXkFtZTcwNTI5OTM0Mw@@._V1_SX300.jpg',
'production': 'Warner Bros. Pictures',
'rated': 'PG-13',
'ratings': [{'Source': 'Internet Movie Database', 'Value': '8.8/10'},
{'Source': 'Rotten Tomatoes', 'Value': '86%'},
{'Source': 'Metacritic', 'Value': '74/100'}],
'released': '16 Jul 2010',
'response': 'True',
'runtime': 148,
'title': 'Inception',
'type': 'movie',
'website': 'http://inceptionmovie.warnerbros.com/',
'writer': 'Christopher Nolan',
'year': '2010'}
```

To save the movie to the database, use the `--save` flag. The
`--noprint` option suppresses the JSON output. If you add
`OMDB_API_KEY=MYAPIKEY` to your environment variables, you don't have
to specify the key each time:

```
$ ./manage.py moviegen --title "Closer" --noprint --save
Saved 1 new movie(s) to the database: Closer
```

Now that it's been saved to the database, you may want to create a fixture,
so you can get back to this state in the future.

```
$ ./manage.py moviegen --makefixture=tests/myfixture.json
dumping fixture data to tests/myfixture.json ...
[...........................................................................]
```

Later, you can restore this database with the regular `loaddata` command:

```
$ ./manage.py loaddata tests/myfixture.json
Installed 101 object(s) from 1 fixture(s)
```

### Running Tests Locally

Run `make test`. To run all tests and quality checks locally,
4 changes: 3 additions & 1 deletion codecov.yml
@@ -4,7 +4,9 @@ coverage:

status:
project: yes
patch: yes
patch:
default:
enabled: no
changes: no

ignore:
3 changes: 1 addition & 2 deletions django_elastic_migrations/__init__.py
@@ -10,7 +10,7 @@
from django_elastic_migrations.utils import loading
from django_elastic_migrations.utils.django_elastic_migrations_log import get_logger

__version__ = '0.7.1'
__version__ = '0.7.2'

default_app_config = 'django_elastic_migrations.apps.DjangoElasticMigrationsConfig' # pylint: disable=invalid-name

@@ -25,7 +25,6 @@
'This should be the python path to the elasticsearch client '
'to use for indexing.')

logger.debug("using DJANGO_ELASTIC_MIGRATIONS_ES_CLIENT = {}".format(settings.DJANGO_ELASTIC_MIGRATIONS_ES_CLIENT))

try:
es_client = loading.import_module_element(settings.DJANGO_ELASTIC_MIGRATIONS_ES_CLIENT)
25 changes: 14 additions & 11 deletions django_elastic_migrations/indexes.py
@@ -575,10 +575,20 @@ def generate_batches(cls, qs=None, batch_size=BATCH_SIZE, total_items=None, upda
if qs is None:
qs = cls.get_queryset()

update_index_action.add_log("START getting all ids to update")
try:
qs_ids = list(qs.values_list(cls.PK_ATTRIBUTE, flat=True))
except TypeError as e:
if "values_list() got an unexpected keyword argument 'flat'" in e:
qs_ids = [str(id) for id in list(qs.values_list(cls.PK_ATTRIBUTE))]
else:
raise
update_index_action.add_log("END getting all ids to update")

if total_items is None:
total_items = cls.get_queryset_count(qs)
total_items = len(qs_ids)

total_docs = cls.get_total_docs(qs)
total_docs = cls.get_total_docs(cls.get_queryset().filter(id__in=qs_ids))

batches = []

@@ -588,15 +598,8 @@
for start_index in range(0, total_items, batch_size):
# See https://docs.djangoproject.com/en/1.9/ref/models/querysets/#when-querysets-are-evaluated:
# "slicing an unevaluated QuerySet returns another unevaluated QuerySet"
end_index = min(start_index + batch_size, total_items - start_index)
batch_qs = qs[start_index:end_index]
try:
ids_in_batch = list(batch_qs.values_list(cls.PK_ATTRIBUTE, flat=True))
except TypeError as e:
if "values_list() got an unexpected keyword argument 'flat'" in e:
ids_in_batch = [str(id) for id in list(batch_qs.values_list(cls.PK_ATTRIBUTE))]
else:
raise
end_index = min(start_index + batch_size, total_items)
ids_in_batch = qs_ids[start_index:end_index]

batch_index_action = PartialUpdateIndexAction(
index=update_index_action.index,
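The net effect of these two hunks: `generate_batches` now evaluates the
queryset's primary keys once up front and slices the resulting list, instead
of slicing a live queryset for every batch. A distilled sketch of that
pattern (simplified, not the project's exact code):

```python
def generate_pk_batches(qs, batch_size):
    """Sketch: evaluate the queryset to a pk list once, then yield pk slices."""
    pks = list(qs.values_list("pk", flat=True))  # single up-front query
    for start in range(0, len(pks), batch_size):
        yield pks[start:start + batch_size]


# Each worker later rebuilds its own queryset from its pk slice, e.g.
#   Movie.objects.filter(pk__in=batch_pks)
# so no unevaluated queryset has to cross a process boundary.
```
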
74 changes: 45 additions & 29 deletions django_elastic_migrations/models.py
@@ -11,7 +11,7 @@
from copy import deepcopy
from multiprocessing import cpu_count

from django.db import models, transaction
from django.db import models, transaction, OperationalError
from django.utils import timezone
from django.utils.encoding import python_2_unicode_compatible
from elasticsearch import TransportError
@@ -281,6 +281,7 @@ def dem_index(self):
def add_log(self, msg, commit=True, use_self_dict_format=False, level=logger.INFO):
if use_self_dict_format:
msg = msg.format(**self.__dict__)
msg = "[{}]: {}".format(str(datetime.datetime.utcnow()), msg)
logger.log(level, msg)
self.log = "{old_log}\n{msg}".format(old_log=self.log, msg=msg)
if commit and 'test' not in sys.argv:
@@ -300,17 +301,15 @@ def perform_action(self, dem_index, *args, **kwargs):
raise NotImplementedError("override in subclasses")

def to_in_progress(self):
if self.status == self.STATUS_QUEUED:
self.start = timezone.now()
self.status = self.STATUS_IN_PROGRESS
self.argv = " ".join(sys.argv)
self.save()
self.start = timezone.now()
self.status = self.STATUS_IN_PROGRESS
self.argv = " ".join(sys.argv)
self.save()

def to_complete(self):
if self.status == self.STATUS_IN_PROGRESS:
self.status = self.STATUS_COMPLETE
self.end = timezone.now()
self.save()
self.status = self.STATUS_COMPLETE
self.end = timezone.now()
self.save()

def to_aborted(self):
self.status = self.STATUS_ABORTED
@@ -365,14 +364,32 @@ def add_to_parent_docs_affected(self, num_docs):
"""
parent_docs_affected = self.parent.docs_affected if self.parent else None
if self.parent and num_docs:
with transaction.atomic():
parent = (
IndexAction.objects.select_for_update()
.get(id=self.parent.id)
)
parent.docs_affected += num_docs
parent_docs_affected = parent.docs_affected
parent.save()
max_retries = 5
try_num = 1
successful = False
while not successful and try_num < max_retries:
try:
with transaction.atomic():
parent = (
IndexAction.objects.select_for_update()
.get(id=self.parent.id)
)
parent.docs_affected += num_docs
parent_docs_affected = parent.docs_affected
parent.save()
successful = True
except OperationalError as oe:
if "database is locked" in str(oe):
# specific to SQLite in testing
# https://docs.djangoproject.com/en/2.1/ref/databases/#database-is-locked-errors
try_num += 1
time.sleep(random.random())
if try_num >= max_retries:
msg = "Exceeded number of retries while updating parent docs affected for {}"
msg.format(str(self))
logger.warning(msg)
else:
raise
return parent_docs_affected

def check_child_statuses(self):
@@ -390,6 +407,10 @@ def check_child_statuses(self):
else:
self.add_logs("No child tasks found! Please ensure there was work to be done.", level=logger.WARNING)

def get_task_kwargs(self):
if self.task_kwargs:
return json.loads(self.task_kwargs)
return {}

"""
↓ Action Mixins Below ↓
@@ -1052,8 +1073,9 @@ def perform_action(self, dem_index, *args, **kwargs):
runtimes = []
docs_per_batch = []
for child in self.children.all():
delta = child.end - child.start
runtimes.append(delta.total_seconds())
if child.end and child.start:
delta = child.end - child.start
runtimes.append(delta.total_seconds())
docs_per_batch.append(child.docs_affected)

self._avg_batch_runtime = 'unknown'
@@ -1197,6 +1219,7 @@ def perform_action(self, dem_index, *args, **kwargs):

start = kwargs["start_index"]
end = kwargs["end_index"]
pks = kwargs["pks"]
self._workers = kwargs["workers"]
self._total_docs_expected = kwargs["total_docs_expected"]
verbosity = kwargs["verbosity"]
@@ -1217,15 +1240,7 @@
use_self_dict_format=True
)

if self._workers and self._total_docs_expected:
# ensure workers don't overload dbs by being sync'd up
# if we're less than 10% done, put a little randomness
# in between the workers to dither query load
if (self.parent.docs_affected / self._total_docs_expected) < 0.1:
time.sleep(random.random() * 2)

qs = doc_type.get_queryset()
current_qs = qs[start:end]
current_qs = doc_type.get_queryset().filter(id__in=pks)

retries = 0
success, failed = (0, 0)
@@ -1293,6 +1308,7 @@ def perform_action(self, dem_index, *args, **kwargs):
" # parent estimated runtime remaining: {_expected_parent_runtime}\n"
" # num workers {_workers}\n"
" # pid: {_pid}\n"
" # IndexAction id: {id}\n"
),
use_self_dict_format=True
)
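The retry loop added to `add_to_parent_docs_affected` above follows a common
pattern for surviving SQLite's "database is locked" errors under concurrent
workers. A distilled sketch of that pattern (hypothetical helper, not the
project's API):

```python
import random
import time

from django.db import OperationalError, transaction


def locked_increment(model_cls, pk, field, amount, max_retries=5):
    """Sketch: retry a row-locked counter increment when SQLite reports a lock."""
    for _ in range(max_retries):
        try:
            with transaction.atomic():
                row = model_cls.objects.select_for_update().get(pk=pk)
                setattr(row, field, getattr(row, field) + amount)
                row.save()
                return getattr(row, field)
        except OperationalError as exc:
            if "database is locked" not in str(exc):
                raise
            time.sleep(random.random())  # dither so workers don't retry in lockstep
    raise OperationalError("gave up after {} retries".format(max_retries))
```
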
27 changes: 18 additions & 9 deletions test_settings.py
@@ -1,18 +1,16 @@
from __future__ import (absolute_import, division, print_function, unicode_literals)
"""
These settings are here to use during tests, because django requires them.
In a real-world use case, apps in this project are installed into other
Django applications, so these settings will not be used.
"""

from __future__ import print_function
from __future__ import absolute_import, unicode_literals

import logging
import os
import subprocess
import sys
from logging import config as logging_config
import subprocess

import django
from os.path import abspath, dirname, join
@@ -30,14 +28,25 @@ def root(*args):
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': 'old.db',
'USER': '',
'PASSWORD': '',
'HOST': '',
'PORT': '',
'NAME': 'local.db',
},
'mp_testdb': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': 'test.db',
'TEST': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': 'test.db',
}
}
}

if 'test' in sys.argv and '--tag=multiprocessing' in sys.argv:
print("detected multiprocessing in sys.argv: {}".format(sys.argv))
DATABASES['default'] = DATABASES['mp_testdb']
from pprint import pprint
pprint(DATABASES['default'])


INSTALLED_APPS = (
'django.contrib.auth',
'django.contrib.contenttypes',
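Given the `sys.argv` check above, the multiprocessing-tagged tests are
presumably invoked with something like (inferred from that check, not
documented elsewhere in this diff):

```
./manage.py test --tag=multiprocessing --settings=test_settings
```

which routes them to the file-backed `test.db` rather than the default
`local.db`.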