Skip to content

Commit

Permalink
Batch example: Guess if input is coordinate pair, if so then do rever…
Browse files Browse the repository at this point in the history
…se (#51)
  • Loading branch information
mtmail authored Nov 8, 2023
1 parent 4792173 commit 1e0117c
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 39 deletions.
4 changes: 4 additions & 0 deletions Changes.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
unreleased
Batch example: Guess if input is coordinate pair, if so then do reverse geocoding
Batch example: Give example of input file format

v2.3.0 Tue 04 Jul 2023
Batch example: Raise exception when API key fails (quota, missing API key)
Batch example: Raise exception when input file contains an empty line. Better
Expand Down
143 changes: 104 additions & 39 deletions examples/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,105 @@
# Background tutorial on async programming with Python
# https://realpython.com/async-io-python/

# Requires Python 3.7 or newer. Tested with 3.8 and 3.9.
# Requires Python 3.7 or newer. Tested with 3.8/3.9/3.10/3.11.

# Installation:
# pip3 install --upgrade opencage asyncio aiohttp backoff tqdm

# Example (forward) input file:
# 1,"Via Allende 8, Cascina, Toscana, Italia"
# 2,"Via Coppi, 17, Formigine, Emilia-Romagna, Italia"
# 3,"Via Dei Salici 20, Gallarate, Lombardia, Italia"
# 4,"Via Vittorio Veneto N7, San Giuliano Terme, Toscana, Italia"
# 5,"Via Tiro A Segno 8, Gallarate, Lombardia, Italia"

# Example (reverse) input file:
# 1,"43.6783472,10.5533173"
# 2,"44.5655041,10.8412106"
# 3,"45.6823942,8.7919808"
# 4,"43.7804922,10.402925"
# 5,"45.6506236,8.8037173"
# or
# 1,43.6783472,10.5533173
# 2,44.5655041,10.8412106
# 3,45.6823942,8.7919808
# 4,43.7804922,10.402925
# 5,45.6506236,8.8037173

import os
import sys
import csv
import re
import asyncio
import traceback
import backoff
from tqdm import tqdm
from opencage.geocoder import OpenCageGeocode, AioHttpError






API_KEY = ''
FILENAME_INPUT_CSV = 'file_to_geocode.csv'
FILENAME_OUTPUT_CSV = 'file_geocoded.csv'
FORWARD_OR_REVERSE = 'guess' # 'forward' (address -> coordinates) or 'reverse' (coordinates -> address)
# With 'guess' the script checks if the address is two numbers and then
# assumes reverse

MAX_ITEMS = 100 # Howy man lines to read from the input file. Set to 0 for unlimited
MAX_ITEMS = 100 # How many lines to read from the input file. Set to 0 for unlimited
NUM_WORKERS = 3 # For 10 requests per second try 2-5
REQUEST_TIMEOUT_SECONDS = 5 # For individual HTTP requests. Default is 1
RETRY_MAX_TRIES = 10 # How often to retry if a HTTP request times out
RETRY_MAX_TIME = 60 # Limit in seconds for retries
SHOW_PROGRESS = True # Show progress bar









if os.path.exists(FILENAME_OUTPUT_CSV):
sys.stderr.write(f"The output file '{FILENAME_OUTPUT_CSV}' already exists.\n")
sys.exit(1)

csv_writer = csv.writer(open(FILENAME_OUTPUT_CSV, 'w', encoding='utf8', newline=''))

PROGRESS_BAR = SHOW_PROGRESS and tqdm(total=0, position=0, desc="Addresses geocoded", dynamic_ncols=True)

async def write_one_geocoding_result(geocoding_results, address, address_id):
if geocoding_results is not None and len(geocoding_results):
first_result = geocoding_results[0]
# '40.78,-73.97' => true
# '3rd Ave, New York' => false
def guess_text_is_coordinate_pair(text):
coordinate_pattern = r'^(-?\d+(\.\d+)?),(-?\d+(\.\d+)?)$'
# x = 'yes' if bool(re.search(coordinate_pattern, text)) else 'no'
# sys.stderr.write(f"{text} is coordinate_pair: {x}\n")
return bool(re.search(coordinate_pattern, text))

async def write_one_geocoding_result(geocoding_result, address, address_id):
# print(geocoding_result, file=sys.stderr)
if geocoding_result is not None:
row = [
address_id,
first_result['geometry']['lat'],
first_result['geometry']['lng'],
geocoding_result['geometry']['lat'],
geocoding_result['geometry']['lng'],
# Any of the components might be empty:
first_result['components'].get('_type', ''),
first_result['components'].get('country', ''),
first_result['components'].get('county', ''),
first_result['components'].get('city', ''),
first_result['components'].get('postcode', ''),
first_result['components'].get('road', ''),
first_result['components'].get('house_number', ''),
first_result['confidence'],
first_result['formatted']
geocoding_result['components'].get('_type', ''),
geocoding_result['components'].get('country', ''),
geocoding_result['components'].get('county', ''),
geocoding_result['components'].get('city', ''),
geocoding_result['components'].get('postcode', ''),
geocoding_result['components'].get('road', ''),
geocoding_result['components'].get('house_number', ''),
geocoding_result['confidence'],
geocoding_result['formatted']
]

else:
sys.stderr.write(f"not found, writing empty result: {address}\n")
row = [
address_id,
0, # not to be confused with https://en.wikipedia.org/wiki/Null_Island
Expand All @@ -65,7 +116,6 @@ async def write_one_geocoding_result(geocoding_results, address, address_id):
-1, # confidence values are 1-10 (lowest to highest), use -1 for unknown
''
]
sys.stderr.write(f"not found, writing empty result: {address}\n")
csv_writer.writerow(row)


Expand All @@ -84,24 +134,32 @@ def backoff_hdlr(details):
on_backoff=backoff_hdlr)
async def geocode_one_address(address, address_id):
async with OpenCageGeocode(API_KEY) as geocoder:
# address -> coordinates
# note: you may also want to set other optional parameters like
# countrycode, language, etc
# see the full list: https://opencagedata.com/api#forward-opt
global FORWARD_OR_REVERSE
try:
geocoding_results = await geocoder.geocode_async(address, no_annotations=1)
if FORWARD_OR_REVERSE == 'reverse' or \
(FORWARD_OR_REVERSE == 'guess' and guess_text_is_coordinate_pair(address)):
# Reverse:
# coordinates -> address, e.g. '40.78,-73.97' => '101, West 91st Street, New York'
lon_lat = address.split(',')
geocoding_results = await geocoder.reverse_geocode_async(
lon_lat[0], lon_lat[1], no_annotations=1)
else:
# Forward:
# address -> coordinates
# note: you may also want to set other optional parameters like
# countrycode, language, etc
# see the full list: https://opencagedata.com/api#forward-opt
geocoding_results = await geocoder.geocode_async(address, no_annotations=1)
except Exception as exc:
geocoding_results = None
traceback.print_exception(exc, file=sys.stderr)

# coordinates -> address, e.g. '40.78,-73.97' => 101, West 91st Street, New York
# lon_lat = address.split(',')
# geocoding_result = await geocoder.reverse_geocode_async(lon_lat[0], lon_lat[1], no_annotations=1)
# returns a single result so we convert it to a list
# geocoding_results = [geocoding_result]

try:
await write_one_geocoding_result(geocoding_results, address, address_id)
if geocoding_results is not None and len(geocoding_results):
geocoding_result = geocoding_results[0]
else:
geocoding_result = None

await write_one_geocoding_result(geocoding_result, address, address_id)
except Exception as exc:
traceback.print_exception(exc, file=sys.stderr)

Expand All @@ -127,7 +185,7 @@ async def run_worker(worker_name, queue):

async def main():
global PROGRESS_BAR
assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
assert sys.version_info >= (3, 7), "Script requires Python 3.7 or newer"

## 1. Read CSV into a Queue
## Each work_item is an address and id. The id will be part of the output,
Expand All @@ -138,16 +196,23 @@ async def main():
##
queue = asyncio.Queue(maxsize=MAX_ITEMS)

csv_reader = csv.reader(open(FILENAME_INPUT_CSV, 'r'), strict=True, skipinitialspace=True)
with open(FILENAME_INPUT_CSV, 'r', encoding='utf-8') as infile:
csv_reader = csv.reader(infile, strict=True, skipinitialspace=True)

for row in csv_reader:
if len(row) == 0:
raise Exception(f"Empty line in input file at line number {csv_reader.line_num}, aborting")

for row in csv_reader:
if len(row) == 0:
raise Exception(f"Empty line in input file at line number {csv_reader.line_num}, aborting")
if FORWARD_OR_REVERSE == 'reverse' or \
(FORWARD_OR_REVERSE == 'guess' and len(row) > 2 and \
guess_text_is_coordinate_pair(f"{row[1]},{row[2]}")):
work_item = {'id': row[0], 'address': f"{row[1]},{row[2]}"}
else:
work_item = {'id': row[0], 'address': row[1]}

work_item = {'id': row[0], 'address': row[1]}
await queue.put(work_item)
if queue.full():
break
await queue.put(work_item)
if queue.full():
break

sys.stderr.write(f"{queue.qsize()} work_items in queue\n")

Expand Down

0 comments on commit 1e0117c

Please sign in to comment.