-
-
Notifications
You must be signed in to change notification settings - Fork 37
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #133 from opensanctions/permid-enricher
Add PermID enricher
- Loading branch information
Showing
5 changed files
with
371 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
import io | ||
import csv | ||
import json | ||
import logging | ||
from lxml import etree | ||
|
||
# from pprint import pprint | ||
from itertools import product | ||
from functools import lru_cache | ||
from typing import Set, Generator, Optional | ||
from urllib.parse import urljoin | ||
from followthemoney.types import registry | ||
|
||
from nomenklatura.entity import CE | ||
from nomenklatura.dataset import DS | ||
from nomenklatura.cache import Cache | ||
from nomenklatura.enrich.common import Enricher, EnricherConfig | ||
from nomenklatura.enrich.common import EnrichmentException | ||
|
||
|
||
log = logging.getLogger(__name__) | ||
|
||
GN = "{http://www.geonames.org/ontology#}" | ||
STATUS = { | ||
"tr-org:statusActive": "Active", | ||
"tr-org:statusInActive": "Inactive", | ||
} | ||
|
||
|
||
class PermIDEnricher(Enricher): | ||
MATCHING_API = "https://api-eit.refinitiv.com/permid/match" | ||
|
||
def __init__(self, dataset: DS, cache: Cache, config: EnricherConfig): | ||
super().__init__(dataset, cache, config) | ||
token_var = "${PERMID_API_TOKEN}" | ||
self.api_token: Optional[str] = self.get_config_expand("api_token", token_var) | ||
self.quota_exceeded = False | ||
if self.api_token == token_var: | ||
self.api_token = None | ||
if self.api_token is None: | ||
log.warning("PermID has no API token (%s)" % token_var) | ||
|
||
def entity_to_queries(self, entity: CE) -> bytes: | ||
names = entity.get_type_values(registry.name, matchable=True) | ||
countries = entity.get("jurisdiction", quiet=True) | ||
if not len(countries): | ||
countries = entity.get_type_values(registry.country, matchable=True) | ||
country_set = {c.upper()[:2] for c in countries} | ||
if len(country_set) == 0: | ||
country_set.add("") | ||
sio = io.StringIO() | ||
writer = csv.writer(sio, dialect=csv.unix_dialect, delimiter=",") | ||
# LocalID,Standard Identifier,Name,Country,Street,City,PostalCode,State,Website | ||
writer.writerow(["LocalID", "Standard Identifier", "Name", "Country"]) | ||
lei_code = entity.first("leiCode", quiet=True) | ||
if lei_code is not None: | ||
lei_code = f"LEI:{lei_code}" | ||
else: | ||
lei_code = "" | ||
for name, country in list(product(names, country_set))[:999]: | ||
writer.writerow([entity.id, lei_code, name, country]) | ||
sio.seek(0) | ||
return sio.getvalue().encode("utf-8") | ||
|
||
@lru_cache(maxsize=1000) | ||
def fetch_placename(self, value: Optional[str]) -> Optional[str]: | ||
if value is None: | ||
return None | ||
if not value.startswith("http://sws.geonames.org/"): | ||
raise ValueError("Not a GeoNames URL: %s" % value) | ||
url = urljoin(value, "about.rdf") | ||
res = self.http_get_cached(url, cache_days=500) | ||
doc = etree.fromstring(res.encode("utf=8")) | ||
for code in doc.findall(".//%scountryCode" % GN): | ||
return code.text | ||
for name in doc.findall(".//%sname" % GN): | ||
return name.text | ||
return value | ||
|
||
def fetch_permid(self, entity: CE, url: str) -> CE: | ||
params = {"format": "json-ld"} | ||
hidden = {"access-token": self.api_token} | ||
res_raw = self.http_get_cached(url, params=params, hidden=hidden) | ||
if not len(res_raw): | ||
raise EnrichmentException("Empty response from PermID") | ||
res = json.loads(res_raw) | ||
|
||
res.pop("@id", None) | ||
res.pop("@type", None) | ||
res.pop("@context", None) | ||
res.pop("hasPrimaryIndustryGroup", None) | ||
|
||
perm_id = res.pop("tr-common:hasPermId", url.rsplit("-", 1)[-1]) | ||
lei_code = res.pop("tr-org:hasLEI", None) | ||
match = self.make_entity(entity, "Company") | ||
match.id = f"lei-{lei_code}" if lei_code is not None else f"permid-{perm_id}" | ||
match.add("sourceUrl", url) | ||
match.add("leiCode", lei_code) | ||
match.add("name", res.pop("vcard:organization-name", None)) | ||
match.add("website", res.pop("hasURL", None)) | ||
match.add("country", self.fetch_placename(res.pop("isDomiciledIn", None))) | ||
incorporated = self.fetch_placename(res.pop("isIncorporatedIn", None)) | ||
match.add("jurisdiction", incorporated) | ||
inc_date = res.pop("hasLatestOrganizationFoundedDate", None) | ||
match.add("incorporationDate", inc_date) | ||
|
||
hq_addr = res.pop("mdaas:HeadquartersAddress", None) | ||
reg_addr = res.pop("mdaas:RegisteredAddress", None) | ||
for addr in (hq_addr, reg_addr): | ||
if addr is not None: | ||
addr = ", ".join(addr.split("\n")) | ||
addr = addr.replace(",,", ",").strip().strip(",") | ||
match.add("address", addr) | ||
status_uri = res.pop("hasActivityStatus", None) | ||
status = STATUS.get(status_uri) | ||
if status is None: | ||
log.warning("Unknown status: %s" % status_uri) | ||
match.add("status", status) | ||
match.add("phone", res.pop("tr-org:hasHeadquartersPhoneNumber", None)) | ||
match.add("phone", res.pop("tr-org:hasRegisteredPhoneNumber", None)) | ||
res.pop("tr-org:hasHeadquartersFaxNumber", None) | ||
res.pop("tr-org:hasRegisteredFaxNumber", None) | ||
# pprint(match.to_dict()) | ||
# pprint(res) | ||
return match | ||
|
||
def match(self, entity: CE) -> Generator[CE, None, None]: | ||
if not entity.schema.is_a("Organization"): | ||
return | ||
headers = { | ||
"x-openmatch-numberOfMatchesPerRecord": "3", | ||
"X-AG-Access-Token": self.api_token, | ||
"x-openmatch-dataType": "Organization", | ||
} | ||
cache_key = f"permid:{entity.id}" | ||
query = self.entity_to_queries(entity) | ||
res = self.http_post_json_cached( | ||
self.MATCHING_API, cache_key, data=query, headers=headers | ||
) | ||
seen_matches: Set[str] = set() | ||
for result in res.get("outputContentResponse", []): | ||
match_permid_url = result.get("Match OpenPermID") | ||
if match_permid_url is None or match_permid_url in seen_matches: | ||
continue | ||
seen_matches.add(match_permid_url) | ||
match = self.fetch_permid(entity, match_permid_url) | ||
yield match | ||
|
||
def expand(self, entity: CE, match: CE) -> Generator[CE, None, None]: | ||
yield match |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.