diff --git a/django_project/georepo/admin.py b/django_project/georepo/admin.py index d68f58a1..b50ae6da 100644 --- a/django_project/georepo/admin.py +++ b/django_project/georepo/admin.py @@ -171,6 +171,32 @@ def generate_simplified_geometry(modeladmin, request, queryset): ) +def generate_simplified_geometry_low_memory(modeladmin, request, queryset): + from georepo.tasks.simplify_geometry import simplify_geometry_in_dataset + from celery.result import AsyncResult + from core.celery import app + for dataset in queryset: + if dataset.simplification_task_id: + res = AsyncResult(dataset.simplification_task_id) + if not res.ready(): + app.control.revoke( + dataset.simplification_task_id, + terminate=True + ) + task = simplify_geometry_in_dataset.delay(dataset.id, True) + dataset.simplification_task_id = task.id + dataset.simplification_progress = 'Started' + dataset.save( + update_fields=['simplification_task_id', + 'simplification_progress'] + ) + modeladmin.message_user( + request, + 'Dataset entity simplification will be run in background!', + messages.SUCCESS + ) + + def do_dataset_patch(modeladmin, request, queryset): from georepo.tasks.dataset_patch import dataset_patch for dataset in queryset: @@ -341,6 +367,7 @@ class DatasetAdmin(GuardedModelAdmin): actions = [ delete_selected, populate_default_tile_config, generate_simplified_geometry, + generate_simplified_geometry_low_memory, do_dataset_patch, refresh_dynamic_views, populate_default_admin_level_names, generate_arcgis_config_action, diff --git a/django_project/georepo/tasks/simplify_geometry.py b/django_project/georepo/tasks/simplify_geometry.py index 528443b3..b9f6bcd1 100644 --- a/django_project/georepo/tasks/simplify_geometry.py +++ b/django_project/georepo/tasks/simplify_geometry.py @@ -3,18 +3,22 @@ from georepo.models import Dataset, DatasetView from georepo.utils.mapshaper import ( simplify_for_dataset, - simplify_for_dataset_view + simplify_for_dataset_view, + simplify_for_dataset_low_memory ) logger = logging.getLogger(__name__) @shared_task(name="simplify_geometry_in_dataset") -def simplify_geometry_in_dataset(dataset_id): +def simplify_geometry_in_dataset(dataset_id, use_low_memory=False): """Manual trigger of simplification in dataset.""" dataset = Dataset.objects.get(id=dataset_id) logger.info(f'Running simplify geometry for dataset {dataset}') - simplify_for_dataset(dataset) + if use_low_memory: + simplify_for_dataset_low_memory(dataset) + else: + simplify_for_dataset(dataset) logger.info( f'Simplify geometry for dataset {dataset} is finished.') diff --git a/django_project/georepo/utils/mapshaper.py b/django_project/georepo/utils/mapshaper.py index aa1cb59f..08f35001 100644 --- a/django_project/georepo/utils/mapshaper.py +++ b/django_project/georepo/utils/mapshaper.py @@ -4,6 +4,7 @@ import json import fiona import time +import traceback from django.db.models import F from django.db.models.expressions import RawSQL from django.contrib.gis.geos import GEOSGeometry, Polygon, MultiPolygon @@ -104,7 +105,79 @@ def export_entities_to_geojson(file_path, queryset, level): f'to {file_path} with size {convert_size(file_size)}') -def do_simplify(input_file_path, tolerance, level): +def export_entities_to_geojson_with_neighbors(file_path, dataset, + entity_id, view=None): + with open(file_path, "w") as geojson_file: + geojson_file.write('{\n') + geojson_file.write('"type": "FeatureCollection",\n') + geojson_file.write('"features": [\n') + idx = 0 + single_entity = GeographicalEntity.objects.filter( + id=entity_id + ).annotate( + rhr_geom=AsGeoJSON( + ForcePolygonCCW(F('geometry')), + precision=6 + ) + ).values('id', 'rhr_geom', 'level', 'geometry')[0] + data = SimpleGeographicalGeojsonSerializer( + single_entity, + many=False + ).data + data['geometry'] = '{geom_placeholder}' + feature_str = json.dumps(data) + feature_str = feature_str.replace( + '"{geom_placeholder}"', + single_entity['rhr_geom'] + ) + geojson_file.write(feature_str) + + # query other entities + other_entities = GeographicalEntity.objects.filter( + dataset=dataset, + level=single_entity['level'] + ) + if view: + other_entities = filter_entities_view( + view, single_entity['level'], other_entities) + other_entities = other_entities.filter( + geometry__touches=single_entity['geometry'] + ).exclude( + id=entity_id + ).annotate( + rhr_geom=AsGeoJSON( + ForcePolygonCCW(F('geometry')), + precision=6 + ) + ).values('id', 'rhr_geom') + total_count = other_entities.count() + if total_count > 0: + geojson_file.write(',\n') + else: + geojson_file.write('\n') + + for entity in other_entities.iterator(chunk_size=2): + data = SimpleGeographicalGeojsonSerializer( + entity, + many=False + ).data + data['geometry'] = '{geom_placeholder}' + feature_str = json.dumps(data) + feature_str = feature_str.replace( + '"{geom_placeholder}"', + entity['rhr_geom'] + ) + geojson_file.write(feature_str) + if idx == total_count - 1: + geojson_file.write('\n') + else: + geojson_file.write(',\n') + idx += 1 + geojson_file.write(']\n') + geojson_file.write('}\n') + + +def do_simplify(input_file_path, tolerance, level, show_log=True): if tolerance == 1: return input_file_path output_file = NamedTemporaryFile( @@ -117,29 +190,36 @@ def do_simplify(input_file_path, tolerance, level): output_file.name, tolerance ) - logger.info('Mapshaper commands:') - logger.info(commands) + if show_log: + logger.info('Mapshaper commands:') + logger.info(commands) result = subprocess.run(commands, capture_output=True) output = result.stdout.decode() - logger.info(output) + if show_log: + logger.info(output) if result.returncode != 0: error = result.stderr.decode() logger.error('Failed to simplify with commands') logger.error(commands) logger.error(error) raise RuntimeError(error) - file_size = os.path.getsize(output_file.name) - logger.info(f'Entities level {level} are simplified ' - f'to {output_file.name} with size {convert_size(file_size)}') + if show_log: + file_size = os.path.getsize(output_file.name) + logger.info(f'Entities level {level} are simplified ' + f'to {output_file.name} with size ' + f'{convert_size(file_size)}') return output_file.name -def read_output_simplification(output_file_path, tolerance, view=None): +def read_output_simplification(output_file_path, tolerance, + view=None, input_ids=None): """Read output simplification geojson and insert into Temp table""" data = [] with fiona.open(output_file_path, encoding='utf-8') as collection: for feature in collection: - entity_id = feature['id'] + entity_id = int(feature['id']) + if input_ids is not None and entity_id not in input_ids: + continue entity = GeographicalEntity.objects.filter( id=entity_id ).first() @@ -234,11 +314,7 @@ def copy_entities(dataset: Dataset, level: int, view: DatasetView = None): EntitySimplified.objects.bulk_create(data) -def simplify_for_dataset( - dataset: Dataset, - **kwargs -): - start = time.time() +def on_simplify_for_dataset_started(dataset: Dataset): logger.info(f'Simplification config for dataset {dataset}') dataset.simplification_progress = ( 'Entity simplification starts' @@ -249,6 +325,41 @@ def simplify_for_dataset( 'simplification_sync_status', 'simplification_progress_num']) logger.info(dataset.simplification_progress) + + +def on_simplify_for_dataset_finished(dataset: Dataset, is_success: bool): + if is_success: + # success + dataset.simplification_progress = ( + 'Entity simplification finished' + ) + dataset.is_simplified = True + dataset.simplification_sync_status = Dataset.SyncStatus.SYNCED + dataset.save(update_fields=[ + 'simplification_progress', 'is_simplified', + 'simplification_sync_status']) + logger.info(dataset.simplification_progress) + else: + # error + logger.error('Dataset simplification got error ' + f'at {dataset.simplification_progress}') + dataset.simplification_progress = ( + 'Entity simplification error ' + f'at {dataset.simplification_progress}' + ) + dataset.is_simplified = False + dataset.simplification_sync_status = Dataset.SyncStatus.ERROR + dataset.save(update_fields=[ + 'simplification_progress', 'is_simplified', + 'simplification_sync_status']) + + +def simplify_for_dataset( + dataset: Dataset, + **kwargs +): + start = time.time() + on_simplify_for_dataset_started(dataset) tolerances = get_dataset_simplification(dataset) logger.info(tolerances) total_simplification = 0 @@ -351,6 +462,7 @@ def simplify_for_dataset( f'level {level} factor {simplify_factor}!' ) logger.error(ex) + logger.error(traceback.format_exc()) raise ex finally: if ( @@ -364,34 +476,12 @@ def simplify_for_dataset( finally: if input_file and os.path.exists(input_file.name): os.remove(input_file.name) - if processed_count == total_simplification: - # success - dataset.simplification_progress = ( - 'Entity simplification finished' - ) - dataset.is_simplified = True - dataset.simplification_sync_status = Dataset.SyncStatus.SYNCED - dataset.save(update_fields=[ - 'simplification_progress', 'is_simplified', - 'simplification_sync_status']) - logger.info(dataset.simplification_progress) - else: - # error - logger.error('Dataset simplification got error ' - f'at {dataset.simplification_progress}') - dataset.simplification_progress = ( - 'Entity simplification error ' - f'at {dataset.simplification_progress}' - ) - dataset.is_simplified = False - dataset.simplification_sync_status = Dataset.SyncStatus.ERROR - dataset.save(update_fields=[ - 'simplification_progress', 'is_simplified', - 'simplification_sync_status']) + is_simplify_success = processed_count == total_simplification + on_simplify_for_dataset_finished(dataset, is_simplify_success) end = time.time() if kwargs.get('log_object'): kwargs.get('log_object').add_log('simplify_for_dataset', end - start) - return processed_count == total_simplification + return is_simplify_success def simplify_for_dataset_view( @@ -524,6 +614,7 @@ def simplify_for_dataset_view( f'level {level} at factor {simplify_factor}!' ) logger.error(ex) + logger.error(traceback.format_exc()) raise ex finally: if ( @@ -564,3 +655,82 @@ def simplify_for_dataset_view( end - start ) return processed_count == total_simplification + + +def simplify_for_dataset_low_memory(dataset: Dataset, **kwargs): + start = time.time() + on_simplify_for_dataset_started(dataset) + tolerances = get_dataset_simplification(dataset) + logger.info(tolerances) + dataset.simplification_progress = '0%' + dataset.simplification_progress_num = 0 + dataset.save(update_fields=['simplification_progress', + 'simplification_progress_num']) + entities = GeographicalEntity.objects.filter( + dataset=dataset, + level__in=list(tolerances.keys()) + ).values('id', 'level', 'internal_code').order_by('id') + total_entities = entities.count() + processed_count = 0 + for entity in entities: + entity_id = entity['id'] + level = entity['level'] + code = entity['internal_code'] + input_file = None + output_file_path = None + try: + # clear existing entities + existing_entities = EntitySimplified.objects.filter( + geographical_entity_id=entity_id + ) + existing_entities._raw_delete(existing_entities.db) + tolerance_values = tolerances[level] + if len(tolerance_values) == 0: + processed_count += 1 + continue + # export entities to geojson file + input_file = NamedTemporaryFile( + delete=False, + suffix='.geojson', + dir=getattr(settings, 'FILE_UPLOAD_TEMP_DIR', None) + ) + # do export here + export_entities_to_geojson_with_neighbors( + input_file.name, dataset, entity_id) + for simplify_factor in tolerance_values: + if simplify_factor == 1: + entity_with_geom = GeographicalEntity.objects.get( + id=entity_id + ) + EntitySimplified.objects.create( + geographical_entity=entity_with_geom, + simplify_tolerance=1, + simplified_geometry=entity_with_geom.geometry + ) + else: + output_file_path = do_simplify( + input_file.name, simplify_factor, level, False + ) + read_output_simplification( + output_file_path, simplify_factor, + input_ids=[entity_id] + ) + if output_file_path and os.path.exists(output_file_path): + os.remove(output_file_path) + processed_count += 1 + except Exception as ex: + logger.error(f'Failed to simplify dataset {dataset} - ' + f'entity {entity_id} - {code}!') + logger.error(ex) + logger.error(traceback.format_exc()) + finally: + if input_file and os.path.exists(input_file.name): + os.remove(input_file.name) + if output_file_path and os.path.exists(output_file_path): + os.remove(output_file_path) + is_simplify_success = processed_count == total_entities + on_simplify_for_dataset_finished(dataset, is_simplify_success) + end = time.time() + if kwargs.get('log_object'): + kwargs.get('log_object').add_log('simplify_for_dataset', end - start) + return is_simplify_success