diff --git a/Changes.txt b/Changes.txt index b27c58c..dd80a0f 100644 --- a/Changes.txt +++ b/Changes.txt @@ -1,3 +1,7 @@ +unreleased + Batch example: Guess if input is coordinate pair, if so then do reverse geocoding + Batch example: Give example of input file format + v2.3.0 Tue 04 Jul 2023 Batch example: Raise exception when API key fails (quota, missing API key) Batch example: Raise exception when input file contains an empty line. Better diff --git a/examples/batch.py b/examples/batch.py index b750fbf..b4a6375 100755 --- a/examples/batch.py +++ b/examples/batch.py @@ -3,54 +3,105 @@ # Background tutorial on async programming with Python # https://realpython.com/async-io-python/ -# Requires Python 3.7 or newer. Tested with 3.8 and 3.9. +# Requires Python 3.7 or newer. Tested with 3.8/3.9/3.10/3.11. # Installation: # pip3 install --upgrade opencage asyncio aiohttp backoff tqdm +# Example (forward) input file: +# 1,"Via Allende 8, Cascina, Toscana, Italia" +# 2,"Via Coppi, 17, Formigine, Emilia-Romagna, Italia" +# 3,"Via Dei Salici 20, Gallarate, Lombardia, Italia" +# 4,"Via Vittorio Veneto N7, San Giuliano Terme, Toscana, Italia" +# 5,"Via Tiro A Segno 8, Gallarate, Lombardia, Italia" + +# Example (reverse) input file: +# 1,"43.6783472,10.5533173" +# 2,"44.5655041,10.8412106" +# 3,"45.6823942,8.7919808" +# 4,"43.7804922,10.402925" +# 5,"45.6506236,8.8037173" +# or +# 1,43.6783472,10.5533173 +# 2,44.5655041,10.8412106 +# 3,45.6823942,8.7919808 +# 4,43.7804922,10.402925 +# 5,45.6506236,8.8037173 + +import os import sys import csv +import re import asyncio import traceback import backoff from tqdm import tqdm from opencage.geocoder import OpenCageGeocode, AioHttpError + + + + + API_KEY = '' FILENAME_INPUT_CSV = 'file_to_geocode.csv' FILENAME_OUTPUT_CSV = 'file_geocoded.csv' +FORWARD_OR_REVERSE = 'guess' # 'forward' (address -> coordinates) or 'reverse' (coordinates -> address) + # With 'guess' the script checks if the address is two numbers and then + # assumes reverse -MAX_ITEMS = 100 # Howy man lines to read from the input file. Set to 0 for unlimited +MAX_ITEMS = 100 # How many lines to read from the input file. Set to 0 for unlimited NUM_WORKERS = 3 # For 10 requests per second try 2-5 REQUEST_TIMEOUT_SECONDS = 5 # For individual HTTP requests. Default is 1 RETRY_MAX_TRIES = 10 # How often to retry if a HTTP request times out RETRY_MAX_TIME = 60 # Limit in seconds for retries SHOW_PROGRESS = True # Show progress bar + + + + + + + + +if os.path.exists(FILENAME_OUTPUT_CSV): + sys.stderr.write(f"The output file '{FILENAME_OUTPUT_CSV}' already exists.\n") + sys.exit(1) + csv_writer = csv.writer(open(FILENAME_OUTPUT_CSV, 'w', encoding='utf8', newline='')) PROGRESS_BAR = SHOW_PROGRESS and tqdm(total=0, position=0, desc="Addresses geocoded", dynamic_ncols=True) -async def write_one_geocoding_result(geocoding_results, address, address_id): - if geocoding_results is not None and len(geocoding_results): - first_result = geocoding_results[0] +# '40.78,-73.97' => true +# '3rd Ave, New York' => false +def guess_text_is_coordinate_pair(text): + coordinate_pattern = r'^(-?\d+(\.\d+)?),(-?\d+(\.\d+)?)$' + # x = 'yes' if bool(re.search(coordinate_pattern, text)) else 'no' + # sys.stderr.write(f"{text} is coordinate_pair: {x}\n") + return bool(re.search(coordinate_pattern, text)) + +async def write_one_geocoding_result(geocoding_result, address, address_id): + # print(geocoding_result, file=sys.stderr) + if geocoding_result is not None: row = [ address_id, - first_result['geometry']['lat'], - first_result['geometry']['lng'], + geocoding_result['geometry']['lat'], + geocoding_result['geometry']['lng'], # Any of the components might be empty: - first_result['components'].get('_type', ''), - first_result['components'].get('country', ''), - first_result['components'].get('county', ''), - first_result['components'].get('city', ''), - first_result['components'].get('postcode', ''), - first_result['components'].get('road', ''), - first_result['components'].get('house_number', ''), - first_result['confidence'], - first_result['formatted'] + geocoding_result['components'].get('_type', ''), + geocoding_result['components'].get('country', ''), + geocoding_result['components'].get('county', ''), + geocoding_result['components'].get('city', ''), + geocoding_result['components'].get('postcode', ''), + geocoding_result['components'].get('road', ''), + geocoding_result['components'].get('house_number', ''), + geocoding_result['confidence'], + geocoding_result['formatted'] ] else: + sys.stderr.write(f"not found, writing empty result: {address}\n") row = [ address_id, 0, # not to be confused with https://en.wikipedia.org/wiki/Null_Island @@ -65,7 +116,6 @@ async def write_one_geocoding_result(geocoding_results, address, address_id): -1, # confidence values are 1-10 (lowest to highest), use -1 for unknown '' ] - sys.stderr.write(f"not found, writing empty result: {address}\n") csv_writer.writerow(row) @@ -84,24 +134,32 @@ def backoff_hdlr(details): on_backoff=backoff_hdlr) async def geocode_one_address(address, address_id): async with OpenCageGeocode(API_KEY) as geocoder: - # address -> coordinates - # note: you may also want to set other optional parameters like - # countrycode, language, etc - # see the full list: https://opencagedata.com/api#forward-opt + global FORWARD_OR_REVERSE try: - geocoding_results = await geocoder.geocode_async(address, no_annotations=1) + if FORWARD_OR_REVERSE == 'reverse' or \ + (FORWARD_OR_REVERSE == 'guess' and guess_text_is_coordinate_pair(address)): + # Reverse: + # coordinates -> address, e.g. '40.78,-73.97' => '101, West 91st Street, New York' + lon_lat = address.split(',') + geocoding_results = await geocoder.reverse_geocode_async( + lon_lat[0], lon_lat[1], no_annotations=1) + else: + # Forward: + # address -> coordinates + # note: you may also want to set other optional parameters like + # countrycode, language, etc + # see the full list: https://opencagedata.com/api#forward-opt + geocoding_results = await geocoder.geocode_async(address, no_annotations=1) except Exception as exc: - geocoding_results = None traceback.print_exception(exc, file=sys.stderr) - # coordinates -> address, e.g. '40.78,-73.97' => 101, West 91st Street, New York - # lon_lat = address.split(',') - # geocoding_result = await geocoder.reverse_geocode_async(lon_lat[0], lon_lat[1], no_annotations=1) - # returns a single result so we convert it to a list - # geocoding_results = [geocoding_result] - try: - await write_one_geocoding_result(geocoding_results, address, address_id) + if geocoding_results is not None and len(geocoding_results): + geocoding_result = geocoding_results[0] + else: + geocoding_result = None + + await write_one_geocoding_result(geocoding_result, address, address_id) except Exception as exc: traceback.print_exception(exc, file=sys.stderr) @@ -127,7 +185,7 @@ async def run_worker(worker_name, queue): async def main(): global PROGRESS_BAR - assert sys.version_info >= (3, 7), "Script requires Python 3.7+." + assert sys.version_info >= (3, 7), "Script requires Python 3.7 or newer" ## 1. Read CSV into a Queue ## Each work_item is an address and id. The id will be part of the output, @@ -138,16 +196,23 @@ async def main(): ## queue = asyncio.Queue(maxsize=MAX_ITEMS) - csv_reader = csv.reader(open(FILENAME_INPUT_CSV, 'r'), strict=True, skipinitialspace=True) + with open(FILENAME_INPUT_CSV, 'r', encoding='utf-8') as infile: + csv_reader = csv.reader(infile, strict=True, skipinitialspace=True) + + for row in csv_reader: + if len(row) == 0: + raise Exception(f"Empty line in input file at line number {csv_reader.line_num}, aborting") - for row in csv_reader: - if len(row) == 0: - raise Exception(f"Empty line in input file at line number {csv_reader.line_num}, aborting") + if FORWARD_OR_REVERSE == 'reverse' or \ + (FORWARD_OR_REVERSE == 'guess' and len(row) > 2 and \ + guess_text_is_coordinate_pair(f"{row[1]},{row[2]}")): + work_item = {'id': row[0], 'address': f"{row[1]},{row[2]}"} + else: + work_item = {'id': row[0], 'address': row[1]} - work_item = {'id': row[0], 'address': row[1]} - await queue.put(work_item) - if queue.full(): - break + await queue.put(work_item) + if queue.full(): + break sys.stderr.write(f"{queue.qsize()} work_items in queue\n")