From dd6b149f5b7d4f406ad61952f4a7cfcd612d3a12 Mon Sep 17 00:00:00 2001 From: Bas Couwenberg Date: Tue, 19 Jul 2022 12:24:50 +0200 Subject: [PATCH 01/10] Add stetl.filters.bagfilter.LeveringFilter class --- stetl/filters/bagfilter.py | 58 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 stetl/filters/bagfilter.py diff --git a/stetl/filters/bagfilter.py b/stetl/filters/bagfilter.py new file mode 100644 index 0000000..ece19ef --- /dev/null +++ b/stetl/filters/bagfilter.py @@ -0,0 +1,58 @@ +# BAG related filters + +from stetl.component import Config +from stetl.util import Util +from stetl.filter import Filter +from stetl.packet import FORMAT + +log = Util.get_log("bagfilter") + + +class LeveringFilter(Filter): + """ + Convert Leveringsdocument-BAG-Extract.xml content to record for + insertion into nlx_bag_info table. + """ + + @Config(ptype=str, default='sleutel', required=False) + def key_column(self): + """ + Column name for key + """ + pass + + @Config(ptype=str, default='levering_xml', required=False) + def key_value(self): + """ + Column value for key + """ + pass + + @Config(ptype=str, default='waarde', required=False) + def value_column(self): + """ + Column name for value + """ + pass + + # Constructor + def __init__(self, configdict, section, consumes=FORMAT.string, produces=FORMAT.record_array): + Filter.__init__(self, configdict, section, consumes, produces) + + def invoke(self, packet): + if packet.data is None or packet.is_end_of_stream(): + return packet + + with open(packet.data, 'rt') as f: + data = f.read() + + record = { + self.key_column: self.key_value, + self.value_column: data, + } + + # record_array is used to avoid ValueError: + # https://github.com/geopython/stetl/issues/125 + packet.data = [record] + + return packet From a3bc06bc7720e7e89ade5dd41053707e5f543c16 Mon Sep 17 00:00:00 2001 From: Bas Couwenberg Date: Tue, 19 Jul 2022 12:25:54 +0200 Subject: [PATCH 02/10] Add stetl.outputs.bagoutput.BAGOutput class --- stetl/bagutil.py | 50 + stetl/outputs/bagoutput.py | 3056 ++++++++++++++++++++++++++++++++++++ 2 files changed, 3106 insertions(+) create mode 100644 stetl/bagutil.py create mode 100644 stetl/outputs/bagoutput.py diff --git a/stetl/bagutil.py b/stetl/bagutil.py new file mode 100644 index 0000000..13d8112 --- /dev/null +++ b/stetl/bagutil.py @@ -0,0 +1,50 @@ +import os +import shutil +import zipfile + +from stetl.util import Util + + +log = Util.get_log('bagutil') + + +class BAGUtil: + """ + Helper functions for BAG 2.0 Extract processing + """ + + @staticmethod + def extract_zip_file(zip_file, temp_dir): + extracted = [] + + with zipfile.ZipFile(zip_file) as z: + for name in z.namelist(): + temp_file = os.path.join(temp_dir, name) + + log.info( + "Extracting %s from %s to %s" % ( + name, + zip_file, + temp_file, + ) + ) + + z.extract(name, path=temp_dir) + + extracted.append(temp_file) + + return extracted + + @staticmethod + def remove_temp_file(temp_file): + log.info("Removing temp file: %s" % temp_file) + os.unlink(temp_file) + + return True + + @staticmethod + def remove_temp_dir(temp_dir): + log.info("Removing temp dir: %s" % temp_dir) + shutil.rmtree(temp_dir) + + return True diff --git a/stetl/outputs/bagoutput.py b/stetl/outputs/bagoutput.py new file mode 100644 index 0000000..1a88f15 --- /dev/null +++ b/stetl/outputs/bagoutput.py @@ -0,0 +1,3056 @@ +import csv +import os +import pprint +import re +import zipfile + +from lxml import etree +from osgeo import ogr +from psycopg2 import sql + +from stetl.bagutil 
import BAGUtil +from stetl.output import Output +from stetl.util import Util +from stetl.packet import FORMAT +from stetl.component import Config +from stetl.postgis import PostGIS + +log = Util.get_log('bagoutput') + +xmlns = { + 'xsi': ( + 'http://www.w3.org/2001/XMLSchema-instance' + ), + 'xs': ( + 'http://www.w3.org/2001/XMLSchema' + ), + + 'gml': ( + 'http://www.opengis.net/gml/3.2' + ), + + 'xb': ( + 'http://www.kadaster.nl/schemas/lvbag/extract-levering/v20200601' + ), + 'selecties-extract': ( + 'http://www.kadaster.nl/schemas/lvbag/extract-selecties/v20200601' + ), + + 'sl-bag-extract': ( + 'http://www.kadaster.nl/schemas/lvbag/extract-deelbestand-lvc/v20200601' + ), + 'sl': ( + 'http://www.kadaster.nl/schemas/standlevering-generiek/1.0' + ), + + 'bagtypes': ( + 'www.kadaster.nl/schemas/lvbag/gem-wpl-rel/bag-types/v20200601' + ), + 'gwr-bestand': ( + 'www.kadaster.nl/schemas/lvbag/gem-wpl-rel/gwr-deelbestand-lvc/v20200601' + ), + 'gwr-product': ( + 'www.kadaster.nl/schemas/lvbag/gem-wpl-rel/gwr-producten-lvc/v20200601' + ), + + 'DatatypenNEN3610': ( + 'www.kadaster.nl/schemas/lvbag/imbag/datatypennen3610/v20200601' + ), + 'Historie': ( + 'www.kadaster.nl/schemas/lvbag/imbag/historie/v20200601' + ), + 'KenmerkInOnderzoek': ( + 'www.kadaster.nl/schemas/lvbag/imbag/kenmerkinonderzoek/v20200601' + ), + 'nen5825': ( + 'www.kadaster.nl/schemas/lvbag/imbag/nen5825/v20200601' + ), + 'Objecten': ( + 'www.kadaster.nl/schemas/lvbag/imbag/objecten/v20200601' + ), + 'Objecten-ref': ( + 'www.kadaster.nl/schemas/lvbag/imbag/objecten-ref/v20200601' + ), +} + + +class BAGOutput(Output): + """ + Process BAG 2.0 Extract files + """ + + @Config(ptype=str, required=True, default=None) + def database(self): + """ + Database name + """ + pass + + @Config(ptype=str, required=False, default=None) + def user(self): + """ + Dabase username + """ + pass + + @Config(ptype=str, required=False, default=None) + def password(self): + """ + Database password + """ + pass + + @Config(ptype=str, required=False, default=None) + def host(self): + """ + Database host + """ + pass + + @Config(ptype=str, required=False, default='public') + def schema(self): + """ + Database schema + """ + pass + + @Config(ptype=bool, required=False, default=False) + def truncate(self): + """ + Truncate database tables + """ + pass + + @Config(ptype=bool, required=False, default=True) + def process_inactief(self): + """ + Process Inactief data + """ + pass + + @Config(ptype=bool, required=False, default=True) + def process_in_onderzoek(self): + """ + Process InOnderzoek data + """ + pass + + @Config(ptype=bool, required=False, default=True) + def process_niet_bag(self): + """ + Process NietBag data + """ + pass + + @Config(ptype=bool, required=False, default=False) + def process_lig(self): + """ + Process LIG (Ligplaats) data + """ + pass + + @Config(ptype=bool, required=False, default=False) + def process_num(self): + """ + Process NUM (Nummeraanduiding) data + """ + pass + + @Config(ptype=bool, required=False, default=False) + def process_opr(self): + """ + Process OPR (OpenbareRuimte) data + """ + pass + + @Config(ptype=bool, required=False, default=False) + def process_pnd(self): + """ + Process PND (Pand) data + """ + pass + + @Config(ptype=bool, required=False, default=False) + def process_sta(self): + """ + Process STA (Standplaats) data + """ + pass + + @Config(ptype=bool, required=False, default=False) + def process_vbo(self): + """ + Process VBO (Verblijfsobject) data + """ + pass + + @Config(ptype=bool, required=False, 
default=False) + def process_wpl(self): + """ + Process WPL (Woonplaats) data + """ + pass + + @Config(ptype=bool, required=False, default=False) + def process_gwr(self): + """ + Process GemeenteWoonplaatsRelatie data + """ + pass + + @Config(ptype=bool, required=False, default=False) + def process_levering(self): + """ + Process Leveringsdocument data + """ + pass + + @Config(ptype=str, required=False, default='/tmp') + def temp_dir(self): + """ + Path for temporary directory + """ + pass + + @Config(ptype=int, default=(1024 * 1024 * 1024), required=False) + def buffer_size(self): + """ + Buffer size for read buffer during extraction + """ + pass + + def __init__(self, configdict, section, consumes=FORMAT.record): + Output.__init__(self, configdict, section, consumes=consumes) + self.db = None + + def init(self): + log.info('Init: connect to DB') + self.db = PostGIS(self.cfg.get_dict()) + self.db.connect() + + def exit(self): + log.info('Exit: disconnect from DB') + self.db.disconnect() + + def update_record(self, table, record, identifiers): + sqlstr = r'UPDATE {table} SET' + format = { + 'table': sql.Identifier(table), + } + param = [] + + i = 0 + for key in record: + if key in identifiers: + continue + + if i > 0: + sqlstr += ',' + + column = 'col' + str(i) + + sqlstr += r' {' + column + r'} = %s' + + format[column] = sql.Identifier(key) + param.append(record[key]) + + i += 1 + + column = 'col' + str(i) + + sqlstr += r' WHERE ' + + i = 0 + for key in identifiers: + if i > 0: + sqlstr += ' AND ' + + column = 'con' + str(i) + + sqlstr += r'{' + column + r'} = %s' + + format[column] = sql.Identifier(key) + param.append(identifiers[key]) + + i += 1 + + log.debug("sqlstr: %s" % sqlstr) + log.debug("format: %s" % format) + + query = sql.SQL(sqlstr).format(**format) + + log.debug("query: %s" % query) + log.debug("param: %s" % pprint.pformat(param)) + + rowcount = self.db.execute(query, param) + + if rowcount == -1: + raise Exception('Query failed: %s' % query) + + def insert_record(self, table, record): + sqlstr = r'INSERT INTO {table} (' + format = { + 'table': sql.Identifier(table), + } + values = r') VALUES (' + param = [] + + i = 0 + for key in record: + if i > 0: + sqlstr += ', ' + values += ', ' + + column = 'col' + str(i) + + sqlstr += r'{' + column + r'}' + values += r'%s' + + format[column] = sql.Identifier(key) + param.append(record[key]) + + i += 1 + + sqlstr += values + r')' + + log.debug("sqlstr: %s" % sqlstr) + log.debug("format: %s" % format) + + query = sql.SQL(sqlstr).format(**format) + + log.debug("query: %s" % query) + log.debug("param: %s" % pprint.pformat(param)) + + rowcount = self.db.execute(query, param) + + if rowcount == -1: + raise Exception('Query failed: %s' % query) + + def upsert_record( + self, + table, + record, + primary_key='gid', + identifiers=None, + ): + update = False + + if identifiers is not None: + sqlstr = r'SELECT {primary_key} FROM {table} WHERE ' + format = { + 'primary_key': sql.Identifier(primary_key), + 'table': sql.Identifier(table), + } + param = [] + + i = 0 + for key in identifiers: + if i > 0: + sqlstr += ' AND ' + + column = 'con' + str(i) + + sqlstr += r'{' + column + r'} = %s' + + format[column] = sql.Identifier(key) + param.append(identifiers[key]) + + i += 1 + + log.debug("sqlstr: %s" % sqlstr) + log.debug("format: %s" % format) + + query = sql.SQL(sqlstr).format(**format) + + log.debug("query: %s" % query) + log.debug("param: %s" % pprint.pformat(param)) + + rowcount = self.db.execute(query, param) + + log.debug("rowcount: %s" % 
rowcount) + + if rowcount == -1: + raise Exception('Query failed: %s' % query) + elif rowcount == 0: + update = False + elif rowcount == 1: + update = True + else: + raise Exception( + '%d rows returned for query: %s' % ( + rowcount, + query, + ) + ) + + if update: + self.update_record(table, record, identifiers) + else: + self.insert_record(table, record) + + def create_enum(self, name, values): + log.info("Creating ENUM: %s" % name) + + format = { + 'name': sql.Identifier(name.lower()), + } + + sqlstr = r'CREATE TYPE {name} AS ENUM (' + + i = 0 + for value in values: + if i > 0: + sqlstr += ', ' + + sqlstr += r'%s' + + i += 1 + + sqlstr += ')' + + log.debug("sqlstr: %s" % sqlstr) + log.debug("format: %s" % format) + + query = sql.SQL(sqlstr).format(**format) + + log.debug("query: %s" % query) + log.debug("param: %s" % pprint.pformat(values)) + + self.db.cursor.execute(query, values) + + def enum_exists(self, name): + query = """ + SELECT typname + FROM pg_type + WHERE LOWER(typname) = %(name)s + """ + param = { + 'name': name.lower(), + } + + log.debug("query: %s" % query) + log.debug("param: %s" % pprint.pformat(param)) + + self.db.cursor.execute(query, param) + + if self.db.cursor.rowcount == 0: + return False + else: + return True + + def truncate_table(self, table): + log.info("Truncating table: %s" % table) + + format = { + 'table': sql.Identifier(table), + } + + query = sql.SQL('TRUNCATE {table}').format(**format) + + log.debug("query: %s" % query) + + self.db.cursor.execute(query) + + def create_table(self, table): + log.info("Creating table: %s" % table) + + enum_type = { + 'GebruiksdoelType': [ + 'woonfunctie', + 'bijeenkomstfunctie', + 'celfunctie', + 'gezondheidszorgfunctie', + 'industriefunctie', + 'kantoorfunctie', + 'logiesfunctie', + 'onderwijsfunctie', + 'sportfunctie', + 'winkelfunctie', + 'overige gebruiksfunctie', + ], + 'StatusNaamgeving': [ + 'Naamgeving uitgegeven', + 'Naamgeving ingetrokken', + ], + 'StatusPand': [ + 'Bouwvergunning verleend', + 'Niet gerealiseerd pand', + 'Bouw gestart', + 'Pand in gebruik (niet ingemeten)', + 'Pand in gebruik', + 'Verbouwing pand', + 'Sloopvergunning verleend', + 'Pand gesloopt', + 'Pand buiten gebruik', + 'Pand ten onrechte opgevoerd', + ], + 'StatusPlaats': [ + 'Plaats aangewezen', + 'Plaats ingetrokken', + ], + 'StatusVerblijfsobject': [ + 'Verblijfsobject gevormd', + 'Niet gerealiseerd verblijfsobject', + 'Verblijfsobject in gebruik (niet ingemeten)', + 'Verblijfsobject in gebruik', + 'Verbouwing verblijfsobject', + 'Verblijfsobject ingetrokken', + 'Verblijfsobject buiten gebruik', + 'Verblijfsobject ten onrechte opgevoerd', + ], + 'StatusWoonplaats': [ + 'Woonplaats aangewezen', + 'Woonplaats ingetrokken', + ], + 'TypeAdresseerbaarObject': [ + 'Verblijfsobject', + 'Standplaats', + 'Ligplaats', + ], + 'TypeOpenbareRuimte': [ + 'Weg', + 'Water', + 'Spoorbaan', + 'Terrein', + 'Kunstwerk', + 'Landschappelijk gebied', + 'Administratief gebied', + ], + + 'InOnderzoekLigplaats': [ + 'geometrie', + 'status', + 'heeft als hoofdadres', + 'heeft als nevenadres', + ], + 'InOnderzoekNummeraanduiding': [ + 'huisnummer', + 'huisletter', + 'huisnummertoevoeging', + 'postcode', + 'type adresseerbaar object', + 'status', + 'ligt in', + 'ligt aan', + ], + 'InOnderzoekOpenbareRuimte': [ + 'naam', + 'type', + 'status', + 'ligt in', + ], + 'InOnderzoekPand': [ + 'geometrie', + 'oorspronkelijk bouwjaar', + 'status', + ], + 'InOnderzoekStandplaats': [ + 'geometrie', + 'status', + 'heeft als hoofdadres', + 'heeft als nevenadres', + ], + 
'InOnderzoekVerblijfsobject': [ + 'geometrie', + 'gebruiksdoel', + 'oppervlakte', + 'status', + 'maakt deel uit van', + 'heeft als hoofdadres', + 'heeft als nevenadres', + ], + 'InOnderzoekWoonplaats': [ + 'naam', + 'geometrie', + 'status', + ], + } + + table_enum = { + 'woonplaats': [ + 'StatusWoonplaats', + ], + 'openbareruimte': [ + 'TypeOpenbareRuimte', + 'StatusNaamgeving', + ], + 'nummeraanduiding': [ + 'TypeAdresseerbaarObject', + 'StatusNaamgeving', + ], + 'verblijfsobject': [ + 'StatusVerblijfsobject', + ], + 'ligplaats': [ + 'StatusPlaats', + ], + 'standplaats': [ + 'StatusPlaats', + ], + 'pand': [ + 'StatusPand', + ], + + 'inonderzoek_woonplaats': [ + 'InOnderzoekWoonplaats', + ], + 'inonderzoek_openbareruimte': [ + 'InOnderzoekOpenbareRuimte', + ], + 'inonderzoek_nummeraanduiding': [ + 'InOnderzoekNummeraanduiding', + ], + 'inonderzoek_verblijfsobject': [ + 'InOnderzoekVerblijfsobject', + ], + 'inonderzoek_ligplaats': [ + 'InOnderzoekLigplaats', + ], + 'inonderzoek_standplaats': [ + 'InOnderzoekStandplaats', + ], + 'inonderzoek_pand': [ + 'InOnderzoekPand', + ], + } + + table_column = { + 'gid': { + 'name': 'gid', + 'type': 'serial', + 'null': False, + }, + 'identificatie': { + 'name': 'identificatie', + 'type': 'varchar', + 'null': False, + }, + 'naam': { + 'name': 'naam', + 'type': 'varchar(80)', + 'null': False, + }, + + 'documentdatum': { + 'name': 'documentdatum', + 'type': 'date', + 'null': False, + }, + 'documentnummer': { + 'name': 'documentnummer', + 'type': 'varchar(40)', + 'null': False, + }, + + 'voorkomenidentificatie': { + 'name': 'voorkomenidentificatie', + 'type': 'int', + 'null': False, + }, + 'begingeldigheid': { + 'name': 'begingeldigheid', + 'type': 'date', + 'null': False, + }, + 'eindgeldigheid': { + 'name': 'eindgeldigheid', + 'type': 'date', + 'null': True, + }, + 'tijdstipregistratie': { + 'name': 'tijdstipregistratie', + 'type': 'timestamp', + 'null': False, + }, + 'eindregistratie': { + 'name': 'eindregistratie', + 'type': 'timestamp', + 'null': True, + }, + 'tijdstipinactief': { + 'name': 'tijdstipinactief', + 'type': 'timestamp', + 'null': True, + }, + 'tijdstipregistratielv': { + 'name': 'tijdstipregistratielv', + 'type': 'timestamp', + 'null': False, + }, + 'tijdstipeindregistratielv': { + 'name': 'tijdstipeindregistratielv', + 'type': 'timestamp', + 'null': True, + }, + 'tijdstipinactieflv': { + 'name': 'tijdstipinactieflv', + 'type': 'timestamp', + 'null': True, + }, + 'tijdstipnietbaglv': { + 'name': 'tijdstipnietbaglv', + 'type': 'timestamp', + 'null': True, + }, + } + + table_column_group = { + 'common': [ + { + 'name': 'geconstateerd', + 'type': 'bool', + 'null': False, + }, + { + 'include': 'documentdatum', + }, + { + 'include': 'documentnummer', + }, + ], + 'inonderzoek_common': [ + { + 'name': 'inonderzoek', + 'type': 'bool', + 'null': False, + }, + { + 'include': 'documentdatum', + }, + { + 'include': 'documentnummer', + }, + ], + 'voorkomen': [ + { + 'include': 'voorkomenidentificatie', + }, + { + 'include': 'begingeldigheid', + }, + { + 'include': 'eindgeldigheid', + }, + { + 'include': 'tijdstipregistratie', + }, + { + 'include': 'eindregistratie', + }, + { + 'include': 'tijdstipinactief', + }, + { + 'include': 'tijdstipregistratielv', + }, + { + 'include': 'tijdstipeindregistratielv', + }, + { + 'include': 'tijdstipinactieflv', + }, + { + 'include': 'tijdstipnietbaglv', + }, + ], + 'historieinonderzoek': [ + { + 'include': 'tijdstipregistratie', + }, + { + 'include': 'eindregistratie', + }, + { + 'include': 'begingeldigheid', + }, + 
{ + 'include': 'eindgeldigheid', + }, + { + 'include': 'tijdstipregistratielv', + }, + { + 'include': 'tijdstipeindregistratielv', + }, + ], + 'adresseerbaarobject': [ + { + 'name': 'hoofdadresnummeraanduidingref', + 'type': 'varchar', + 'null': False, + }, + { + 'include_group': 'voorkomen', + }, + { + 'name': 'nevenadresnummeraanduidingref', + 'type': 'varchar[]', + 'null': True, + }, + ], + } + + table_structure = { + 'woonplaats': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'include': 'naam', + }, + { + 'name': 'status', + 'type': 'StatusWoonplaats', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'include_group': 'voorkomen', + }, + { + 'name': 'wkb_geometry', + 'type': 'geometry(MultiPolygon,28992)', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + 'openbareruimte': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'include': 'naam', + }, + { + 'name': 'type', + 'type': 'TypeOpenbareRuimte', + 'null': False, + }, + { + 'name': 'status', + 'type': 'StatusNaamgeving', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'name': 'woonplaatsref', + 'type': 'varchar', + 'null': False, + }, + { + 'include_group': 'voorkomen', + }, + { + 'name': 'verkortenaam', + 'type': 'varchar(24)', + 'null': True, + }, + ], + 'primary_key': 'gid', + }, + 'nummeraanduiding': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'name': 'huisnummer', + 'type': 'int', + 'null': False, + }, + { + 'name': 'huisletter', + 'type': 'varchar(1)', + 'null': True, + }, + { + 'name': 'huisnummertoevoeging', + 'type': 'varchar(4)', + 'null': True, + }, + { + 'name': 'postcode', + 'type': 'varchar(6)', + 'null': True, + }, + { + 'name': 'typeadresseerbaarobject', + 'type': 'TypeAdresseerbaarObject', + 'null': False, + }, + { + 'name': 'status', + 'type': 'StatusNaamgeving', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'name': 'woonplaatsref', + 'type': 'varchar', + 'null': True, + }, + { + 'include_group': 'voorkomen', + }, + { + 'name': 'openbareruimteref', + 'type': 'varchar', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + 'verblijfsobject': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'name': 'gebruiksdoel', + 'type': 'varchar[]', + 'null': False, + }, + { + 'name': 'oppervlakte', + 'type': 'int', + 'null': False, + }, + { + 'name': 'status', + 'type': 'StatusVerblijfsobject', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'name': 'pandref', + 'type': 'varchar[]', + 'null': False, + }, + { + 'include_group': 'adresseerbaarobject', + }, + { + 'name': 'wkb_geometry', + 'type': 'geometry(Point,28992)', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + 'ligplaats': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'name': 'status', + 'type': 'StatusPlaats', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'include_group': 'adresseerbaarobject', + }, + { + 'name': 'wkb_geometry', + 'type': 'geometry(Polygon,28992)', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + 'standplaats': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'name': 'status', + 'type': 'StatusPlaats', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'include_group': 'adresseerbaarobject', + }, + { + 'name': 'wkb_geometry', + 'type': 'geometry(Polygon,28992)', + 
'null': False, + }, + ], + 'primary_key': 'gid', + }, + 'pand': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'name': 'oorspronkelijkbouwjaar', + 'type': 'int', + 'null': False, + }, + { + 'name': 'status', + 'type': 'StatusPand', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'include_group': 'voorkomen', + }, + { + 'name': 'wkb_geometry', + 'type': 'geometry(Polygon,28992)', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + + 'inonderzoek_woonplaats': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'name': 'kenmerk', + 'type': 'InOnderzoekWoonplaats', + 'null': False, + }, + { + 'include': 'identificatie', + }, + { + 'include_group': 'inonderzoek_common', + }, + { + 'include_group': 'historieinonderzoek', + }, + ], + 'primary_key': 'gid', + }, + 'inonderzoek_openbareruimte': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'name': 'kenmerk', + 'type': 'InOnderzoekOpenbareRuimte', + 'null': False, + }, + { + 'include': 'identificatie', + }, + { + 'include_group': 'inonderzoek_common', + }, + { + 'include_group': 'historieinonderzoek', + }, + ], + 'primary_key': 'gid', + }, + 'inonderzoek_nummeraanduiding': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'name': 'kenmerk', + 'type': 'InOnderzoekNummeraanduiding', + 'null': False, + }, + { + 'include': 'identificatie', + }, + { + 'include_group': 'inonderzoek_common', + }, + { + 'include_group': 'historieinonderzoek', + }, + ], + 'primary_key': 'gid', + }, + 'inonderzoek_verblijfsobject': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'name': 'kenmerk', + 'type': 'InOnderzoekVerblijfsobject', + 'null': False, + }, + { + 'include': 'identificatie', + }, + { + 'include_group': 'inonderzoek_common', + }, + { + 'include_group': 'historieinonderzoek', + }, + ], + 'primary_key': 'gid', + }, + 'inonderzoek_ligplaats': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'name': 'kenmerk', + 'type': 'InOnderzoekLigplaats', + 'null': False, + }, + { + 'include': 'identificatie', + }, + { + 'include_group': 'inonderzoek_common', + }, + { + 'include_group': 'historieinonderzoek', + }, + ], + 'primary_key': 'gid', + }, + 'inonderzoek_standplaats': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'name': 'kenmerk', + 'type': 'InOnderzoekStandplaats', + 'null': False, + }, + { + 'include': 'identificatie', + }, + { + 'include_group': 'inonderzoek_common', + }, + { + 'include_group': 'historieinonderzoek', + }, + ], + 'primary_key': 'gid', + }, + 'inonderzoek_pand': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'name': 'kenmerk', + 'type': 'InOnderzoekPand', + 'null': False, + }, + { + 'include': 'identificatie', + }, + { + 'include_group': 'inonderzoek_common', + }, + { + 'include_group': 'historieinonderzoek', + }, + ], + 'primary_key': 'gid', + }, + } + + for prefix in [ + 'inactief', + 'nietbag', + ]: + for suffix in [ + 'woonplaats', + 'openbareruimte', + 'nummeraanduiding', + 'verblijfsobject', + 'ligplaats', + 'standplaats', + 'pand', + ]: + key = '%s_%s' % (prefix, suffix) + + table_structure[key] = table_structure[suffix] + + table_enum[key] = table_enum[suffix] + + format = { + 'table': sql.Identifier(table), + } + + if table not in table_structure: + raise Exception('Unsupported table: %s' % table) + + if table in table_enum: + for name in table_enum[table]: + if not self.enum_exists(name): + self.create_enum(name, enum_type[name]) + + sqlstr = r'CREATE TABLE {table} (' + + def add_column(i, column): + sqlstr = '' + + if 'include' in column: + sqlstr 
+= add_column( + i, + table_column[column['include']], + ) + elif 'include_group' in column: + for c in table_column_group[column['include_group']]: + sqlstr += add_column( + i, + c, + ) + else: + if i > 0: + sqlstr += ', ' + + sqlstr += '%(name)s %(type)s' % column + + if( + column['null'] is False and # noqa: W504 + column['type'] != 'serial' + ): + sqlstr += ' NOT NULL' + + i += 1 + + return sqlstr + + i = 0 + for column in table_structure[table]['columns']: + sqlstr += add_column(i, column) + + i += 1 + + if 'primary_key' in table_structure[table]: + sqlstr += ', PRIMARY KEY ({primary_key})' + + key = 'primary_key' + + format[key] = sql.Identifier(table_structure[table][key]) + + sqlstr += r')' + + log.debug("sqlstr: %s" % sqlstr) + log.debug("format: %s" % format) + + query = sql.SQL(sqlstr).format(**format) + + log.debug("query: %s" % query) + + self.db.execute(query) + + def table_exists(self, table): + query = """ + SELECT tablename + FROM pg_tables + WHERE tablename = %(table)s + AND schemaname = %(schema)s + """ + param = { + 'table': table, + 'schema': self.schema, + } + + log.debug("query: %s" % query) + log.debug("param: %s" % pprint.pformat(param)) + + self.db.cursor.execute(query, param) + + if self.db.cursor.rowcount == 0: + return False + else: + return True + + def copy_from_csv(self, csv_file, table, fields, delimiter, null): + if not self.table_exists(table): + self.create_table(table) + elif self.truncate: + self.truncate_table(table) + + log.info("Copying records from CSV file: %s" % csv_file) + + format = { + 'table': sql.Identifier(table), + } + + sqlstr = r'COPY {table} (' + + i = 0 + for field in fields: + if i > 0: + sqlstr += ', ' + + column = 'col' + str(i) + + sqlstr += r'{' + column + r'}' + + format[column] = sql.Identifier(field) + + i += 1 + + sqlstr += ') FROM STDIN WITH' + sqlstr += ' DELIMITER {delimiter}' + sqlstr += ' NULL {null}' + sqlstr += ' CSV HEADER' + + format['delimiter'] = sql.Literal(delimiter) + format['null'] = sql.Literal(null) + + log.debug("sqlstr: %s" % sqlstr) + log.debug("format: %s" % format) + + query = sql.SQL(sqlstr).format(**format) + + log.debug("query: %s" % query) + + with open(csv_file, 'r') as f: + self.db.cursor.copy_expert( + query, + f, + ) + + def process_gml(self, gml, convert_to=None): + geom = ogr.CreateGeometryFromGML( + etree.tostring( + gml, + ).decode() + ) + + log.info("Processing GML: %s" % geom.GetGeometryName()) + + if geom.Is3D(): + log.debug("Geometry is 3D, flattening to 2D") + + geom.FlattenTo2D() + + if( + convert_to is not None and # noqa: W504 + convert_to == 'multipolygon' + ): + log.debug("Converting to MultiPolygon") + + geom = ogr.ForceToMultiPolygon(geom) + elif( + convert_to is not None and # noqa: W504 + convert_to == 'point' + ): + log.debug("Converting to Point") + + geom = geom.Centroid() + + wkb = geom.ExportToWkb().hex().upper() + + return wkb + + def process_levering_xml(self, tree, root, xml_file): + log.info("Processing: BAG Extract Levering") + + table = 'nlx_bag_info' + + elements = root.xpath( + ( + '.' 
+ '/xb:SelectieGegevens' + '/selecties-extract:LVC-Extract' + '/selecties-extract:StandTechnischeDatum' + ), + namespaces=xmlns, + ) + + if len(elements) == 0: + raise Exception("Failed to find StandTechnischeDatum element") + + extract_datum = elements[0].text + + record = { + 'sleutel': 'extract_datum', + 'waarde': extract_datum, + } + + key = 'sleutel' + + self.upsert_record( + table, + record, + identifiers={ + key: record[key], + } + ) + + with open(xml_file, 'rt') as f: + data = f.read() + + record = { + 'sleutel': 'levering_xml', + 'waarde': data, + } + + self.upsert_record( + table, + record, + identifiers={ + key: record[key], + } + ) + + self.db.commit(close=False) + + def process_gwr_element( + self, + element, + row, + name, + key, + xpath, + required=True, + ): + results = element.xpath( + xpath, + namespaces=xmlns, + ) + + if len(results) == 0 and required is True: + raise Exception( + 'Failed to find %s elements' % name + ) + elif len(results) != 0: + row[key] = results[0].text + + return row + + def process_gwr_xml(self, tree, root, xml_file): + log.info("Processing: Gemeente Woonplaats Relatie") + + table = 'gemeente_woonplaats' + + elements = root.xpath( + ( + '.' + '/gwr-bestand:Product' + '/gwr-product:GemeenteWoonplaatsRelatieProduct' + '/gwr-product:GemeenteWoonplaatsRelatie' + ), + namespaces=xmlns, + ) + + if len(elements) == 0: + raise Exception( + 'Failed to find GemeenteWoonplaatsRelatie elements' + ) + + csv_file = os.path.join(self.temp_dir, 'gemeente_woonplaats.csv') + + fields = [ + 'begingeldigheid', + 'eindgeldigheid', + 'woonplaatscode', + 'gemeentecode', + 'status', + ] + + delimiter = r';' + null = r'\N' + + log.info("Writing CSV file: %s" % csv_file) + + with open(csv_file, 'w', newline='') as f: + writer = csv.DictWriter( + f, + fieldnames=fields, + delimiter=delimiter, + lineterminator='\n', + ) + + writer.writeheader() + + for element in elements: + row = { + 'begingeldigheid': '', + 'eindgeldigheid': null, + 'woonplaatscode': '', + 'gemeentecode': '', + 'gemeentecode': '', + } + + # begindatumTijdvakGeldigheid + xpath = ( + '.' + '/gwr-product:tijdvakgeldigheid' + '/bagtypes:begindatumTijdvakGeldigheid' + ) + + row = self.process_gwr_element( + element, + row, + 'begindatumTijdvakGeldigheid', + 'begingeldigheid', + xpath, + ) + + # einddatumTijdvakGeldigheid + xpath = ( + '.' + '/gwr-product:tijdvakgeldigheid' + '/bagtypes:einddatumTijdvakGeldigheid' + ) + + row = self.process_gwr_element( + element, + row, + 'einddatumTijdvakGeldigheid', + 'eindgeldigheid', + xpath, + required=False, + ) + + # gerelateerdeWoonplaats + xpath = ( + '.' + '/gwr-product:gerelateerdeWoonplaats' + '/gwr-product:identificatie' + ) + + row = self.process_gwr_element( + element, + row, + 'gerelateerdeWoonplaats', + 'woonplaatscode', + xpath, + ) + + # gerelateerdeGemeente + xpath = ( + '.' + '/gwr-product:gerelateerdeGemeente' + '/gwr-product:identificatie' + ) + + row = self.process_gwr_element( + element, + row, + 'gerelateerdeGemeente', + 'gemeentecode', + xpath, + ) + + # status + xpath = ( + '.' 
+ '/gwr-product:status' + ) + + row = self.process_gwr_element( + element, + row, + 'status', + 'status', + xpath, + ) + + writer.writerow(row) + + self.copy_from_csv(csv_file, table, fields, delimiter, null) + + BAGUtil.remove_temp_file(csv_file) + + self.db.commit(close=False) + + def process_in_onderzoek_element( + self, + object_type, + object_kenmerk_element, + element, + row, + name, + key=None, + xpath=None, + required=True, + ): + if key is None: + key = name.lower() + + if xpath is None: + xpath = ( + '.' + '/KenmerkInOnderzoek:%s' + '/KenmerkInOnderzoek:%s' + ) % ( + object_kenmerk_element[object_type], + name, + ) + + results = element.xpath( + xpath, + namespaces=xmlns, + ) + + if len(results) == 0 and required is True: + raise Exception( + 'Failed to find %s elements' % name + ) + elif len(results) != 0: + row[key] = results[0].text + + return row + + def process_in_onderzoek_historie_element( + self, + object_type, + object_kenmerk_element, + element, + row, + name, + key=None, + xpath=None, + required=True, + ): + if xpath is None: + xpath = ( + '.' + '/KenmerkInOnderzoek:%s' + '/KenmerkInOnderzoek:historieInOnderzoek' + '/Historie:HistorieInOnderzoek' + '/Historie:%s' + ) % ( + object_kenmerk_element[object_type], + name, + ) + + return self.process_in_onderzoek_element( + object_type, + object_kenmerk_element, + element, + row, + name, + key, + xpath, + required, + ) + + def process_in_onderzoek_beschikbaarlv_element( + self, + object_type, + object_kenmerk_element, + element, + row, + name, + key=None, + xpath=None, + required=True, + ): + if xpath is None: + xpath = ( + '.' + '/KenmerkInOnderzoek:%s' + '/KenmerkInOnderzoek:historieInOnderzoek' + '/Historie:HistorieInOnderzoek' + '/Historie:BeschikbaarLVInOnderzoek' + '/Historie:%s' + ) % ( + object_kenmerk_element[object_type], + name, + ) + + return self.process_in_onderzoek_element( + object_type, + object_kenmerk_element, + element, + row, + name, + key, + xpath, + required, + ) + + def process_in_onderzoek_xml( + self, + tree, + root, + status_type, + object_type, + csv_file, + delimiter, + null, + ): + object_kenmerk_element = { + 'LIG': 'KenmerkLigplaatsInOnderzoek', + 'NUM': 'KenmerkNummeraanduidingInOnderzoek', + 'OPR': 'KenmerkOpenbareruimteInOnderzoek', + 'PND': 'KenmerkPandInOnderzoek', + 'STA': 'KenmerkStandplaatsInOnderzoek', + 'VBO': 'KenmerkVerblijfsobjectInOnderzoek', + 'WPL': 'KenmerkWoonplaatsInOnderzoek', + } + + object_identificatie_element = { + 'LIG': 'identificatieVanLigplaats', + 'NUM': 'identificatieVanNummeraanduiding', + 'OPR': 'identificatieVanOpenbareruimte', + 'PND': 'identificatieVanPand', + 'STA': 'identificatieVanStandplaats', + 'VBO': 'identificatieVanVerblijfsobject', + 'WPL': 'identificatieVanWoonplaats', + } + + elements = root.xpath( + ( + '.' 
+ '/sl:standBestand' + '/sl:stand' + '/sl-bag-extract:kenmerkInOnderzoek' + ), + namespaces=xmlns, + ) + + if len(elements) == 0: + log.warning('No kenmerkInOnderzoek elements found') + + return + + fields = [ + 'kenmerk', + 'identificatie', + 'inonderzoek', + 'documentdatum', + 'documentnummer', + 'tijdstipregistratie', + 'eindregistratie', + 'begingeldigheid', + 'eindgeldigheid', + 'tijdstipregistratielv', + 'tijdstipeindregistratielv', + ] + + log.info("Writing CSV file: %s" % csv_file) + + with open(csv_file, 'w', newline='') as f: + writer = csv.DictWriter( + f, + fieldnames=fields, + delimiter=delimiter, + lineterminator='\n', + ) + + writer.writeheader() + + for element in elements: + row = { + 'kenmerk': '', + 'identificatie': '', + 'inonderzoek': '', + 'documentdatum': '', + 'documentnummer': '', + 'tijdstipregistratie': '', + 'eindregistratie': null, + 'begingeldigheid': '', + 'eindgeldigheid': null, + 'tijdstipregistratielv': '', + 'tijdstipeindregistratielv': null, + } + + # kenmerk + row = self.process_in_onderzoek_element( + object_type, + object_kenmerk_element, + element, + row, + 'kenmerk', + ) + + # identificatie + row = self.process_in_onderzoek_element( + object_type, + object_kenmerk_element, + element, + row, + object_identificatie_element[object_type], + 'identificatie', + ) + + # inOnderzoek + row = self.process_in_onderzoek_element( + object_type, + object_kenmerk_element, + element, + row, + 'inOnderzoek', + ) + + if row['inonderzoek'] == 'J': + row['inonderzoek'] = True + else: + row['inonderzoek'] = False + + # documentdatum + row = self.process_in_onderzoek_element( + object_type, + object_kenmerk_element, + element, + row, + 'documentdatum', + ) + + # documentnummer + row = self.process_in_onderzoek_element( + object_type, + object_kenmerk_element, + element, + row, + 'documentnummer', + ) + + # tijdstipRegistratie + row = self.process_in_onderzoek_historie_element( + object_type, + object_kenmerk_element, + element, + row, + 'tijdstipRegistratie', + ) + + # eindRegistratie + row = self.process_in_onderzoek_historie_element( + object_type, + object_kenmerk_element, + element, + row, + 'eindRegistratie', + required=False, + ) + + # beginGeldigheid + row = self.process_in_onderzoek_historie_element( + object_type, + object_kenmerk_element, + element, + row, + 'beginGeldigheid', + ) + + # eindGeldigheid + row = self.process_in_onderzoek_historie_element( + object_type, + object_kenmerk_element, + element, + row, + 'eindGeldigheid', + required=False, + ) + + # tijdstipRegistratieLV + row = self.process_in_onderzoek_beschikbaarlv_element( + object_type, + object_kenmerk_element, + element, + row, + 'tijdstipRegistratieLV', + ) + + # tijdstipEindRegistratieLV + row = self.process_in_onderzoek_beschikbaarlv_element( + object_type, + object_kenmerk_element, + element, + row, + 'tijdstipEindRegistratieLV', + required=False, + ) + + writer.writerow(row) + + return fields + + def process_bag_object_subelement( + self, + object_type, + object_element, + element, + row, + name, + key=None, + xpath=None, + required=True, + gml=False, + convert_to=None, + array=False, + ): + if key is None: + key = name.lower() + + if xpath is None: + xpath = ( + '.' 
+ '/Objecten:%s' + '/Objecten:%s' + ) % ( + object_element[object_type], + name, + ) + + results = element.xpath( + xpath, + namespaces=xmlns, + ) + + if len(results) == 0 and required is True: + raise Exception( + 'Failed to find %s elements' % name + ) + elif len(results) == 1 and array is False: + if gml is True: + row[key] = self.process_gml( + results[0], + convert_to, + ) + else: + row[key] = results[0].text + elif len(results) >= 1 and array is True: + value = '{' + + i = 0 + for result in results: + if i > 0: + value += ',' + + value += result.text + + i += 1 + + value += '}' + + row[key] = value + + return row + + def process_bag_object_voorkomen_subelement( + self, + object_type, + object_element, + element, + row, + name, + key=None, + xpath=None, + required=True, + gml=False, + convert_to=None, + array=False, + ): + if xpath is None: + xpath = ( + '.' + '/Objecten:%s' + '/Objecten:voorkomen' + '/Historie:Voorkomen' + '/Historie:%s' + ) % ( + object_element[object_type], + name, + ) + + return self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + name, + key, + xpath, + required, + ) + + def process_bag_object_beschikbaarlv_subelement( + self, + object_type, + object_element, + element, + row, + name, + key=None, + xpath=None, + required=True, + gml=False, + convert_to=None, + array=False, + ): + if xpath is None: + xpath = ( + '.' + '/Objecten:%s' + '/Objecten:voorkomen' + '/Historie:Voorkomen' + '/Historie:BeschikbaarLV' + '/Historie:%s' + ) % ( + object_element[object_type], + name, + ) + + return self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + name, + key, + xpath, + required, + ) + + def process_bag_object_element( + self, + object_type, + object_element, + element, + row, + ): + # identificatie + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'identificatie', + ) + + # naam + if object_type in [ + 'WPL', + 'OPR', + ]: + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'naam', + ) + + if object_type == 'OPR': + # type + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'type', + ) + + # huisnummer + # huisletter + # huisnummertoevoeging + # postcode + # typeAdresseerbaarObject + if object_type == 'NUM': + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'huisnummer', + ) + + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'huisletter', + required=False, + ) + + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'huisnummertoevoeging', + required=False, + ) + + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'postcode', + required=False, + ) + + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'typeAdresseerbaarObject', + ) + + # geometrie + if object_type == 'WPL': + xpath = ( + '.' + '/Objecten:%s' + '/Objecten:geometrie' + '/Objecten:vlak' + '/*' + ' | ' + '.' + '/Objecten:%s' + '/Objecten:geometrie' + '/Objecten:multivlak' + '/*' + ) % ( + object_element[object_type], + object_element[object_type], + ) + + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'geometrie', + 'wkb_geometry', + xpath, + gml=True, + convert_to='multipolygon', + ) + elif object_type == 'VBO': + xpath = ( + '.' 
+ '/Objecten:%s' + '/Objecten:geometrie' + '/Objecten:punt' + '/*' + ' | ' + '.' + '/Objecten:%s' + '/Objecten:geometrie' + '/Objecten:vlak' + '/*' + ) % ( + object_element[object_type], + object_element[object_type], + ) + + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'geometrie', + 'wkb_geometry', + xpath, + gml=True, + convert_to='point', + ) + elif object_type in [ + 'LIG', + 'STA', + 'PND', + ]: + xpath = ( + '.' + '/Objecten:%s' + '/Objecten:geometrie' + '/*' + ) % ( + object_element[object_type], + ) + + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'geometrie', + 'wkb_geometry', + xpath, + gml=True, + ) + + # gebruiksdoel + # oppervlakte + if object_type == 'VBO': + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'gebruiksdoel', + array=True, + ) + + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'oppervlakte', + ) + + # oorspronkelijkBouwjaar + if object_type == 'PND': + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'oorspronkelijkBouwjaar', + ) + + # status + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'status', + ) + + # geconstateerd + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'geconstateerd', + ) + + if row['geconstateerd'] == 'J': + row['geconstateerd'] = True + else: + row['geconstateerd'] = False + + # documentdatum + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'documentdatum', + ) + + # documentnummer + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'documentnummer', + ) + + # ligtIn + if object_type in [ + 'OPR', + 'NUM', + ]: + if object_type == 'OPR': + required = True + elif object_type == 'NUM': + required = False + + xpath = ( + '.' + '/Objecten:%s' + '/Objecten:ligtIn' + '/Objecten-ref:WoonplaatsRef' + ) % ( + object_element[object_type], + ) + + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'WoonplaatsRef', + xpath=xpath, + required=required + ) + + # maaktDeelUitVan + if object_type == 'VBO': + xpath = ( + '.' + '/Objecten:%s' + '/Objecten:maaktDeelUitVan' + '/Objecten-ref:PandRef' + ) % ( + object_element[object_type], + ) + + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'PandRef', + xpath=xpath, + array=True, + ) + + # heeftAlsHoofdadres + # heeftAlsNevenadres + if object_type in [ + 'VBO', + 'LIG', + 'STA', + ]: + xpath = ( + '.' + '/Objecten:%s' + '/Objecten:heeftAlsHoofdadres' + '/Objecten-ref:NummeraanduidingRef' + ) % ( + object_element[object_type], + ) + + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'heeftAlsHoofdadres', + 'hoofdadresnummeraanduidingref', + xpath=xpath, + ) + + xpath = ( + '.' 
+ '/Objecten:%s' + '/Objecten:heeftAlsNevenadres' + '/Objecten-ref:NummeraanduidingRef' + ) % ( + object_element[object_type], + ) + + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'heeftAlsNevenadres', + 'nevenadresnummeraanduidingref', + xpath=xpath, + required=False, + array=True, + ) + + # voorkomenidentificatie + row = self.process_bag_object_voorkomen_subelement( + object_type, + object_element, + element, + row, + 'voorkomenidentificatie', + ) + + # beginGeldigheid + row = self.process_bag_object_voorkomen_subelement( + object_type, + object_element, + element, + row, + 'beginGeldigheid', + ) + + # eindGeldigheid + row = self.process_bag_object_voorkomen_subelement( + object_type, + object_element, + element, + row, + 'eindGeldigheid', + required=False, + ) + + # tijdstipRegistratie + row = self.process_bag_object_voorkomen_subelement( + object_type, + object_element, + element, + row, + 'tijdstipRegistratie', + ) + + # eindRegistratie + row = self.process_bag_object_voorkomen_subelement( + object_type, + object_element, + element, + row, + 'eindRegistratie', + required=False, + ) + + # tijdstipInactief + row = self.process_bag_object_voorkomen_subelement( + object_type, + object_element, + element, + row, + 'tijdstipInactief', + required=False, + ) + + # tijdstipRegistratieLV + row = self.process_bag_object_beschikbaarlv_subelement( + object_type, + object_element, + element, + row, + 'tijdstipRegistratieLV', + ) + + # tijdstipEindRegistratieLV + row = self.process_bag_object_beschikbaarlv_subelement( + object_type, + object_element, + element, + row, + 'tijdstipEindRegistratieLV', + required=False, + ) + + # tijdstipInactiefLV + row = self.process_bag_object_beschikbaarlv_subelement( + object_type, + object_element, + element, + row, + 'tijdstipInactiefLV', + required=False, + ) + + # tijdstipNietBagLV + row = self.process_bag_object_beschikbaarlv_subelement( + object_type, + object_element, + element, + row, + 'tijdstipNietBagLV', + required=False, + ) + + # verkorteNaam + if object_type == 'OPR': + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'verkorteNaam', + required=False, + ) + + # ligtAan + if object_type == 'NUM': + xpath = ( + '.' + '/Objecten:%s' + '/Objecten:ligtAan' + '/Objecten-ref:OpenbareRuimteRef' + ) % ( + object_element[object_type], + ) + + row = self.process_bag_object_subelement( + object_type, + object_element, + element, + row, + 'OpenbareRuimteRef', + xpath=xpath, + ) + + return row + + def process_bag_object_xml( + self, + tree, + root, + status_type, + object_type, + csv_file, + delimiter, + null, + ): + object_element = { + 'LIG': 'Ligplaats', + 'NUM': 'Nummeraanduiding', + 'OPR': 'OpenbareRuimte', + 'PND': 'Pand', + 'STA': 'Standplaats', + 'VBO': 'Verblijfsobject', + 'WPL': 'Woonplaats', + } + + elements = root.xpath( + ( + '.' 
+ '/sl:standBestand' + '/sl:stand' + '/sl-bag-extract:bagObject' + ), + namespaces=xmlns, + ) + + if len(elements) == 0: + log.warning('No bagObject elements found') + + return + + if object_type == 'WPL': + fields = [ + 'identificatie', + 'naam', + 'wkb_geometry', + 'status', + 'geconstateerd', + 'documentdatum', + 'documentnummer', + 'voorkomenidentificatie', + 'begingeldigheid', + 'eindgeldigheid', + 'tijdstipregistratie', + 'eindregistratie', + 'tijdstipinactief', + 'tijdstipregistratielv', + 'tijdstipeindregistratielv', + 'tijdstipinactieflv', + 'tijdstipnietbaglv', + ] + elif object_type == 'OPR': + fields = [ + 'identificatie', + 'naam', + 'type', + 'status', + 'geconstateerd', + 'documentdatum', + 'documentnummer', + 'woonplaatsref', + 'voorkomenidentificatie', + 'begingeldigheid', + 'eindgeldigheid', + 'tijdstipregistratie', + 'eindregistratie', + 'tijdstipinactief', + 'tijdstipregistratielv', + 'tijdstipeindregistratielv', + 'tijdstipinactieflv', + 'tijdstipnietbaglv', + 'verkortenaam', + ] + elif object_type == 'NUM': + fields = [ + 'identificatie', + 'huisnummer', + 'huisletter', + 'huisnummertoevoeging', + 'postcode', + 'typeadresseerbaarobject', + 'status', + 'geconstateerd', + 'documentdatum', + 'documentnummer', + 'woonplaatsref', + 'voorkomenidentificatie', + 'begingeldigheid', + 'eindgeldigheid', + 'tijdstipregistratie', + 'eindregistratie', + 'tijdstipinactief', + 'tijdstipregistratielv', + 'tijdstipeindregistratielv', + 'tijdstipinactieflv', + 'tijdstipnietbaglv', + 'openbareruimteref', + ] + elif object_type == 'VBO': + fields = [ + 'identificatie', + 'wkb_geometry', + 'gebruiksdoel', + 'oppervlakte', + 'status', + 'geconstateerd', + 'documentdatum', + 'documentnummer', + 'pandref', + 'hoofdadresnummeraanduidingref', + 'nevenadresnummeraanduidingref', + 'voorkomenidentificatie', + 'begingeldigheid', + 'eindgeldigheid', + 'tijdstipregistratie', + 'eindregistratie', + 'tijdstipinactief', + 'tijdstipregistratielv', + 'tijdstipeindregistratielv', + 'tijdstipinactieflv', + 'tijdstipnietbaglv', + ] + elif object_type == 'LIG': + fields = [ + 'identificatie', + 'status', + 'wkb_geometry', + 'geconstateerd', + 'documentdatum', + 'documentnummer', + 'hoofdadresnummeraanduidingref', + 'nevenadresnummeraanduidingref', + 'voorkomenidentificatie', + 'begingeldigheid', + 'eindgeldigheid', + 'tijdstipregistratie', + 'eindregistratie', + 'tijdstipinactief', + 'tijdstipregistratielv', + 'tijdstipeindregistratielv', + 'tijdstipinactieflv', + 'tijdstipnietbaglv', + ] + elif object_type == 'STA': + fields = [ + 'identificatie', + 'status', + 'wkb_geometry', + 'geconstateerd', + 'documentdatum', + 'documentnummer', + 'hoofdadresnummeraanduidingref', + 'nevenadresnummeraanduidingref', + 'voorkomenidentificatie', + 'begingeldigheid', + 'eindgeldigheid', + 'tijdstipregistratie', + 'eindregistratie', + 'tijdstipinactief', + 'tijdstipregistratielv', + 'tijdstipeindregistratielv', + 'tijdstipinactieflv', + 'tijdstipnietbaglv', + ] + elif object_type == 'PND': + fields = [ + 'identificatie', + 'wkb_geometry', + 'oorspronkelijkbouwjaar', + 'status', + 'geconstateerd', + 'documentdatum', + 'documentnummer', + 'voorkomenidentificatie', + 'begingeldigheid', + 'eindgeldigheid', + 'tijdstipregistratie', + 'eindregistratie', + 'tijdstipinactief', + 'tijdstipregistratielv', + 'tijdstipeindregistratielv', + 'tijdstipinactieflv', + 'tijdstipnietbaglv', + ] + + log.info("Writing CSV file: %s" % csv_file) + + with open(csv_file, 'w', newline='') as f: + writer = csv.DictWriter( + f, + fieldnames=fields, + 
delimiter=delimiter, + lineterminator='\n', + ) + + writer.writeheader() + + for element in elements: + row = { + 'identificatie': '', + 'status': '', + 'geconstateerd': '', + 'documentdatum': '', + 'documentnummer': '', + 'voorkomenidentificatie': '', + 'begingeldigheid': '', + 'eindgeldigheid': null, + 'tijdstipregistratie': '', + 'eindregistratie': null, + 'tijdstipinactief': null, + 'tijdstipregistratielv': '', + 'tijdstipeindregistratielv': null, + 'tijdstipinactieflv': null, + 'tijdstipnietbaglv': null, + } + + if object_type == 'WPL': + row['naam'] = '' + row['wkb_geometry'] = '' + elif object_type == 'OPR': + row['naam'] = '' + row['type'] = '' + row['woonplaatsref'] = '' + row['verkortenaam'] = null + elif object_type == 'NUM': + row['huisnummer'] = '' + row['huisletter'] = null + row['huisnummertoevoeging'] = null + row['postcode'] = null + row['typeadresseerbaarobject'] = '' + row['woonplaatsref'] = null + row['openbareruimteref'] = '' + elif object_type == 'VBO': + row['wkb_geometry'] = '' + row['gebruiksdoel'] = [] + row['oppervlakte'] = '' + row['pandref'] = [] + row['hoofdadresnummeraanduidingref'] = '' + row['nevenadresnummeraanduidingref'] = null + elif object_type == 'LIG': + row['wkb_geometry'] = '' + row['hoofdadresnummeraanduidingref'] = '' + row['nevenadresnummeraanduidingref'] = null + elif object_type == 'STA': + row['wkb_geometry'] = '' + row['hoofdadresnummeraanduidingref'] = '' + row['nevenadresnummeraanduidingref'] = null + elif object_type == 'PND': + row['wkb_geometry'] = '' + row['oorspronkelijkbouwjaar'] = '' + + row = self.process_bag_object_element( + object_type, + object_element, + element, + row, + ) + + writer.writerow(row) + + return fields + + def process_stand_xml(self, tree, root, xml_file): + log.info("Processing: BAG Stand") + + filename = os.path.basename(xml_file) + + match = re.search( + r'^\d{4}(IA|IO|NB|)(LIG|NUM|OPR|PND|STA|VBO|WPL)\d{8}\-\d{6}\.xml$', + filename, + ) + + if not match: + raise Exception('Failed to parse filename: %s' % filename) + + ( + status_type, + object_type, + ) = match.groups() + + log.debug("status_type: %s" % status_type) + log.debug("object_type: %s" % object_type) + + object_table = { + 'LIG': 'ligplaats', + 'NUM': 'nummeraanduiding', + 'OPR': 'openbareruimte', + 'PND': 'pand', + 'STA': 'standplaats', + 'VBO': 'verblijfsobject', + 'WPL': 'woonplaats', + } + + if status_type == '': + # Normal objects + + table = object_table[object_type] + elif status_type == 'IA': + # Inactive (Inactief) + + table = 'inactief_' + object_table[object_type] + elif status_type == 'IO': + # Under investigation (InOnderzoek) + + table = 'inonderzoek_' + object_table[object_type] + elif status_type == 'NB': + # Not BAG (NietBag) + + table = 'nietbag_' + object_table[object_type] + else: + raise Exception("Unsupported status objects: %s (%s)" % (status_type, object_type)) + + csv_file = os.path.join(self.temp_dir, '%s.csv' % table) + + fields = [] + delimiter = r';' + null = r'\N' + + if status_type == 'IO': + fields = self.process_in_onderzoek_xml( + tree, + root, + status_type, + object_type, + csv_file, + delimiter, + null, + ) + else: + fields = self.process_bag_object_xml( + tree, + root, + status_type, + object_type, + csv_file, + delimiter, + null, + ) + + if fields is not None: + self.copy_from_csv(csv_file, table, fields, delimiter, null) + + BAGUtil.remove_temp_file(csv_file) + + self.db.commit(close=False) + + def process_xml_file(self, xml_file): + log.info("Processing XML file: %s" % xml_file) + + tree = 
etree.parse(xml_file) + + root = tree.getroot() + + log.debug("root: %s" % root.tag) + + if root.tag == '{%s}BAG-Extract-Levering' % xmlns['xb']: + self.process_levering_xml(tree, root, xml_file) + elif root.tag == '{%s}BAG-GWR-Deelbestand-LVC' % xmlns['gwr-bestand']: + self.process_gwr_xml(tree, root, xml_file) + elif root.tag == '{%s}bagStand' % xmlns['sl-bag-extract']: + self.process_stand_xml(tree, root, xml_file) + else: + log.warning("Skipping unsupported file: %s" % xml_file) + + BAGUtil.remove_temp_file(xml_file) + + def process_zip_file(self, zip_file): + extracted = BAGUtil.extract_zip_file(zip_file, self.temp_dir) + + BAGUtil.remove_temp_file(zip_file) + + for entry in extracted: + if( + os.path.isdir(entry) or # noqa: W504 + os.path.basename(entry).startswith('.') or # noqa: W504 + not os.path.exists(entry) + ): + log.info("Not processing: %s" % entry) + + if os.path.isdir(entry) and os.path.exists(entry): + BAGUtil.remove_temp_dir(entry) + elif os.path.exists(entry): + BAGUtil.remove_temp_file(entry) + + continue + + if entry.endswith('.xml'): + self.process_xml_file(entry) + elif entry.endswith('.zip'): + self.process_zip_file(entry) + else: + log.warning("Skipping unsupported file: %s" % entry) + + BAGUtil.remove_temp_file(entry) + + def process_input_zip_file(self, packet): + temp_file = os.path.join(self.temp_dir, packet.data['name']) + + log.info( + "Extracting %s from %s to %s" % ( + packet.data['name'], + packet.data['file_path'], + temp_file, + ) + ) + + with zipfile.ZipFile(packet.data['file_path']) as z: + with open(temp_file, 'wb') as f: + with z.open(packet.data['name']) as zf: + while True: + buffer = zf.read(self.buffer_size) + if not buffer: + break + f.write(buffer) + + return temp_file + + def process_xml_file_packet(self, packet): + temp_xml_file = self.process_input_zip_file(packet) + + self.process_xml_file(temp_xml_file) + + return packet + + def process_zip_file_packet(self, packet): + temp_zip_file = self.process_input_zip_file(packet) + + self.process_zip_file(temp_zip_file) + + return packet + + def write(self, packet): + if packet.data is None or len(packet.data) == 0: + return packet + + if( + ( + not self.process_inactief and # noqa: W504 + re.search( + r'^\d{4}Inactief\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_in_onderzoek and # noqa: W504 + re.search( + r'^\d{4}InOnderzoek\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_niet_bag and # noqa: W504 + re.search( + r'^\d{4}NietBag\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_lig and # noqa: W504 + re.search( + r'^\d{4}LIG\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_num and # noqa: W504 + re.search( + r'^\d{4}NUM\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_opr and # noqa: W504 + re.search( + r'^\d{4}OPR\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_pnd and # noqa: W504 + re.search( + r'^\d{4}PND\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_sta and # noqa: W504 + re.search( + r'^\d{4}STA\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_vbo and # noqa: W504 + re.search( + r'^\d{4}VBO\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_wpl and # noqa: W504 + re.search( + r'^\d{4}WPL\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_gwr and # noqa: W504 + 
re.search( + r'^GEM\-WPL\-RELATIE\-\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_levering and # noqa: W504 + re.search( + r'^Leveringsdocument\-BAG\-Extract\.xml$', + packet.data['name'] + ) + ) + ): + log.info("Skipping processing of: %s" % packet.data['name']) + + return packet + + if packet.data['name'].endswith('.xml'): + return self.process_xml_file_packet(packet) + elif packet.data['name'].endswith('.zip'): + return self.process_zip_file_packet(packet) + else: + log.warning("Skipping unsupported file: %s" % packet.data['name']) + + return packet From 1d4c5a9a6207fb8494b7b8d9987457fac5bf1425 Mon Sep 17 00:00:00 2001 From: Bas Couwenberg Date: Tue, 19 Jul 2022 12:26:33 +0200 Subject: [PATCH 03/10] Add stetl.inputs.baginput.BAGInput class --- stetl/inputs/baginput.py | 131 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 stetl/inputs/baginput.py diff --git a/stetl/inputs/baginput.py b/stetl/inputs/baginput.py new file mode 100644 index 0000000..7a95131 --- /dev/null +++ b/stetl/inputs/baginput.py @@ -0,0 +1,131 @@ +import os +import pprint + +from stetl.bagutil import BAGUtil +from stetl.component import Config +from stetl.input import Input +from stetl.util import Util +from stetl.packet import FORMAT + +log = Util.get_log('baginput') + + +class BAGInput(Input): + """ + Parse BAG 2.0 Extract ZIP files into a stream of records containing zipfile path and file names. + + produces=FORMAT.record + """ + + @Config(ptype=str, required=True, default=None) + def file_path(self): + """ + Path to BAG 2.0 Extact ZIP file (lvbag-extract-nl.zip) + """ + pass + + @Config(ptype=str, required=False, default='/tmp') + def temp_dir(self): + """ + Path for temporary directory + """ + pass + + def exit(self): + log.info('Exit: removing temp files') + + for entry in self.extracted: + if os.path.exists(entry): + BAGUtil.remove_temp_file(entry) + + def process_zip_file(self, zip_file): + zip_content = BAGUtil.zip_file_content(zip_file) + + contains_zip_file = False + + for entry in zip_content: + if entry.endswith('.zip'): + contains_zip_file = True + break + + if contains_zip_file: + extracted = BAGUtil.extract_zip_file(zip_file, self.temp_dir) + + for entry in extracted: + if( + os.path.isdir(entry) or # noqa: W504 + os.path.basename(entry).startswith('.') or # noqa: W504 + not os.path.exists(entry) + ): + log.info("Not processing: %s" % entry) + + if os.path.isdir(entry) and os.path.exists(entry): + BAGUtil.remove_temp_dir(entry) + elif os.path.exists(entry): + BAGUtil.remove_temp_file(entry) + + zip_content.remove(os.path.basename(entry)) + + continue + + if( + not entry.endswith('.xml') and # noqa: W504 + not entry.endswith('.zip') + ): + log.warning("Skipping unsupported file: %s" % entry) + + BAGUtil.remove_temp_file(entry) + + zip_content.remove(os.path.basename(entry)) + + self.extracted.append(entry) + + for entry in sorted(zip_content): + if( + entry.startswith('.') or # noqa: W504 + entry.startswith('_') + ): + continue + + if entry.endswith('.zip'): + self.process_zip_file( + os.path.join( + self.temp_dir, + entry, + ) + ) + elif entry.endswith('.xml'): + item = { + 'file_path': zip_file, + 'name': entry, + } + + self.file_list.append(item) + else: + log.warning("Ignoring entry: %s" % entry) + + def __init__(self, configdict, section, produces=FORMAT.record): + Input.__init__(self, configdict, section, produces) + + self.file_list = [] + self.extracted = [] + + self.process_zip_file(self.file_path) + + 
log.debug("file_list:\n%s" % pprint.pformat(self.file_list)) + + def read(self, packet): + if not len(self.file_list): + packet.set_end_of_stream() + + log.info("End of file list") + + return packet + + entry = self.file_list.pop(0) + + log.info("Read entry: %s" % entry) + + packet.data = entry + + return packet From 2dca882b5e580984b4e750d85c0ae3032d9e17b4 Mon Sep 17 00:00:00 2001 From: Bas Couwenberg Date: Tue, 19 Jul 2022 12:27:52 +0200 Subject: [PATCH 04/10] Add multiprocessing support to BAGInput BAGOutput classes --- stetl/bagutil.py | 20 +++ stetl/inputs/baginput.py | 147 ++++++++++++++++----- stetl/outputs/bagoutput.py | 259 +++++++++++++++++++++++-------------- 3 files changed, 296 insertions(+), 130 deletions(-) diff --git a/stetl/bagutil.py b/stetl/bagutil.py index 13d8112..827f8b7 100644 --- a/stetl/bagutil.py +++ b/stetl/bagutil.py @@ -13,6 +13,18 @@ class BAGUtil: Helper functions for BAG 2.0 Extract processing """ + @staticmethod + def zip_file_content(zip_file): + log.info("Retrieving content from: %s" % zip_file) + + zip_content = [] + + with zipfile.ZipFile(zip_file) as z: + for name in z.namelist(): + zip_content.append(name) + + return zip_content + @staticmethod def extract_zip_file(zip_file, temp_dir): extracted = [] @@ -35,6 +47,14 @@ def extract_zip_file(zip_file, temp_dir): return extracted + @staticmethod + def extract_from_zip_file(name, zip_file, temp_dir): + with zipfile.ZipFile(zip_file) as z: + if name not in z.namelist(): + raise Exception("Cannot extract file: %s" % name) + + return z.extract(name, path=temp_dir) + @staticmethod def remove_temp_file(temp_file): log.info("Removing temp file: %s" % temp_file) diff --git a/stetl/inputs/baginput.py b/stetl/inputs/baginput.py index 7a95131..7433982 100644 --- a/stetl/inputs/baginput.py +++ b/stetl/inputs/baginput.py @@ -1,5 +1,6 @@ import os import pprint +import re from stetl.bagutil import BAGUtil from stetl.component import Config @@ -31,6 +32,13 @@ def temp_dir(self): """ pass + @Config(ptype=bool, required=False, default=False) + def multiprocessing(self): + """ + Process multiple files in parallel + """ + pass + def exit(self): log.info('Exit: removing temp files') @@ -38,17 +46,21 @@ def exit(self): if os.path.exists(entry): BAGUtil.remove_temp_file(entry) - def process_zip_file(self, zip_file): + def process_zip_file(self, zip_file, initial=True): zip_content = BAGUtil.zip_file_content(zip_file) - contains_zip_file = False - - for entry in zip_content: - if entry.endswith('.zip'): - contains_zip_file = True - break - - if contains_zip_file: + if( + initial is True or # noqa: W504 + re.search( + r'^\d{4}(?:Inactief|InOnderzoek|NietBag)\d{8}\.zip$', + os.path.basename(zip_file), + ) + ): + extract_zip_file = True + else: + extract_zip_file = False + + if extract_zip_file: extracted = BAGUtil.extract_zip_file(zip_file, self.temp_dir) for entry in extracted: @@ -80,29 +92,85 @@ def process_zip_file(self, zip_file): self.extracted.append(entry) - for entry in sorted(zip_content): - if( - entry.startswith('.') or # noqa: W504 - entry.startswith('_') - ): - continue - - if entry.endswith('.zip'): - self.process_zip_file( - os.path.join( - self.temp_dir, - entry, + if self.multiprocessing: + if initial is True: + leverings_xml = 'Leveringsdocument-BAG-Extract.xml' + + if leverings_xml in zip_content: + self.file_list.append( + os.path.join( + self.temp_dir, + leverings_xml, + ) ) - ) - elif entry.endswith('.xml'): - item = { - 'file_path': zip_file, - 'name': entry, - } - - self.file_list.append(item) + + for 
entry in sorted(zip_content): + if( + re.search( + r'^\d{4}(?:LIG|NUM|OPR|PND|STA|VBO|WPL)\d{8}\.zip$', + entry, + ) or # noqa: W504 + re.search( + r'^GEM\-WPL\-RELATIE\-\d{8}\.zip$', + entry, + ) + ): + self.file_list.append( + os.path.join( + self.temp_dir, + entry, + ) + ) + + for entry in sorted(zip_content): + if re.search( + r'^\d{4}(?:Inactief|InOnderzoek|NietBag)\d{8}\.zip$', + entry, + ): + self.process_zip_file( + os.path.join( + self.temp_dir, + entry, + ), + initial=False, + ) else: - log.warning("Ignoring entry: %s" % entry) + for entry in sorted(zip_content): + if re.search( + r'^\d{4}(?:IA|IO|NB)(?:LIG|NUM|OPR|PND|STA|VBO|WPL)\d{8}\.zip$', + entry, + ): + self.file_list.append( + os.path.join( + self.temp_dir, + entry, + ) + ) + else: + for entry in zip_content: + if( + entry.startswith('.') or # noqa: W504 + entry.startswith('_') + ): + continue + + if entry.endswith('.zip'): + self.process_zip_file( + os.path.join( + self.temp_dir, + entry, + ), + initial=False, + ) + elif entry.endswith('.xml'): + item = { + 'file_path': zip_file, + 'name': entry, + } + + self.file_list.append(item) + else: + log.warning("Ignoring entry: %s" % entry) def __init__(self, configdict, section, produces=FORMAT.record): Input.__init__(self, configdict, section, produces) @@ -118,14 +186,25 @@ def read(self, packet): if not len(self.file_list): packet.set_end_of_stream() - log.info("End of file list") + log.info("Empty file list") return packet - entry = self.file_list.pop(0) + if self.multiprocessing: + file_list = self.file_list + + log.info("Read: file list") + + packet.data = { + 'file_list': file_list, + } + + self.file_list = [] + else: + entry = self.file_list.pop(0) - log.info("Read entry: %s" % entry) + log.info("Read entry: %s" % entry) - packet.data = entry + packet.data = entry return packet diff --git a/stetl/outputs/bagoutput.py b/stetl/outputs/bagoutput.py index 1a88f15..7e6ad7e 100644 --- a/stetl/outputs/bagoutput.py +++ b/stetl/outputs/bagoutput.py @@ -1,4 +1,5 @@ import csv +import multiprocessing import os import pprint import re @@ -212,26 +213,48 @@ def temp_dir(self): """ pass - @Config(ptype=int, default=(1024 * 1024 * 1024), required=False) + @Config(ptype=int, required=False, default=(1024 * 1024 * 1024)) def buffer_size(self): """ Buffer size for read buffer during extraction """ pass + @Config(ptype=bool, required=False, default=False) + def multiprocessing(self): + """ + Process multiple files in parallel + """ + pass + + @Config(ptype=int, required=False, default=(os.cpu_count() - 1)) + def workers(self): + """ + Number of parallel processing workers + """ + pass + def __init__(self, configdict, section, consumes=FORMAT.record): Output.__init__(self, configdict, section, consumes=consumes) self.db = None - def init(self): + def db_connect(self): log.info('Init: connect to DB') self.db = PostGIS(self.cfg.get_dict()) self.db.connect() - def exit(self): + def db_disconnect(self): log.info('Exit: disconnect from DB') self.db.disconnect() + def init(self): + if not self.multiprocessing: + self.db_connect() + + def exit(self): + if not self.multiprocessing: + self.db_disconnect() + def update_record(self, table, record, identifiers): sqlstr = r'UPDATE {table} SET' format = { @@ -2916,6 +2939,38 @@ def process_zip_file(self, zip_file): BAGUtil.remove_temp_file(entry) + def process_input_file(self, input_file): + if self.multiprocessing: + self.db_connect() + + if input_file.endswith('.xml'): + self.process_xml_file(input_file) + elif input_file.endswith('.zip'): + 
zip_content = BAGUtil.zip_file_content(input_file) + + for entry in zip_content: + if( + entry.startswith('.') or # noqa: W504 + entry.startswith('_') + ): + continue + + if entry.endswith('.xml'): + xml_file = BAGUtil.extract_from_zip_file( + entry, + input_file, + self.temp_dir, + ) + + self.process_xml_file(xml_file) + else: + log.warning("Ignoring entry: %s" % entry) + else: + log.warning("Skipping unsupported file: %s" % input_file) + + if self.multiprocessing: + self.db_disconnect() + def process_input_zip_file(self, packet): temp_file = os.path.join(self.temp_dir, packet.data['name']) @@ -2956,101 +3011,113 @@ def write(self, packet): if packet.data is None or len(packet.data) == 0: return packet - if( - ( - not self.process_inactief and # noqa: W504 - re.search( - r'^\d{4}Inactief\d{8}\.zip$', - packet.data['name'] - ) - ) or # noqa: W504 - ( - not self.process_in_onderzoek and # noqa: W504 - re.search( - r'^\d{4}InOnderzoek\d{8}\.zip$', - packet.data['name'] - ) - ) or # noqa: W504 - ( - not self.process_niet_bag and # noqa: W504 - re.search( - r'^\d{4}NietBag\d{8}\.zip$', - packet.data['name'] - ) - ) or # noqa: W504 - ( - not self.process_lig and # noqa: W504 - re.search( - r'^\d{4}LIG\d{8}\.zip$', - packet.data['name'] - ) - ) or # noqa: W504 - ( - not self.process_num and # noqa: W504 - re.search( - r'^\d{4}NUM\d{8}\.zip$', - packet.data['name'] + if 'file_list' in packet.data: + with multiprocessing.Pool( + processes=self.workers, + maxtasksperchild=1, + ) as p: + p.map( + self.process_input_file, + packet.data['file_list'], ) - ) or # noqa: W504 - ( - not self.process_opr and # noqa: W504 - re.search( - r'^\d{4}OPR\d{8}\.zip$', - packet.data['name'] - ) - ) or # noqa: W504 - ( - not self.process_pnd and # noqa: W504 - re.search( - r'^\d{4}PND\d{8}\.zip$', - packet.data['name'] - ) - ) or # noqa: W504 - ( - not self.process_sta and # noqa: W504 - re.search( - r'^\d{4}STA\d{8}\.zip$', - packet.data['name'] - ) - ) or # noqa: W504 - ( - not self.process_vbo and # noqa: W504 - re.search( - r'^\d{4}VBO\d{8}\.zip$', - packet.data['name'] - ) - ) or # noqa: W504 - ( - not self.process_wpl and # noqa: W504 - re.search( - r'^\d{4}WPL\d{8}\.zip$', - packet.data['name'] - ) - ) or # noqa: W504 - ( - not self.process_gwr and # noqa: W504 - re.search( - r'^GEM\-WPL\-RELATIE\-\d{8}\.zip$', - packet.data['name'] - ) - ) or # noqa: W504 - ( - not self.process_levering and # noqa: W504 - re.search( - r'^Leveringsdocument\-BAG\-Extract\.xml$', - packet.data['name'] - ) - ) - ): - log.info("Skipping processing of: %s" % packet.data['name']) return packet - - if packet.data['name'].endswith('.xml'): - return self.process_xml_file_packet(packet) - elif packet.data['name'].endswith('.zip'): - return self.process_zip_file_packet(packet) else: - log.warning("Skipping unsupported file: %s" % packet.data['name']) + if( + ( + not self.process_inactief and # noqa: W504 + re.search( + r'^\d{4}Inactief\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_in_onderzoek and # noqa: W504 + re.search( + r'^\d{4}InOnderzoek\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_niet_bag and # noqa: W504 + re.search( + r'^\d{4}NietBag\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_lig and # noqa: W504 + re.search( + r'^\d{4}LIG\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_num and # noqa: W504 + re.search( + r'^\d{4}NUM\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + 
not self.process_opr and # noqa: W504 + re.search( + r'^\d{4}OPR\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_pnd and # noqa: W504 + re.search( + r'^\d{4}PND\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_sta and # noqa: W504 + re.search( + r'^\d{4}STA\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_vbo and # noqa: W504 + re.search( + r'^\d{4}VBO\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_wpl and # noqa: W504 + re.search( + r'^\d{4}WPL\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_gwr and # noqa: W504 + re.search( + r'^GEM\-WPL\-RELATIE\-\d{8}\.zip$', + packet.data['name'] + ) + ) or # noqa: W504 + ( + not self.process_levering and # noqa: W504 + re.search( + r'^Leveringsdocument\-BAG\-Extract\.xml$', + packet.data['name'] + ) + ) + ): + log.info("Skipping processing of: %s" % packet.data['name']) - return packet + return packet + + if packet.data['name'].endswith('.xml'): + return self.process_xml_file_packet(packet) + elif packet.data['name'].endswith('.zip'): + return self.process_zip_file_packet(packet) + else: + log.warning("Skipping unsupported file: %s" % packet.data['name']) + + return packet From 6b4d17921837cc71c614016664270d6bc1a1e0d4 Mon Sep 17 00:00:00 2001 From: Peter Saalbrink Date: Tue, 19 Jul 2022 12:45:48 +0200 Subject: [PATCH 05/10] style changes --- stetl/outputs/bagoutput.py | 75 +++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 38 deletions(-) diff --git a/stetl/outputs/bagoutput.py b/stetl/outputs/bagoutput.py index 7e6ad7e..6d8399c 100644 --- a/stetl/outputs/bagoutput.py +++ b/stetl/outputs/bagoutput.py @@ -257,7 +257,7 @@ def exit(self): def update_record(self, table, record, identifiers): sqlstr = r'UPDATE {table} SET' - format = { + sqlfmt = { 'table': sql.Identifier(table), } param = [] @@ -274,7 +274,7 @@ def update_record(self, table, record, identifiers): sqlstr += r' {' + column + r'} = %s' - format[column] = sql.Identifier(key) + sqlfmt[column] = sql.Identifier(key) param.append(record[key]) i += 1 @@ -292,17 +292,17 @@ def update_record(self, table, record, identifiers): sqlstr += r'{' + column + r'} = %s' - format[column] = sql.Identifier(key) + sqlfmt[column] = sql.Identifier(key) param.append(identifiers[key]) i += 1 log.debug("sqlstr: %s" % sqlstr) - log.debug("format: %s" % format) + log.debug("sqlfmt: %s" % sqlfmt) - query = sql.SQL(sqlstr).format(**format) + query = sql.SQL(sqlstr).format(**sqlfmt) - log.debug("query: %s" % query) + log.debug("query: %s" % query.as_string(context=self.db.cursor)) log.debug("param: %s" % pprint.pformat(param)) rowcount = self.db.execute(query, param) @@ -312,7 +312,7 @@ def update_record(self, table, record, identifiers): def insert_record(self, table, record): sqlstr = r'INSERT INTO {table} (' - format = { + sqlfmt = { 'table': sql.Identifier(table), } values = r') VALUES (' @@ -329,7 +329,7 @@ def insert_record(self, table, record): sqlstr += r'{' + column + r'}' values += r'%s' - format[column] = sql.Identifier(key) + sqlfmt[column] = sql.Identifier(key) param.append(record[key]) i += 1 @@ -337,11 +337,11 @@ def insert_record(self, table, record): sqlstr += values + r')' log.debug("sqlstr: %s" % sqlstr) - log.debug("format: %s" % format) + log.debug("sqlfmt: %s" % sqlfmt) - query = sql.SQL(sqlstr).format(**format) + query = sql.SQL(sqlstr).format(**sqlfmt) - log.debug("query: %s" % query) + log.debug("query: %s" % 
query.as_string(context=self.db.cursor)) log.debug("param: %s" % pprint.pformat(param)) rowcount = self.db.execute(query, param) @@ -360,7 +360,7 @@ def upsert_record( if identifiers is not None: sqlstr = r'SELECT {primary_key} FROM {table} WHERE ' - format = { + sqlfmt = { 'primary_key': sql.Identifier(primary_key), 'table': sql.Identifier(table), } @@ -375,17 +375,17 @@ def upsert_record( sqlstr += r'{' + column + r'} = %s' - format[column] = sql.Identifier(key) + sqlfmt[column] = sql.Identifier(key) param.append(identifiers[key]) i += 1 log.debug("sqlstr: %s" % sqlstr) - log.debug("format: %s" % format) + log.debug("sqlfmt: %s" % sqlfmt) - query = sql.SQL(sqlstr).format(**format) + query = sql.SQL(sqlstr).format(**sqlfmt) - log.debug("query: %s" % query) + log.debug("query: %s" % query.as_string(context=self.db.cursor)) log.debug("param: %s" % pprint.pformat(param)) rowcount = self.db.execute(query, param) @@ -414,7 +414,7 @@ def upsert_record( def create_enum(self, name, values): log.info("Creating ENUM: %s" % name) - format = { + sqlfmt = { 'name': sql.Identifier(name.lower()), } @@ -432,11 +432,11 @@ def create_enum(self, name, values): sqlstr += ')' log.debug("sqlstr: %s" % sqlstr) - log.debug("format: %s" % format) + log.debug("sqlfmt: %s" % sqlfmt) - query = sql.SQL(sqlstr).format(**format) + query = sql.SQL(sqlstr).format(**sqlfmt) - log.debug("query: %s" % query) + log.debug("query: %s" % query.as_string(context=self.db.cursor)) log.debug("param: %s" % pprint.pformat(values)) self.db.cursor.execute(query, values) @@ -1245,7 +1245,7 @@ def create_table(self, table): table_enum[key] = table_enum[suffix] - format = { + sqlfmt = { 'table': sql.Identifier(table), } @@ -1300,16 +1300,16 @@ def add_column(i, column): key = 'primary_key' - format[key] = sql.Identifier(table_structure[table][key]) + sqlfmt[key] = sql.Identifier(table_structure[table][key]) sqlstr += r')' log.debug("sqlstr: %s" % sqlstr) - log.debug("format: %s" % format) + log.debug("sqlfmt: %s" % sqlfmt) - query = sql.SQL(sqlstr).format(**format) + query = sql.SQL(sqlstr).format(**sqlfmt) - log.debug("query: %s" % query) + log.debug("query: %s" % query.as_string(context=self.db.cursor)) self.db.execute(query) @@ -1343,7 +1343,7 @@ def copy_from_csv(self, csv_file, table, fields, delimiter, null): log.info("Copying records from CSV file: %s" % csv_file) - format = { + sqlfmt = { 'table': sql.Identifier(table), } @@ -1358,7 +1358,7 @@ def copy_from_csv(self, csv_file, table, fields, delimiter, null): sqlstr += r'{' + column + r'}' - format[column] = sql.Identifier(field) + sqlfmt[column] = sql.Identifier(field) i += 1 @@ -1367,15 +1367,15 @@ def copy_from_csv(self, csv_file, table, fields, delimiter, null): sqlstr += ' NULL {null}' sqlstr += ' CSV HEADER' - format['delimiter'] = sql.Literal(delimiter) - format['null'] = sql.Literal(null) + sqlfmt['delimiter'] = sql.Literal(delimiter) + sqlfmt['null'] = sql.Literal(null) log.debug("sqlstr: %s" % sqlstr) - log.debug("format: %s" % format) + log.debug("sqlfmt: %s" % sqlfmt) - query = sql.SQL(sqlstr).format(**format) + query = sql.SQL(sqlstr).format(**sqlfmt) - log.debug("query: %s" % query) + log.debug("query: %s" % query.as_string(context=self.db.cursor)) with open(csv_file, 'r') as f: self.db.cursor.copy_expert( @@ -1390,21 +1390,21 @@ def process_gml(self, gml, convert_to=None): ).decode() ) - log.info("Processing GML: %s" % geom.GetGeometryName()) + log.debug("Processing GML: %s" % geom.GetGeometryName()) if geom.Is3D(): log.debug("Geometry is 3D, flattening to 2D") 
geom.FlattenTo2D() - if( + if ( convert_to is not None and # noqa: W504 convert_to == 'multipolygon' ): log.debug("Converting to MultiPolygon") geom = ogr.ForceToMultiPolygon(geom) - elif( + elif ( convert_to is not None and # noqa: W504 convert_to == 'point' ): @@ -1543,7 +1543,6 @@ def process_gwr_xml(self, tree, root, xml_file): 'eindgeldigheid': null, 'woonplaatscode': '', 'gemeentecode': '', - 'gemeentecode': '', } # begindatumTijdvakGeldigheid @@ -2949,7 +2948,7 @@ def process_input_file(self, input_file): zip_content = BAGUtil.zip_file_content(input_file) for entry in zip_content: - if( + if ( entry.startswith('.') or # noqa: W504 entry.startswith('_') ): @@ -3023,7 +3022,7 @@ def write(self, packet): return packet else: - if( + if ( ( not self.process_inactief and # noqa: W504 re.search( From f227ac19eaa8a95486882b7ca0ed6b02e380bd30 Mon Sep 17 00:00:00 2001 From: Peter Saalbrink Date: Tue, 19 Jul 2022 12:47:20 +0200 Subject: [PATCH 06/10] update regex to include BAG Mutation files --- stetl/inputs/baginput.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stetl/inputs/baginput.py b/stetl/inputs/baginput.py index 7433982..39ecc7f 100644 --- a/stetl/inputs/baginput.py +++ b/stetl/inputs/baginput.py @@ -107,7 +107,7 @@ def process_zip_file(self, zip_file, initial=True): for entry in sorted(zip_content): if( re.search( - r'^\d{4}(?:LIG|NUM|OPR|PND|STA|VBO|WPL)\d{8}\.zip$', + r'^\d{4}(MUT)\d{8}-\d{8}\.zip$|^\d{4}(?:LIG|NUM|OPR|PND|STA|VBO|WPL)\d{8}\.zip$', entry, ) or # noqa: W504 re.search( @@ -137,7 +137,7 @@ def process_zip_file(self, zip_file, initial=True): else: for entry in sorted(zip_content): if re.search( - r'^\d{4}(?:IA|IO|NB)(?:LIG|NUM|OPR|PND|STA|VBO|WPL)\d{8}\.zip$', + r'^\d{4}(?:IO|)(MUT)\d{8}-\d{8}\.zip$|^\d{4}(?:IA|IO|NB)(?:LIG|NUM|OPR|PND|STA|VBO|WPL)\d{8}\.zip$', entry, ): self.file_list.append( From 6e458e0a01c3742f2732e1badd9783c41db68510 Mon Sep 17 00:00:00 2001 From: Peter Saalbrink Date: Tue, 19 Jul 2022 12:48:12 +0200 Subject: [PATCH 07/10] update missing spatial reference --- stetl/outputs/bagoutput.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/stetl/outputs/bagoutput.py b/stetl/outputs/bagoutput.py index 6d8399c..51cad9c 100644 --- a/stetl/outputs/bagoutput.py +++ b/stetl/outputs/bagoutput.py @@ -6,7 +6,7 @@ import zipfile from lxml import etree -from osgeo import ogr +from osgeo import ogr, osr from psycopg2 import sql from stetl.bagutil import BAGUtil @@ -18,6 +18,9 @@ log = Util.get_log('bagoutput') +sr = osr.SpatialReference() +sr.ImportFromEPSG(28992) + xmlns = { 'xsi': ( 'http://www.w3.org/2001/XMLSchema-instance' @@ -1412,6 +1415,11 @@ def process_gml(self, gml, convert_to=None): geom = geom.Centroid() + if not geom.GetSpatialReference(): + log.debug("Converting SRID to 28992") + + geom.AssignSpatialReference(sr) + wkb = geom.ExportToWkb().hex().upper() return wkb From bddb852fe05003505ad3a38c83b9b783e70b785f Mon Sep 17 00:00:00 2001 From: Peter Saalbrink Date: Tue, 19 Jul 2022 12:52:22 +0200 Subject: [PATCH 08/10] add PostGIS.truncate_table --- stetl/outputs/bagoutput.py | 19 +++++-------------- stetl/outputs/dboutput.py | 11 +++++++++++ stetl/postgis.py | 15 +++++++++++++++ 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/stetl/outputs/bagoutput.py b/stetl/outputs/bagoutput.py index 51cad9c..b7ae9c4 100644 --- a/stetl/outputs/bagoutput.py +++ b/stetl/outputs/bagoutput.py @@ -465,17 +465,8 @@ def enum_exists(self, name): return True def truncate_table(self, table): - 
log.info("Truncating table: %s" % table) - - format = { - 'table': sql.Identifier(table), - } - - query = sql.SQL('TRUNCATE {table}').format(**format) - - log.debug("query: %s" % query) - - self.db.cursor.execute(query) + self.db.truncate_table(table) + self.db.commit(close=False) def create_table(self, table): log.info("Creating table: %s" % table) @@ -1338,10 +1329,10 @@ def table_exists(self, table): else: return True - def copy_from_csv(self, csv_file, table, fields, delimiter, null): + def copy_from_csv(self, csv_file, table, fields, delimiter, null, truncate=False): if not self.table_exists(table): self.create_table(table) - elif self.truncate: + elif truncate or self.truncate: self.truncate_table(table) log.info("Copying records from CSV file: %s" % csv_file) @@ -1630,7 +1621,7 @@ def process_gwr_xml(self, tree, root, xml_file): writer.writerow(row) - self.copy_from_csv(csv_file, table, fields, delimiter, null) + self.copy_from_csv(csv_file, table, fields, delimiter, null, truncate=True) BAGUtil.remove_temp_file(csv_file) diff --git a/stetl/outputs/dboutput.py b/stetl/outputs/dboutput.py index 9d0ae64..25e2764 100644 --- a/stetl/outputs/dboutput.py +++ b/stetl/outputs/dboutput.py @@ -121,6 +121,13 @@ def key(self): """ pass + @Config(ptype=bool, required=False, default=False) + def truncate(self): + """ + Truncate database tables + """ + pass + # End attribute config meta def __init__(self, configdict, section, consumes=FORMAT.record): @@ -198,6 +205,10 @@ def write(self, packet): if self.replace and self.key and not self.update_query: self.update_query = self.create_update_query(first_record) + # TRUNCATE TABLE + if self.truncate: + self.db.truncate_table(self.cfg.get('table')) + # Check if record is single (dict) or array (list of dict) if type(record) is dict: # Do insert with values from the single record diff --git a/stetl/postgis.py b/stetl/postgis.py index a826eae..fea00c3 100755 --- a/stetl/postgis.py +++ b/stetl/postgis.py @@ -9,6 +9,8 @@ try: import psycopg2 import psycopg2.extensions + import psycopg2.extras + from psycopg2 import sql as pg2sql except ImportError: log.error("cannot find package psycopg2 for Postgres client support, please install psycopg2 first!") # sys.exit(-1) @@ -136,6 +138,19 @@ def file_execute(self, sqlfile): self.log_action("file_execute", "n.v.t", "fout=%s" % str(e), True) log.warn("can't execute SQL script, error: %s" % (str(e))) + def truncate_table(self, table): + log.info("Truncating table: %s" % table) + + sqlfmt = { + 'table': pg2sql.Identifier(table), + } + + query = pg2sql.SQL('TRUNCATE {table}').format(**sqlfmt) + + log.debug("query: %s" % query.as_string(context=self.cursor)) + + self.cursor.execute(query) + def tx_execute(self, sql, parameters=None): self.e = None try: From ad4178a82bb9aed52f1937fbbf9043ca71573837 Mon Sep 17 00:00:00 2001 From: Peter Saalbrink Date: Tue, 19 Jul 2022 12:59:11 +0200 Subject: [PATCH 09/10] add PostGIS.executemany and PostGIS.execute_batch, and use keepalives for connection --- stetl/postgis.py | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/stetl/postgis.py b/stetl/postgis.py index fea00c3..6a5d3f9 100755 --- a/stetl/postgis.py +++ b/stetl/postgis.py @@ -43,7 +43,13 @@ def connect(self, initdb=False): self.config.get('port', '5432')) log.info('Connecting to %s' % conn_str) conn_str += ' password=%s' % self.config['password'] - self.connection = psycopg2.connect(conn_str) + self.connection = psycopg2.connect( + conn_str, + keepalives=1, + keepalives_idle=60, 
+ keepalives_interval=10, + keepalives_count=5, + ) self.cursor = self.connection.cursor() self.set_schema() @@ -122,6 +128,28 @@ def execute(self, sql, parameters=None): return self.cursor.rowcount + def executemany(self, sql, parameters): + try: + self.cursor.executemany(sql, parameters) + # log.debug(self.cursor.statusmessage) + except Exception as e: + log.exception("error '%s' in query: %s" % (str(e), str(sql))) + return -1 + return self.cursor.rowcount + + def execute_batch(self, sql, parameters): + try: + psycopg2.extras.execute_batch( + self.cursor, + sql, + parameters, + ) + # log.debug(self.cursor.statusmessage) + except Exception as e: + log.exception("error '%s' in query: %s" % (str(e), str(sql))) + return -1 + return self.cursor.rowcount + def file_execute(self, sqlfile): self.e = None try: From 098a55f4a49632dc29876cb52c7376c7f98cd8eb Mon Sep 17 00:00:00 2001 From: Peter Saalbrink Date: Tue, 19 Jul 2022 13:00:07 +0200 Subject: [PATCH 10/10] add stetl.outputs.bagmutoutput.BAGMutOutput class for BAG Mutations --- stetl/outputs/bagmutoutput.py | 1512 +++++++++++++++++++++++++++++++++ stetl/outputs/bagoutput.py | 6 + 2 files changed, 1518 insertions(+) create mode 100644 stetl/outputs/bagmutoutput.py diff --git a/stetl/outputs/bagmutoutput.py b/stetl/outputs/bagmutoutput.py new file mode 100644 index 0000000..c1727b4 --- /dev/null +++ b/stetl/outputs/bagmutoutput.py @@ -0,0 +1,1512 @@ +import os +import re +from functools import partial + +from lxml import etree +from psycopg2 import sql + +from stetl.bagutil import BAGUtil +from stetl.outputs.bagoutput import BAGOutput, xmlns +from stetl.util import Util +from stetl.packet import FORMAT + +log = Util.get_log('bagmutoutput') + +xmlns.update({ + 'Mutaties': ( + 'http://www.kadaster.nl/schemas/lvbag/extract-deelbestand-mutaties-lvc/v20200601' + ), + 'ml': ( + 'http://www.kadaster.nl/schemas/mutatielevering-generiek/1.0' + ), + 'mlm': ( + 'http://www.kadaster.nl/schemas/lvbag/extract-deelbestand-mutaties-lvc/v20200601' + ), +}) + + +class BAGMutOutput(BAGOutput): + """Process BAG 2.0 Extract mutation files""" + + def __init__(self, configdict, section, consumes=FORMAT.record): + super().__init__(configdict, section, consumes=consumes) + self.db = None + + def create_table(self, table): + log.info("Creating table: %s" % table) + + enum_type = { + 'gebruiksdoelverblijfsobject': [ + 'woonfunctie', + 'bijeenkomstfunctie', + 'celfunctie', + 'gezondheidszorgfunctie', + 'industriefunctie', + 'kantoorfunctie', + 'logiesfunctie', + 'onderwijsfunctie', + 'sportfunctie', + 'winkelfunctie', + 'overige gebruiksfunctie', + ], + 'nummeraanduidingstatus': [ + 'Naamgeving uitgegeven', + 'Naamgeving ingetrokken', + ], + 'pandstatus': [ + 'Bouwvergunning verleend', + 'Niet gerealiseerd pand', + 'Bouw gestart', + 'Pand in gebruik (niet ingemeten)', + 'Pand in gebruik', + 'Verbouwing pand', + 'Sloopvergunning verleend', + 'Pand gesloopt', + 'Pand buiten gebruik', + 'Pand ten onrechte opgevoerd', + ], + 'ligplaatsstatus': [ + 'Plaats aangewezen', + 'Plaats ingetrokken', + ], + 'verblijfsobjectstatus': [ + 'Verblijfsobject gevormd', + 'Niet gerealiseerd verblijfsobject', + 'Verblijfsobject in gebruik (niet ingemeten)', + 'Verblijfsobject in gebruik', + 'Verbouwing verblijfsobject', + 'Verblijfsobject ingetrokken', + 'Verblijfsobject buiten gebruik', + 'Verblijfsobject ten onrechte opgevoerd', + ], + 'woonplaatsstatus': [ + 'Woonplaats aangewezen', + 'Woonplaats ingetrokken', + ], + 'typeadresseerbaarobject': [ + 'Verblijfsobject', + 'Standplaats', + 
'Ligplaats', + ], + 'openbareruimtetype': [ + 'Weg', + 'Water', + 'Spoorbaan', + 'Terrein', + 'Kunstwerk', + 'Landschappelijk gebied', + 'Administratief gebied', + ], + + 'InOnderzoekLigplaats': [ + 'geometrie', + 'status', + 'heeft als hoofdadres', + 'heeft als nevenadres', + ], + 'InOnderzoekNummeraanduiding': [ + 'huisnummer', + 'huisletter', + 'huisnummertoevoeging', + 'postcode', + 'type adresseerbaar object', + 'status', + 'ligt in', + 'ligt aan', + ], + 'InOnderzoekOpenbareRuimte': [ + 'naam', + 'type', + 'status', + 'ligt in', + ], + 'InOnderzoekPand': [ + 'geometrie', + 'oorspronkelijk bouwjaar', + 'status', + ], + 'InOnderzoekStandplaats': [ + 'geometrie', + 'status', + 'heeft als hoofdadres', + 'heeft als nevenadres', + ], + 'InOnderzoekVerblijfsobject': [ + 'geometrie', + 'gebruiksdoel', + 'oppervlakte', + 'status', + 'maakt deel uit van', + 'heeft als hoofdadres', + 'heeft als nevenadres', + ], + 'InOnderzoekWoonplaats': [ + 'naam', + 'geometrie', + 'status', + ], + + 'MutatieGroep': [ + 'toevoeging', + 'wijziging', + ], + 'MutatieToestand': [ + 'was', + 'wordt', + ], + } + + table_enum = { + 'woonplaats': [ + 'woonplaatsstatus', + ], + 'openbareruimte': [ + 'openbareruimtetype', + 'nummeraanduidingstatus', + ], + 'nummeraanduiding': [ + 'typeadresseerbaarobject', + 'nummeraanduidingstatus', + ], + 'verblijfsobject': [ + 'verblijfsobjectstatus', + ], + 'ligplaats': [ + 'ligplaatsstatus', + ], + 'standplaats': [ + 'ligplaatsstatus', + ], + 'pand': [ + 'pandstatus', + ], + + 'inonderzoek_woonplaats': [ + 'InOnderzoekWoonplaats', + ], + 'inonderzoek_openbareruimte': [ + 'InOnderzoekOpenbareRuimte', + ], + 'inonderzoek_nummeraanduiding': [ + 'InOnderzoekNummeraanduiding', + ], + 'inonderzoek_verblijfsobject': [ + 'InOnderzoekVerblijfsobject', + ], + 'inonderzoek_ligplaats': [ + 'InOnderzoekLigplaats', + ], + 'inonderzoek_standplaats': [ + 'InOnderzoekStandplaats', + ], + 'inonderzoek_pand': [ + 'InOnderzoekPand', + ], + + 'mutaties_woonplaats': [ + 'MutatieGroep', + 'MutatieToestand', + ], + 'mutaties_openbareruimte': [ + 'MutatieGroep', + 'MutatieToestand', + ], + 'mutaties_nummeraanduiding': [ + 'MutatieGroep', + 'MutatieToestand', + ], + 'mutaties_verblijfsobject': [ + 'MutatieGroep', + 'MutatieToestand', + ], + 'mutaties_ligplaats': [ + 'MutatieGroep', + 'MutatieToestand', + ], + 'mutaties_standplaats': [ + 'MutatieGroep', + 'MutatieToestand', + ], + 'mutaties_pand': [ + 'MutatieGroep', + 'MutatieToestand', + ], + } + + table_column = { + 'gid': { + 'name': 'gid', + 'type': 'serial', + 'null': False, + }, + 'identificatie': { + 'name': 'identificatie', + 'type': 'varchar(16)', + 'null': False, + }, + 'naam': { + 'name': 'naam', + 'type': 'varchar(80)', + 'null': False, + }, + + 'documentdatum': { + 'name': 'documentdatum', + 'type': 'date', + 'null': False, + }, + 'documentnummer': { + 'name': 'documentnummer', + 'type': 'varchar(40)', + 'null': False, + }, + + 'voorkomenidentificatie': { + 'name': 'voorkomenidentificatie', + 'type': 'int', + 'null': False, + }, + 'begingeldigheid': { + 'name': 'begingeldigheid', + 'type': 'date', + 'null': False, + }, + 'eindgeldigheid': { + 'name': 'eindgeldigheid', + 'type': 'date', + 'null': True, + }, + 'tijdstipregistratie': { + 'name': 'tijdstipregistratie', + 'type': 'timestamp', + 'null': False, + }, + 'eindregistratie': { + 'name': 'eindregistratie', + 'type': 'timestamp', + 'null': True, + }, + 'tijdstipinactief': { + 'name': 'tijdstipinactief', + 'type': 'timestamp', + 'null': True, + }, + 'tijdstipregistratielv': { + 'name': 
'tijdstipregistratielv', + 'type': 'timestamp', + 'null': False, + }, + 'tijdstipeindregistratielv': { + 'name': 'tijdstipeindregistratielv', + 'type': 'timestamp', + 'null': True, + }, + 'tijdstipinactieflv': { + 'name': 'tijdstipinactieflv', + 'type': 'timestamp', + 'null': True, + }, + 'tijdstipnietbaglv': { + 'name': 'tijdstipnietbaglv', + 'type': 'timestamp', + 'null': True, + }, + } + + table_column_group = { + 'common': [ + { + 'name': 'geconstateerd', + 'type': 'bool', + 'null': False, + }, + { + 'include': 'documentdatum', + }, + { + 'include': 'documentnummer', + }, + ], + 'inonderzoek_common': [ + { + 'name': 'inonderzoek', + 'type': 'bool', + 'null': False, + }, + { + 'include': 'documentdatum', + }, + { + 'include': 'documentnummer', + }, + ], + 'voorkomen': [ + { + 'include': 'voorkomenidentificatie', + }, + { + 'include': 'begingeldigheid', + }, + { + 'include': 'eindgeldigheid', + }, + { + 'include': 'tijdstipregistratie', + }, + { + 'include': 'eindregistratie', + }, + { + 'include': 'tijdstipinactief', + }, + { + 'include': 'tijdstipregistratielv', + }, + { + 'include': 'tijdstipeindregistratielv', + }, + { + 'include': 'tijdstipinactieflv', + }, + { + 'include': 'tijdstipnietbaglv', + }, + ], + 'historieinonderzoek': [ + { + 'include': 'tijdstipregistratie', + }, + { + 'include': 'eindregistratie', + }, + { + 'include': 'begingeldigheid', + }, + { + 'include': 'eindgeldigheid', + }, + { + 'include': 'tijdstipregistratielv', + }, + { + 'include': 'tijdstipeindregistratielv', + }, + ], + 'adresseerbaarobject': [ + { + 'name': 'hoofdadresnummeraanduidingref', + 'type': 'char(16)', + 'null': False, + }, + { + 'include_group': 'voorkomen', + }, + { + 'name': 'nevenadresnummeraanduidingref', + 'type': 'char(16)[]', + 'null': True, + }, + ], + 'mutatie': [ + { + 'name': 'mutatiegroep', + 'type': 'MutatieGroep', + 'null': False, + }, + { + 'name': 'toestand', + 'type': 'MutatieToestand', + 'null': False, + }, + ], + } + + if not table.startswith("mutaties_"): + raise Exception + + table_structure = { + 'woonplaats': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'include': 'naam', + }, + { + 'name': 'status', + 'type': 'woonplaatsstatus', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'include_group': 'voorkomen', + }, + { + 'name': 'wkb_geometry', + 'type': 'geometry(MultiPolygon,28992)', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + 'openbareruimte': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'include': 'naam', + }, + { + 'name': 'type', + 'type': 'openbareruimtetype', + 'null': False, + }, + { + 'name': 'status', + 'type': 'nummeraanduidingstatus', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'name': 'woonplaatsref', + 'type': 'char(4)', + 'null': False, + }, + { + 'include_group': 'voorkomen', + }, + { + 'name': 'verkortenaam', + 'type': 'varchar(24)', + 'null': True, + }, + ], + 'primary_key': 'gid', + }, + 'nummeraanduiding': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'name': 'huisnummer', + 'type': 'int', + 'null': False, + }, + { + 'name': 'huisletter', + 'type': 'varchar(1)', + 'null': True, + }, + { + 'name': 'huisnummertoevoeging', + 'type': 'varchar(4)', + 'null': True, + }, + { + 'name': 'postcode', + 'type': 'char(6)', + 'null': True, + }, + { + 'name': 'typeadresseerbaarobject', + 'type': 'typeadresseerbaarobject', + 'null': False, + }, + { + 'name': 'status', + 'type': 
'nummeraanduidingstatus', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'name': 'woonplaatsref', + 'type': 'char(4)', + 'null': True, + }, + { + 'include_group': 'voorkomen', + }, + { + 'name': 'openbareruimteref', + 'type': 'char(16)', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + 'verblijfsobject': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'name': 'gebruiksdoel', + 'type': 'varchar[]', + 'null': False, + }, + { + 'name': 'oppervlakte', + 'type': 'int', + 'null': False, + }, + { + 'name': 'status', + 'type': 'verblijfsobjectstatus', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'name': 'pandref', + 'type': 'char(16)[]', + 'null': False, + }, + { + 'include_group': 'adresseerbaarobject', + }, + { + 'name': 'wkb_geometry', + 'type': 'geometry(Point,28992)', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + 'ligplaats': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'name': 'status', + 'type': 'ligplaatsstatus', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'include_group': 'adresseerbaarobject', + }, + { + 'name': 'wkb_geometry', + 'type': 'geometry(Polygon,28992)', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + 'standplaats': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'name': 'status', + 'type': 'ligplaatsstatus', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'include_group': 'adresseerbaarobject', + }, + { + 'name': 'wkb_geometry', + 'type': 'geometry(Polygon,28992)', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + 'pand': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'name': 'oorspronkelijkbouwjaar', + 'type': 'int', + 'null': False, + }, + { + 'name': 'status', + 'type': 'pandstatus', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'include_group': 'voorkomen', + }, + { + 'name': 'wkb_geometry', + 'type': 'geometry(Polygon,28992)', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + + 'inonderzoek_woonplaats': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'name': 'kenmerk', + 'type': 'InOnderzoekWoonplaats', + 'null': False, + }, + { + 'include': 'identificatie', + }, + { + 'include_group': 'inonderzoek_common', + }, + { + 'include_group': 'historieinonderzoek', + }, + ], + 'primary_key': 'gid', + }, + 'inonderzoek_openbareruimte': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'name': 'kenmerk', + 'type': 'InOnderzoekOpenbareRuimte', + 'null': False, + }, + { + 'include': 'identificatie', + }, + { + 'include_group': 'inonderzoek_common', + }, + { + 'include_group': 'historieinonderzoek', + }, + ], + 'primary_key': 'gid', + }, + 'inonderzoek_nummeraanduiding': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'name': 'kenmerk', + 'type': 'InOnderzoekNummeraanduiding', + 'null': False, + }, + { + 'include': 'identificatie', + }, + { + 'include_group': 'inonderzoek_common', + }, + { + 'include_group': 'historieinonderzoek', + }, + ], + 'primary_key': 'gid', + }, + 'inonderzoek_verblijfsobject': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'name': 'kenmerk', + 'type': 'InOnderzoekVerblijfsobject', + 'null': False, + }, + { + 'include': 'identificatie', + }, + { + 'include_group': 'inonderzoek_common', + }, + { + 'include_group': 'historieinonderzoek', + }, + ], + 'primary_key': 'gid', + }, + 'inonderzoek_ligplaats': { + 'columns': [ + { + 
'include': 'gid', + }, + { + 'name': 'kenmerk', + 'type': 'InOnderzoekLigplaats', + 'null': False, + }, + { + 'include': 'identificatie', + }, + { + 'include_group': 'inonderzoek_common', + }, + { + 'include_group': 'historieinonderzoek', + }, + ], + 'primary_key': 'gid', + }, + 'inonderzoek_standplaats': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'name': 'kenmerk', + 'type': 'InOnderzoekStandplaats', + 'null': False, + }, + { + 'include': 'identificatie', + }, + { + 'include_group': 'inonderzoek_common', + }, + { + 'include_group': 'historieinonderzoek', + }, + ], + 'primary_key': 'gid', + }, + 'inonderzoek_pand': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'name': 'kenmerk', + 'type': 'InOnderzoekPand', + 'null': False, + }, + { + 'include': 'identificatie', + }, + { + 'include_group': 'inonderzoek_common', + }, + { + 'include_group': 'historieinonderzoek', + }, + ], + 'primary_key': 'gid', + }, + + 'mutaties_woonplaats': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'include': 'naam', + }, + { + 'name': 'status', + 'type': 'woonplaatsstatus', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'include_group': 'voorkomen', + }, + { + 'include_group': 'mutatie', + }, + { + 'name': 'wkb_geometry', + 'type': 'geometry(MultiPolygon,28992)', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + 'mutaties_openbareruimte': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'include': 'naam', + }, + { + 'name': 'type', + 'type': 'openbareruimtetype', + 'null': False, + }, + { + 'name': 'status', + 'type': 'nummeraanduidingstatus', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'include_group': 'voorkomen', + }, + { + 'name': 'woonplaatsref', + 'type': 'char(4)', + 'null': False, + }, + { + 'include_group': 'mutatie', + }, + { + 'name': 'verkortenaam', + 'type': 'varchar(24)', + 'null': True, + }, + ], + 'primary_key': 'gid', + }, + 'mutaties_nummeraanduiding': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'name': 'huisnummer', + 'type': 'int', + 'null': False, + }, + { + 'name': 'huisletter', + 'type': 'varchar(1)', + 'null': True, + }, + { + 'name': 'huisnummertoevoeging', + 'type': 'varchar(4)', + 'null': True, + }, + { + 'name': 'postcode', + 'type': 'char(6)', + 'null': True, + }, + { + 'name': 'typeadresseerbaarobject', + 'type': 'typeadresseerbaarobject', + 'null': False, + }, + { + 'name': 'status', + 'type': 'nummeraanduidingstatus', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'include_group': 'voorkomen', + }, + { + 'name': 'woonplaatsref', + 'type': 'char(4)', + 'null': True, + }, + { + 'include_group': 'mutatie', + }, + { + 'name': 'openbareruimteref', + 'type': 'char(16)', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + 'mutaties_verblijfsobject': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'name': 'gebruiksdoel', + 'type': 'varchar[]', + 'null': False, + }, + { + 'name': 'oppervlakte', + 'type': 'int', + 'null': False, + }, + { + 'name': 'status', + 'type': 'verblijfsobjectstatus', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'name': 'pandref', + 'type': 'char(16)[]', + 'null': False, + }, + { + 'include_group': 'adresseerbaarobject', + }, + { + 'include_group': 'mutatie', + }, + { + 'name': 'wkb_geometry', + 'type': 'geometry(Point,28992)', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + 
'mutaties_ligplaats': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'name': 'status', + 'type': 'ligplaatsstatus', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'include_group': 'adresseerbaarobject', + }, + { + 'include_group': 'mutatie', + }, + { + 'name': 'wkb_geometry', + 'type': 'geometry(Polygon,28992)', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + 'mutaties_standplaats': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'name': 'status', + 'type': 'ligplaatsstatus', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'include_group': 'adresseerbaarobject', + }, + { + 'include_group': 'mutatie', + }, + { + 'name': 'wkb_geometry', + 'type': 'geometry(Polygon,28992)', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + 'mutaties_pand': { + 'columns': [ + { + 'include': 'gid', + }, + { + 'include': 'identificatie', + }, + { + 'name': 'oorspronkelijkbouwjaar', + 'type': 'int', + 'null': False, + }, + { + 'name': 'status', + 'type': 'pandstatus', + 'null': False, + }, + { + 'include_group': 'common', + }, + { + 'include_group': 'voorkomen', + }, + { + 'include_group': 'mutatie', + }, + { + 'name': 'wkb_geometry', + 'type': 'geometry(Polygon,28992)', + 'null': False, + }, + ], + 'primary_key': 'gid', + }, + } + + for prefix in [ + 'inactief', + 'nietbag', + ]: + for suffix in [ + 'woonplaats', + 'openbareruimte', + 'nummeraanduiding', + 'verblijfsobject', + 'ligplaats', + 'standplaats', + 'pand', + ]: + key = '%s_%s' % (prefix, suffix) + + table_structure[key] = table_structure[suffix] + + table_enum[key] = table_enum[suffix] + + if table not in table_structure: + raise Exception('Unsupported table: %s' % table) + + if table in table_enum: + for name in table_enum[table]: + if not self.enum_exists(name): + self.create_enum(name, enum_type[name]) + + fields = set() + + def add_column(i_, column_): + sqlstr_ = '' + + if 'include' in column_: + sqlstr_ += add_column( + i_, + table_column[column_['include']], + ) + elif 'include_group' in column_: + for c in table_column_group[column_['include_group']]: + sqlstr_ += add_column( + i_, + c, + ) + else: + if i_ > 0: + sqlstr_ += ', ' + + sqlstr_ += '%(name)s %(type)s' % column_ + fields.add(column_["name"]) + + if( + column_['null'] is False and # noqa: W504 + column_['type'] != 'serial' + ): + sqlstr_ += ' NOT NULL' + + i_ += 1 + + return sqlstr_ + + sqlfmt = { + 'table': sql.Identifier(table), + } + + sqlstr = r'CREATE TABLE {table} (' + + i = 0 + for column in table_structure[table]['columns']: + sqlstr += add_column(i, column) + + i += 1 + + if 'primary_key' in table_structure[table]: + sqlstr += ', PRIMARY KEY ({primary_key})' + + key = 'primary_key' + + sqlfmt[key] = sql.Identifier(table_structure[table][key]) + + sqlstr += r')' + + if not self.table_exists(table): + + log.debug("sqlstr: %s" % sqlstr) + log.debug("sqlfmt: %s" % sqlfmt) + + query = sql.SQL(sqlstr).format(**sqlfmt) + + log.debug("query: %s" % query.as_string(context=self.db.cursor)) + + self.db.execute(query) + self.db.commit(close=False) + + return fields + + def insert_records(self, table, records): + fields_from_creation = self.create_table(table) + fields_from_creation.remove("gid") + + if self.truncate: + self.truncate_table(table) + + if not records: + log.warning("No records for '%s'" % table) + return + + fields_from_record = set(records[0].keys()) + log.debug("fields: %s" % fields_from_creation) + log.debug("fields: %s" % 
fields_from_record) + + fields_to_delete = tuple(fields_from_record - fields_from_creation) + for record in records: + for field in fields_to_delete: + del record[field] + + fields = str(tuple(sorted(fields_from_creation))).replace("'", '"') + values = ", ".join(f"%({field})s" for field in records[0]) + + if "wkb_geometry" in fields_from_creation: + values = values.replace("%(wkb_geometry)s", "ST_SetSRID(%(wkb_geometry)s::geometry, 28992)") + + query = f"INSERT INTO {table} {fields} VALUES ({values})" + + log.debug("query: %s" % query) + + result = self.db.execute_batch( + query, + records, + ) + self.db.commit(close=False) + + log.debug("Inserted records for '%s': %d" % (table, result)) + + def process_levering_xml(self, tree, root, xml_file): + log.info("Processing: BAG Extract Mutatie Levering") + + table = 'nlx_bag_info' + + elements = root.xpath( + ( + '.' + '/xb:SelectieGegevens' + '/selecties-extract:MUT-Extract' + '/selecties-extract:StandTechnischeDatum' + ), + namespaces=xmlns, + ) + + if not elements: + raise Exception("Failed to find StandTechnischeDatum element") + + extract_datum = elements[0].text + + record = { + 'sleutel': 'extract_datum', + 'waarde': extract_datum, + } + + key = 'sleutel' + + self.upsert_record( + table, + record, + identifiers={ + key: record[key], + } + ) + + with open(xml_file) as f: + data = f.read() + + record = { + 'sleutel': 'levering_xml', + 'waarde': data, + } + + self.upsert_record( + table, + record, + identifiers={ + key: record[key], + } + ) + + self.db.commit(close=False) + + def element_to_row( + self, + element, + fields, + object_type, + object_element, + mutation_type, + mutation_verb, + ): + row = { + field: None + for field in fields + } + + row = self.process_bag_object_element( + object_type, + object_element, + element, + row, + ) + + row["mutatiegroep"] = mutation_type + row["toestand"] = mutation_verb + + return row + + def process_bag_object_xml( + self, + tree, + root, + status_type, + object_type, + *args, + ): + if args: + raise Exception(f"Extra arguments not accepted for {type(self).__name__!r}.") + + object_element = { + 'LIG': 'Ligplaats', + 'NUM': 'Nummeraanduiding', + 'OPR': 'OpenbareRuimte', + 'PND': 'Pand', + 'STA': 'Standplaats', + 'VBO': 'Verblijfsobject', + 'WPL': 'Woonplaats', + } + mutations = ( + ("toevoeging", "wordt"), + ("verwijdering", "was"), + ("wijziging", "was"), + ("wijziging", "wordt"), + ) + + elements = root.xpath( + ( + '.' 
+ '/ml:mutatieBericht' + '/ml:mutatieGroep' + '/*' + '/*' + '/mlm:bagObject' + ), + namespaces=xmlns, + ) + + if not elements: + log.warning('No bagObject elements found') + return + + fields = [ + "begingeldigheid", + "documentdatum", + "documentnummer", + "eindgeldigheid", + "eindregistratie", + "gebruiksdoel", + "geconstateerd", + "hoofdadresnummeraanduidingref", + "huisletter", + "huisnummer", + "huisnummertoevoeging", + "identificatie", + "mutatiegroep", + "naam", + "nevenadresnummeraanduidingref", + "nummeraanduidingref", + "oorspronkelijkbouwjaar", + "openbareruimteref", + "oppervlakte", + "pandref", + "postcode", + "status", + "tijdstipeindregistratielv", + "tijdstipinactief", + "tijdstipinactieflv", + "tijdstipnietbaglv", + "tijdstipregistratie", + "tijdstipregistratielv", + "toestand", + "type", + "typeadresseerbaarobject", + "verkortenaam", + "voorkomenidentificatie", + "wkb_geometry", + "woonplaatsref", + ] + + for key, obj in object_element.items(): + elements_filtered = [ + element + for element in elements + if element.getchildren()[0].tag.endswith(obj) + ] + + records = [] + table = f"mutaties_{obj.lower()}" + for mutation_type, mutation_verb in mutations: + elements_filtered_twice = [ + element + for element in elements_filtered + if element.getparent().tag.endswith(mutation_verb) + and element.getparent().getparent().tag.endswith(mutation_type) + ] + + if mutation_type == "verwijdering" and elements_filtered_twice: + log.warning("Not implemented for 'verwijdering'") + raise NotImplementedError("'verwijdering'") + + if not elements_filtered_twice: + log.debug( + "Skipping table=%s object_type=%s mutation_type=%s mutation_verb=%s" + % (table, key, mutation_type, mutation_verb) + ) + continue + + log.debug( + "Processing table=%s object_type=%s mutation_type=%s mutation_verb=%s" + % (table, key, mutation_type, mutation_verb) + ) + to_row = partial( + self.element_to_row, + fields=fields, + object_type=key, + object_element=object_element, + mutation_type=mutation_type, + mutation_verb=mutation_verb, + ) + records.extend(to_row(element) for element in elements_filtered_twice) + + self.insert_records(table, records) + + def process_mut_xml(self, tree, root, xml_file): + log.info("Processing: BAG Mutaties") + + filename = os.path.basename(xml_file) + + match = re.search( + r'^\d{4}(IO|)(MUT)\d{8}-\d{8}-\d{6}\.xml$', + filename, + ) + + if not match: + raise Exception('Failed to parse filename: %s' % filename) + + status_type, object_type = match.groups() + + log.debug("status_type: %s" % status_type) + log.debug("object_type: %s" % object_type) + + if object_type != "MUT": + raise Exception("Unsupported object type: %s" % object_type) + + if status_type == '': + # Normal objects + self.process_bag_object_xml( + tree, + root, + status_type, + object_type, + ) + elif status_type == 'IO': + # Under investigation (InOnderzoek) + log.debug("InOnderzoek niet ondersteund") + else: + raise Exception("Unsupported status objects: %s (%s)" % (status_type, object_type)) + + def process_xml_file(self, xml_file): + log.info("Processing XML file: %s" % xml_file) + + tree = etree.parse(xml_file) + + root = tree.getroot() + + log.debug("root: %s" % root.tag) + + if root.tag == '{%s}BAG-Extract-Levering' % xmlns['xb']: + self.process_levering_xml(tree, root, xml_file) + elif root.tag == '{%s}BAG-GWR-Deelbestand-LVC' % xmlns['gwr-bestand']: + self.process_gwr_xml(tree, root, xml_file) + elif root.tag == '{%s}bagStand' % xmlns['sl-bag-extract']: + self.process_stand_xml(tree, root, xml_file) + 
elif root.tag == '{%s}bagMutaties' % xmlns['Mutaties']: + if xml_file.endswith("000001.xml"): + log.warning("Skipping unsupported file: %s" % xml_file) + else: + self.process_mut_xml(tree, root, xml_file) + else: + log.warning("Skipping unsupported file: %s" % xml_file) + + BAGUtil.remove_temp_file(xml_file) diff --git a/stetl/outputs/bagoutput.py b/stetl/outputs/bagoutput.py index b7ae9c4..1e15c8d 100644 --- a/stetl/outputs/bagoutput.py +++ b/stetl/outputs/bagoutput.py @@ -1335,6 +1335,12 @@ def copy_from_csv(self, csv_file, table, fields, delimiter, null, truncate=False elif truncate or self.truncate: self.truncate_table(table) + mapping = { + "begingeldigheid": "begindatumtijdvakgeldigheid", + "eindgeldigheid": "einddatumtijdvakgeldigheid", + } + fields = [mapping.get(field, field) for field in fields] + log.info("Copying records from CSV file: %s" % csv_file) sqlfmt = {
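
Usage note (not part of the patches above): the new components are meant to be driven through a regular Stetl chain configuration rather than instantiated directly. The sketch below is a minimal, untested example; it assumes the usual Stetl conventions (an [etl] section defining the chain, PostGIS connection settings given on the output section) and uses placeholder section names, paths and credentials of its own. Other options referenced in the patches (buffer_size, truncate, the per-object process_* switches checked in write()) are left at their defaults here.

    [etl]
    chains = input_bag_zip|output_bag_db

    [input_bag_zip]
    class = stetl.inputs.baginput.BAGInput
    file_path = /path/to/lvbag-extract-nl.zip
    temp_dir = /tmp/bag
    multiprocessing = True

    [output_bag_db]
    class = stetl.outputs.bagoutput.BAGOutput
    temp_dir = /tmp/bag
    multiprocessing = True
    workers = 4
    host = localhost
    port = 5432
    database = bag
    user = bag
    password = changeme
    schema = public

For a mutation extract the output class would presumably be swapped for stetl.outputs.bagmutoutput.BAGMutOutput. With multiprocessing enabled on both ends, BAGInput emits the complete file list in a single packet and BAGOutput fans the files out over a multiprocessing.Pool of `workers` processes, each opening its own database connection via db_connect()/db_disconnect().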