Skip to content

Commit

Permalink
Merge pull request #29 from OCHA-DAP/pcode_formats
Browse files Browse the repository at this point in the history
HAPI-263 P-Code length conversion for higher admin levels
  • Loading branch information
mcarans authored Nov 1, 2023
2 parents cf80895 + e8be773 commit 3e54e8f
Show file tree
Hide file tree
Showing 4 changed files with 630 additions and 24 deletions.
23 changes: 21 additions & 2 deletions documentation/main.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,39 @@ values of the form:
{"iso3": "AFG", "pcode": "AF01", "name": "Kabul"}

Method *setup_from_libhxl_dataset* takes a libhxl Dataset object, while
*setup_from_url* takes a URL which defaults to the global p-codes dataset on
HDX.
*setup_from_url* takes a URL which defaults to a resource in the global p-codes
dataset on HDX.

These methods also have optional parameter *countryiso3s* which is a tuple or
list of country ISO3 codes to be read or None if all countries are desired.

Examples of usage:

AdminLevel.looks_like_pcode("YEM123") # returns True
AdminLevel.looks_like_pcode("Yemen") # returns False
AdminLevel.looks_like_pcode("YEME123") # returns False
adminlevel = AdminLevel(config)
adminlevel.setup_from_admin_info(admin_info, countryiso3s=("YEM",))
adminlevel.get_pcode("YEM", "YEM030", logname="test") # returns ("YE30", True)
adminlevel.get_pcode("YEM", "Al Dhale"e / الضالع") # returns ("YE30", False)
adminlevel.get_pcode("YEM", "Al Dhale"e / الضالع", fuzzy_match=False) # returns (None, True)

There is basic admin 1 p-code length conversion by default. A more advanced
p-code length conversion can be activated by calling *load_pcode_formats*
which takes a URL that defaults to a resource in the global p-codes dataset on
HDX:

admintwo.load_pcode_formats()
admintwo.get_pcode("YEM", "YEM30001") # returns ("YE3001", True)

The length conversion can be further enhanced by supplying either parent
AdminLevel objects in a list or lists of p-codes per parent admin level:

admintwo.set_parent_admins_from_adminlevels([adminone])
admintwo.get_pcode("NER", "NE00409") # returns ("NER004009", True)
admintwo.set_parent_admins([adminone.pcodes])
admintwo.get_pcode("NER", "NE00409") # returns ("NER004009", True)

## Currencies

Various functions support the conversion of monetary amounts to USD. Note that the
Expand Down
241 changes: 221 additions & 20 deletions src/hdx/location/adminlevel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import re
from typing import Dict, List, Optional, Tuple

import hxl
Expand All @@ -9,6 +10,7 @@
from hdx.location.country import Country
from hdx.location.names import clean_name
from hdx.location.phonetics import Phonetics
from hdx.utilities.dictandlist import dict_of_sets_add
from hdx.utilities.text import multiple_replace
from hdx.utilities.typehint import ListTuple

Expand Down Expand Up @@ -36,8 +38,11 @@ class AdminLevel:
admin_level_overrides (Dict): Countries at other admin levels.
"""

pcode_regex = re.compile(r"^([a-zA-Z]{2,3})(\d*)$")
_admin_url_default = "https://data.humdata.org/dataset/cb963915-d7d1-4ffa-90dc-31277e24406f/resource/f65bc260-4d8b-416f-ac07-f2433b4d5142/download/global_pcodes_adm_1_2.csv"
_admin_url = _admin_url_default
_formats_url_default = "https://data.humdata.org/dataset/cb963915-d7d1-4ffa-90dc-31277e24406f/resource/f1161807-dab4-4331-b7b0-4e5dac56e0e4/download/global_pcode_lengths.csv"
_formats_url = _formats_url_default

def __init__(
self,
Expand All @@ -58,10 +63,28 @@ def __init__(
self.name_to_pcode = {}
self.pcode_to_name = {}
self.pcode_to_iso3 = {}
self.pcode_formats = {}
self.zeroes = {}
self.parent_admins = []

self.init_matches_errors()
self.phonetics = Phonetics()

@classmethod
def looks_like_pcode(cls, string: str) -> bool:
"""Check if a string looks like a p-code using regex matching of format.
Checks for 2 or 3 letter country iso code at start and then numbers.
Args:
string (str): String to check
Returns:
bool: Whether string looks like a p-code
"""
if cls.pcode_regex.match(string):
return True
return False

@classmethod
def set_default_admin_url(cls, admin_url: Optional[str] = None) -> None:
"""
Expand Down Expand Up @@ -185,6 +208,57 @@ def setup_from_url(
admin_info = self.get_libhxl_dataset(admin_url)
self.setup_from_libhxl_dataset(admin_info, countryiso3s)

def load_pcode_formats(self, formats_url: str = _formats_url) -> None:
"""
Load p-code formats from a URL. Defaults to global p-codes dataset on HDX.
Args:
formats_url (str): URL from which to load data. Defaults to global p-codes dataset.
Returns:
None
"""
formats_info = self.get_libhxl_dataset(formats_url)
for row in formats_info:
pcode_format = [int(row.get("#country+len"))]
for admin_no in range(1, 4):
length = row.get(f"#adm{admin_no}+len")
if not length or "|" in length:
break
pcode_format.append(int(length))
self.pcode_formats[row.get("#country+code")] = pcode_format

for pcode in self.pcodes:
countryiso3 = self.pcode_to_iso3[pcode]
for x in re.finditer("0", pcode):
dict_of_sets_add(self.zeroes, countryiso3, x.start())

def set_parent_admins(self, parent_admins: List[List]) -> None:
"""
Set parent admins
Args:
parent_admins (List[List]): List of P-codes per parent admin
Returns:
None
"""
self.parent_admins = parent_admins

def set_parent_admins_from_adminlevels(
self, adminlevels: List["AdminLevel"]
) -> None:
"""
Set parent admins from AdminLevel objects
Args:
parent_admins (List[AdminLevel]): List of parent AdminLevel objects
Returns:
None
"""
self.parent_admins = [adminlevel.pcodes for adminlevel in adminlevels]

def get_pcode_list(self) -> List[str]:
"""Get list of all pcodes
Expand Down Expand Up @@ -229,15 +303,139 @@ def init_matches_errors(self) -> None:
self.ignored = set()
self.errors = set()

def convert_admin_pcode_length(
self, countryiso3: str, pcode: str, logname: Optional[str] = None
) -> Optional[str]:
"""Standardise pcode length by country and match to an internal pcode.
Requires that p-code formats be loaded (eg. using load_pcode_formats)
Args:
countryiso3 (str): ISO3 country code
pcode (str): P code to match
logname (Optional[str]): Identifying name to use when logging. Defaults to None (don't log).
Returns:
Optional[str]: Matched P code or None if no match
"""
match = self.pcode_regex.match(pcode)
if not match:
return None
pcode_format = self.pcode_formats.get(countryiso3)
if not pcode_format:
if self.get_admin_level(countryiso3) == 1:
return self.convert_admin1_pcode_length(
countryiso3, pcode, logname
)
return None
countryiso, digits = match.groups()
countryiso_length = len(countryiso)
if countryiso_length > pcode_format[0]:
countryiso2 = Country.get_iso2_from_iso3(countryiso3)
pcode_parts = [countryiso2, digits]
elif countryiso_length < pcode_format[0]:
pcode_parts = [countryiso3, digits]
else:
pcode_parts = [countryiso, digits]
new_pcode = "".join(pcode_parts)
if new_pcode in self.pcodes:
if logname:
self.matches.add(
(
logname,
countryiso3,
new_pcode,
self.pcode_to_name[new_pcode],
"pcode length conversion-country",
)
)
return new_pcode
total_length = sum(pcode_format[: self.admin_level + 1])
admin_changes = []
for admin_no in range(1, self.admin_level + 1):
len_new_pcode = len(new_pcode)
if len_new_pcode == total_length:
break
admin_length = pcode_format[admin_no]
pcode_part = pcode_parts[admin_no]
part_length = len(pcode_part)
if part_length == admin_length:
break
pos = sum(pcode_format[:admin_no])
if part_length < admin_length:
if pos in self.zeroes[countryiso3]:
pcode_parts[admin_no] = f"0{pcode_part}"
admin_changes.append(str(admin_no))
new_pcode = "".join(pcode_parts)
break
elif part_length > admin_length and admin_no == self.admin_level:
if pcode_part[0] == "0":
pcode_parts[admin_no] = pcode_part[1:]
admin_changes.append(str(admin_no))
new_pcode = "".join(pcode_parts)
break
if len_new_pcode < total_length:
if admin_length > 2 and pos in self.zeroes[countryiso3]:
pcode_part = f"0{pcode_part}"
if self.parent_admins and admin_no < self.admin_level:
parent_pcode = [
pcode_parts[i] for i in range(admin_no)
]
parent_pcode.append(pcode_part[:admin_length])
parent_pcode = "".join(parent_pcode)
if (
parent_pcode
not in self.parent_admins[admin_no - 1]
):
pcode_part = pcode_part[1:]
else:
admin_changes.append(str(admin_no))
else:
admin_changes.append(str(admin_no))
elif len_new_pcode > total_length:
if admin_length <= 2 and pcode_part[0] == "0":
pcode_part = pcode_part[1:]
if self.parent_admins and admin_no < self.admin_level:
parent_pcode = [
pcode_parts[i] for i in range(admin_no)
]
parent_pcode.append(pcode_part[:admin_length])
parent_pcode = "".join(parent_pcode)
if (
parent_pcode
not in self.parent_admins[admin_no - 1]
):
pcode_part = f"0{pcode_part}"
else:
admin_changes.append(str(admin_no))
else:
admin_changes.append(str(admin_no))
pcode_parts[admin_no] = pcode_part[:admin_length]
pcode_parts.append(pcode_part[admin_length:])
new_pcode = "".join(pcode_parts)
if new_pcode in self.pcodes:
if logname:
admin_changes_str = ",".join(admin_changes)
self.matches.add(
(
logname,
countryiso3,
new_pcode,
self.pcode_to_name[new_pcode],
f"pcode length conversion-admins {admin_changes_str}",
)
)
return new_pcode
return None

def convert_admin1_pcode_length(
self, countryiso3: str, pcode: str, logname: Optional[str] = None
) -> Optional[str]:
"""Standardise pcode length by country and match to an internal pcode. Only
works for admin1 pcodes.
"""Standardise pcode length by country and match to an internal pcode.
Only works for admin1 pcodes.
Args:
countryiso3 (str): Iso3 country code
pcode (str): P code for admin one
countryiso3 (str): ISO3 country code
pcode (str): P code for admin one to match
logname (Optional[str]): Identifying name to use when logging. Defaults to None (don't log).
Returns:
Expand Down Expand Up @@ -409,23 +607,26 @@ def get_pcode(
pcode = self.admin_name_mappings.get(name)
if pcode and self.pcode_to_iso3[pcode] == countryiso3:
return pcode, True
name_to_pcode = self.name_to_pcode.get(countryiso3)
if name_to_pcode is not None:
pcode = name_to_pcode.get(name.lower())
if pcode:
return pcode, True
if name in self.pcodes: # name is a pcode
return name, True
if self.get_admin_level(countryiso3) == 1:
pcode = self.convert_admin1_pcode_length(
countryiso3, name, logname
if self.looks_like_pcode(name):
pcode = name.upper()
if pcode in self.pcodes: # name is a p-code
return name, True
# name looks like a p-code, but doesn't match p-codes
# so try adjusting p-code length
pcode = self.convert_admin_pcode_length(
countryiso3, pcode, logname
)
if pcode:
return pcode, True
if not fuzzy_match:
return None, True
pcode = self.fuzzy_pcode(countryiso3, name, logname)
return pcode, False
return pcode, True
else:
name_to_pcode = self.name_to_pcode.get(countryiso3)
if name_to_pcode is not None:
pcode = name_to_pcode.get(name.lower())
if pcode:
return pcode, True
if not fuzzy_match:
return None, True
pcode = self.fuzzy_pcode(countryiso3, name, logname)
return pcode, False

def output_matches(self) -> List[str]:
"""Output log of matches
Expand Down
Loading

0 comments on commit 3e54e8f

Please sign in to comment.