add simplification logic for dataset for each entity #164

Draft · wants to merge 1 commit into base: develop
27 changes: 27 additions & 0 deletions django_project/georepo/admin.py
@@ -171,6 +171,32 @@ def generate_simplified_geometry(modeladmin, request, queryset):
    )


def generate_simplified_geometry_low_memory(modeladmin, request, queryset):
    from georepo.tasks.simplify_geometry import simplify_geometry_in_dataset
    from celery.result import AsyncResult
    from core.celery import app
    for dataset in queryset:
        # cancel a previous simplification task that is still running
        if dataset.simplification_task_id:
            res = AsyncResult(dataset.simplification_task_id)
            if not res.ready():
                app.control.revoke(
                    dataset.simplification_task_id,
                    terminate=True
                )
        # queue the per-entity, low-memory variant (use_low_memory=True)
        task = simplify_geometry_in_dataset.delay(dataset.id, True)
        dataset.simplification_task_id = task.id
        dataset.simplification_progress = 'Started'
        dataset.save(
            update_fields=['simplification_task_id',
                           'simplification_progress']
        )
    modeladmin.message_user(
        request,
        'Dataset entity simplification will be run in background!',
        messages.SUCCESS
    )


def do_dataset_patch(modeladmin, request, queryset):
    from georepo.tasks.dataset_patch import dataset_patch
    for dataset in queryset:
@@ -341,6 +367,7 @@ class DatasetAdmin(GuardedModelAdmin):
    actions = [
        delete_selected,
        populate_default_tile_config, generate_simplified_geometry,
        generate_simplified_geometry_low_memory,
        do_dataset_patch, refresh_dynamic_views,
        populate_default_admin_level_names,
        generate_arcgis_config_action,
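
Reviewer note: the action above revokes any still-pending simplification task before queuing a new one, so a dataset never has two runs in flight. A minimal standalone sketch of that pattern (the `requeue` helper is hypothetical; `AsyncResult.ready()` and `app.control.revoke(..., terminate=True)` are standard Celery APIs):

from celery.result import AsyncResult

def requeue(app, task, old_task_id, *task_args):
    # Terminate the previous run if it is still pending or active,
    # then queue a fresh run and return its AsyncResult.
    if old_task_id and not AsyncResult(old_task_id).ready():
        app.control.revoke(old_task_id, terminate=True)
    return task.delay(*task_args)
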
10 changes: 7 additions & 3 deletions django_project/georepo/tasks/simplify_geometry.py
@@ -3,18 +3,22 @@
from georepo.models import Dataset, DatasetView
from georepo.utils.mapshaper import (
    simplify_for_dataset,
    simplify_for_dataset_view,
    simplify_for_dataset_low_memory
)

logger = logging.getLogger(__name__)


@shared_task(name="simplify_geometry_in_dataset")
def simplify_geometry_in_dataset(dataset_id, use_low_memory=False):
    """Manual trigger of simplification in dataset."""
    dataset = Dataset.objects.get(id=dataset_id)
    logger.info(f'Running simplify geometry for dataset {dataset}')
    if use_low_memory:
        simplify_for_dataset_low_memory(dataset)
    else:
        simplify_for_dataset(dataset)
    logger.info(
        f'Simplify geometry for dataset {dataset} is finished.')

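Quick manual check of the new task signature (dataset id 42 is hypothetical; assumes a running Celery worker):

from georepo.tasks.simplify_geometry import simplify_geometry_in_dataset

# default path, unchanged behavior
simplify_geometry_in_dataset.delay(42)
# new per-entity, low-memory path
simplify_geometry_in_dataset.delay(42, True)
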
248 changes: 209 additions & 39 deletions django_project/georepo/utils/mapshaper.py
@@ -4,6 +4,7 @@
import json
import fiona
import time
import traceback
from django.db.models import F
from django.db.models.expressions import RawSQL
from django.contrib.gis.geos import GEOSGeometry, Polygon, MultiPolygon
@@ -104,7 +105,79 @@ def export_entities_to_geojson(file_path, queryset, level):
                f'to {file_path} with size {convert_size(file_size)}')


def export_entities_to_geojson_with_neighbors(file_path, dataset,
                                              entity_id, view=None):
    with open(file_path, "w") as geojson_file:
        geojson_file.write('{\n')
        geojson_file.write('"type": "FeatureCollection",\n')
        geojson_file.write('"features": [\n')
        idx = 0
        single_entity = GeographicalEntity.objects.filter(
            id=entity_id
        ).annotate(
            rhr_geom=AsGeoJSON(
                ForcePolygonCCW(F('geometry')),
                precision=6
            )
        ).values('id', 'rhr_geom', 'level', 'geometry')[0]
        data = SimpleGeographicalGeojsonSerializer(
            single_entity,
            many=False
        ).data
        # inject the pre-rendered GeoJSON geometry string instead of
        # re-serializing the geometry through the serializer
        data['geometry'] = '{geom_placeholder}'
        feature_str = json.dumps(data)
        feature_str = feature_str.replace(
            '"{geom_placeholder}"',
            single_entity['rhr_geom']
        )
        geojson_file.write(feature_str)

        # query other entities at the same level that touch this entity
        other_entities = GeographicalEntity.objects.filter(
            dataset=dataset,
            level=single_entity['level']
        )
        if view:
            other_entities = filter_entities_view(
                view, single_entity['level'], other_entities)
        other_entities = other_entities.filter(
            geometry__touches=single_entity['geometry']
        ).exclude(
            id=entity_id
        ).annotate(
            rhr_geom=AsGeoJSON(
                ForcePolygonCCW(F('geometry')),
                precision=6
            )
        ).values('id', 'rhr_geom')
        total_count = other_entities.count()
        if total_count > 0:
            geojson_file.write(',\n')
        else:
            geojson_file.write('\n')

        for entity in other_entities.iterator(chunk_size=2):
            data = SimpleGeographicalGeojsonSerializer(
                entity,
                many=False
            ).data
            data['geometry'] = '{geom_placeholder}'
            feature_str = json.dumps(data)
            feature_str = feature_str.replace(
                '"{geom_placeholder}"',
                entity['rhr_geom']
            )
            geojson_file.write(feature_str)
            if idx == total_count - 1:
                geojson_file.write('\n')
            else:
                geojson_file.write(',\n')
            idx += 1
        geojson_file.write(']\n')
        geojson_file.write('}\n')
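
Usage sketch for the exporter (entity id and file path are hypothetical): the output parses as a plain FeatureCollection with the target entity first, followed by every same-level entity whose geometry touches it.

import json

export_entities_to_geojson_with_neighbors(
    '/tmp/entity_123.geojson', dataset, 123)
with open('/tmp/entity_123.geojson') as f:
    fc = json.load(f)
assert fc['type'] == 'FeatureCollection'
assert int(fc['features'][0]['id']) == 123  # target entity comes first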


def do_simplify(input_file_path, tolerance, level, show_log=True):
    if tolerance == 1:
        return input_file_path
    output_file = NamedTemporaryFile(
@@ -117,29 +190,36 @@ def do_simplify(input_file_path, tolerance, level):
        output_file.name,
        tolerance
    )
    if show_log:
        logger.info('Mapshaper commands:')
        logger.info(commands)
    result = subprocess.run(commands, capture_output=True)
    output = result.stdout.decode()
    if show_log:
        logger.info(output)
    if result.returncode != 0:
        error = result.stderr.decode()
        logger.error('Failed to simplify with commands')
        logger.error(commands)
        logger.error(error)
        raise RuntimeError(error)
    if show_log:
        file_size = os.path.getsize(output_file.name)
        logger.info(f'Entities level {level} are simplified '
                    f'to {output_file.name} with size '
                    f'{convert_size(file_size)}')
    return output_file.name
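
Context for the new `show_log` flag: the low-memory path below invokes do_simplify once per entity per tolerance, so unconditional logging would flood the worker logs. `MAPSHAPER_COMMAND` is defined elsewhere in this module; assuming it follows standard mapshaper CLI usage, the subprocess call runs something shaped like the line below (illustrative only, not the actual template):

# mapshaper /tmp/input.geojson -simplify 0.1 keep-shapes -o /tmp/output.geojson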


def read_output_simplification(output_file_path, tolerance,
                               view=None, input_ids=None):
    """Read output simplification geojson and insert into Temp table"""
    data = []
    with fiona.open(output_file_path, encoding='utf-8') as collection:
        for feature in collection:
            entity_id = int(feature['id'])
            # in low-memory mode the simplified file also contains the
            # neighboring entities; only keep features listed in input_ids
            if input_ids is not None and entity_id not in input_ids:
                continue
            entity = GeographicalEntity.objects.filter(
                id=entity_id
            ).first()
@@ -234,11 +314,7 @@ def copy_entities(dataset: Dataset, level: int, view: DatasetView = None):
    EntitySimplified.objects.bulk_create(data)


def on_simplify_for_dataset_started(dataset: Dataset):
    logger.info(f'Simplification config for dataset {dataset}')
    dataset.simplification_progress = (
        'Entity simplification starts'
@@ -249,6 +325,41 @@
        'simplification_sync_status',
        'simplification_progress_num'])
    logger.info(dataset.simplification_progress)


def on_simplify_for_dataset_finished(dataset: Dataset, is_success: bool):
    if is_success:
        # success
        dataset.simplification_progress = (
            'Entity simplification finished'
        )
        dataset.is_simplified = True
        dataset.simplification_sync_status = Dataset.SyncStatus.SYNCED
        dataset.save(update_fields=[
            'simplification_progress', 'is_simplified',
            'simplification_sync_status'])
        logger.info(dataset.simplification_progress)
    else:
        # error
        logger.error('Dataset simplification got error '
                     f'at {dataset.simplification_progress}')
        dataset.simplification_progress = (
            'Entity simplification error '
            f'at {dataset.simplification_progress}'
        )
        dataset.is_simplified = False
        dataset.simplification_sync_status = Dataset.SyncStatus.ERROR
        dataset.save(update_fields=[
            'simplification_progress', 'is_simplified',
            'simplification_sync_status'])
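
Both simplification paths now share the two status helpers above; the expected lifecycle is (sketch; `run_either_path` is a hypothetical stand-in for simplify_for_dataset or simplify_for_dataset_low_memory):

on_simplify_for_dataset_started(dataset)        # progress: 'Entity simplification starts'
ok = run_either_path(dataset)                   # hypothetical stand-in
on_simplify_for_dataset_finished(dataset, ok)   # SYNCED on success, ERROR otherwise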


def simplify_for_dataset(
    dataset: Dataset,
    **kwargs
):
    start = time.time()
    on_simplify_for_dataset_started(dataset)
    tolerances = get_dataset_simplification(dataset)
    logger.info(tolerances)
    total_simplification = 0
@@ -351,6 +462,7 @@ def simplify_for_dataset(
                    f'level {level} factor {simplify_factor}!'
                )
                logger.error(ex)
                logger.error(traceback.format_exc())
                raise ex
            finally:
                if (
@@ -364,34 +476,12 @@ def simplify_for_dataset(
    finally:
        if input_file and os.path.exists(input_file.name):
            os.remove(input_file.name)
    is_simplify_success = processed_count == total_simplification
    on_simplify_for_dataset_finished(dataset, is_simplify_success)
    end = time.time()
    if kwargs.get('log_object'):
        kwargs.get('log_object').add_log('simplify_for_dataset', end - start)
    return is_simplify_success


def simplify_for_dataset_view(
@@ -524,6 +614,7 @@ def simplify_for_dataset_view(
                    f'level {level} at factor {simplify_factor}!'
                )
                logger.error(ex)
                logger.error(traceback.format_exc())
                raise ex
            finally:
                if (
@@ -564,3 +655,82 @@ def simplify_for_dataset_view(
            end - start
        )
    return processed_count == total_simplification


def simplify_for_dataset_low_memory(dataset: Dataset, **kwargs):
    start = time.time()
    on_simplify_for_dataset_started(dataset)
    tolerances = get_dataset_simplification(dataset)
    logger.info(tolerances)
    dataset.simplification_progress = '0%'
    dataset.simplification_progress_num = 0
    dataset.save(update_fields=['simplification_progress',
                                'simplification_progress_num'])
    entities = GeographicalEntity.objects.filter(
        dataset=dataset,
        level__in=list(tolerances.keys())
    ).values('id', 'level', 'internal_code').order_by('id')
    total_entities = entities.count()
    processed_count = 0
    for entity in entities:
        entity_id = entity['id']
        level = entity['level']
        code = entity['internal_code']
        input_file = None
        output_file_path = None
        try:
            # clear existing simplified entities
            existing_entities = EntitySimplified.objects.filter(
                geographical_entity_id=entity_id
            )
            existing_entities._raw_delete(existing_entities.db)
            tolerance_values = tolerances[level]
            if len(tolerance_values) == 0:
                processed_count += 1
                continue
            # export this entity plus its touching neighbors to geojson
            input_file = NamedTemporaryFile(
                delete=False,
                suffix='.geojson',
                dir=getattr(settings, 'FILE_UPLOAD_TEMP_DIR', None)
            )
            export_entities_to_geojson_with_neighbors(
                input_file.name, dataset, entity_id)
            for simplify_factor in tolerance_values:
                if simplify_factor == 1:
                    # factor 1 keeps the original geometry untouched
                    entity_with_geom = GeographicalEntity.objects.get(
                        id=entity_id
                    )
                    EntitySimplified.objects.create(
                        geographical_entity=entity_with_geom,
                        simplify_tolerance=1,
                        simplified_geometry=entity_with_geom.geometry
                    )
                else:
                    # simplify quietly (show_log=False) and only keep
                    # the target entity from the output file
                    output_file_path = do_simplify(
                        input_file.name, simplify_factor, level, False
                    )
                    read_output_simplification(
                        output_file_path, simplify_factor,
                        input_ids=[entity_id]
                    )
                if output_file_path and os.path.exists(output_file_path):
                    os.remove(output_file_path)
            processed_count += 1
        except Exception as ex:
            logger.error(f'Failed to simplify dataset {dataset} - '
                         f'entity {entity_id} - {code}!')
            logger.error(ex)
            logger.error(traceback.format_exc())
        finally:
            if input_file and os.path.exists(input_file.name):
                os.remove(input_file.name)
            if output_file_path and os.path.exists(output_file_path):
                os.remove(output_file_path)
    is_simplify_success = processed_count == total_entities
    on_simplify_for_dataset_finished(dataset, is_simplify_success)
    end = time.time()
    if kwargs.get('log_object'):
        kwargs.get('log_object').add_log('simplify_for_dataset', end - start)
    return is_simplify_success
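
A sketch of exercising the new function synchronously (dataset id is hypothetical; in practice it is dispatched via simplify_geometry_in_dataset with use_low_memory=True). Note the trade-off: memory stays flat because only one entity plus its touching neighbors is exported at a time, at the cost of one mapshaper run per entity per tolerance.

from georepo.models import Dataset
from georepo.utils.mapshaper import simplify_for_dataset_low_memory

dataset = Dataset.objects.get(id=42)  # hypothetical id
ok = simplify_for_dataset_low_memory(dataset)
print('all entities simplified' if ok else 'some entities failed')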