Opencage CLI adapted from batch.py example script (#54)
* Initial release of 'opencage' CLI tool

* prepare version 3.0.0 release

---------

Co-authored-by: marc tobias <[email protected]>
sbscully and mtmail authored Sep 4, 2024
1 parent db6829c commit 8060553
Showing 21 changed files with 709 additions and 25 deletions.
1 change: 0 additions & 1 deletion .github/workflows/build.yml
@@ -18,7 +18,6 @@ jobs:
- "3.10"
- "3.9"
- "3.8"
- "3.7"
os:
- ubuntu-latest
steps:
6 changes: 5 additions & 1 deletion Changes.txt
@@ -1,7 +1,11 @@
unreleased
v3.0.0 Wed Sep 4 2024
Requires python 3.7 and asyncio package
Initial release of the 'opencage' CLI tool
RateLimitExceededError no longer prints reset date
Batch example: warn if no API key present earlier
Batch example: some errors were not printed, e.g. invalid API key
Batch example: Check latest version of opencage package is used
Add python 3.12, no longer test against python 3.7

v2.3.1 Wed Nov 15 2023
New error 'SSLError' which is more explicit in case of SSL certificate chain issues
38 changes: 34 additions & 4 deletions README.md
@@ -15,11 +15,14 @@ A Python module to access the [OpenCage Geocoding API](https://opencagedata.com/

You can find a [comprehensive tutorial for using this module on the OpenCage site](https://opencagedata.com/tutorials/geocode-in-python).

There are also two brief video tutorials on YouTube, one [covering forward geocoding](https://www.youtube.com/watch?v=9bXu8-LPr5c), one [covering reverse geocoding](https://www.youtube.com/watch?v=u-kkE4yA-z0).
There are two brief video tutorials on YouTube, one [covering forward geocoding](https://www.youtube.com/watch?v=9bXu8-LPr5c), one [covering reverse geocoding](https://www.youtube.com/watch?v=u-kkE4yA-z0).

The module installs an `opencage` CLI tool for geocoding files. Check `opencage --help` or the [CLI tutorial](https://opencagedata.com/tutorials/geocode-in-cli).


## Usage

Supports Python 3.7 or newer. Use the older opencage 1.x releases if you need Python 2.7 support.
Supports Python 3.8 or newer. Starting with version 3.0, opencage depends on the asyncio package.

Install the module:

@@ -87,7 +90,7 @@ with OpenCageGeocode(key) as geocoder:

You can run requests in parallel with the `geocode_async` and `reverse_geocode_async`
methods, which have the same parameters and response as their synchronous counterparts.
You will need at least Python 3.7 and the `asyncio` and `aiohttp` packages installed.
You will need at least Python 3.8 and the `asyncio` and `aiohttp` packages installed.

```python
async with OpenCageGeocode(key) as geocoder:
@@ -109,7 +112,34 @@ geocoder = OpenCageGeocode('your-api-key', 'http')

### Command-line batch geocoding

See `examples/batch.py` for an example to geocode a CSV file.
Use `opencage forward` or `opencage reverse`:

```
opencage forward --help
  -h, --help            show this help message and exit
  --api-key API_KEY     Your OpenCage API key
  --input INPUT         Input file name
  --output OUTPUT       Output file name
  --headers             If the first row should be treated as a header row
  --input-columns INPUT_COLUMNS
                        Comma-separated list of integers (default '1')
  --add-columns ADD_COLUMNS
                        Comma-separated list of output columns
  --workers WORKERS     Number of parallel geocoding requests (default 1)
  --timeout TIMEOUT     Timeout in seconds (default 10)
  --retries RETRIES     Number of retries (default 5)
  --api-domain API_DOMAIN
                        API domain (default api.opencagedata.com)
  --extra-params EXTRA_PARAMS
                        Extra parameters for each request (e.g. language=fr,no_dedupe=1)
  --limit LIMIT         Stop after this number of lines in the input
  --dry-run             Read the input file but no geocoding
  --no-progress         Display no progress bar
  --quiet               No progress bar and no messages
  --overwrite           Delete the output file first if it exists
  --verbose             Display debug information for each request
```
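
The help text describes `--input-columns` as a comma-separated list of 1-based integers and `--extra-params` as comma-separated `key=value` pairs. The following is a minimal sketch of how such values could be interpreted. The helper names `parse_input_columns`, `parse_extra_params` and `build_address` are hypothetical and are not part of the `opencage` package; the CLI's actual parsing may differ.

```python
def parse_input_columns(value="1"):
    """Turn '1,3' into [1, 3]; the help text describes these as integers (default '1')."""
    return [int(part) for part in value.split(",") if part.strip()]


def parse_extra_params(value=""):
    """Turn 'language=fr,no_dedupe=1' into {'language': 'fr', 'no_dedupe': '1'}."""
    params = {}
    for pair in value.split(","):
        if not pair.strip():
            continue
        key, _, val = pair.partition("=")
        params[key.strip()] = val.strip()
    return params


def build_address(row, input_columns):
    """Join the selected 1-based columns of a CSV row into one query string."""
    return ",".join(row[column - 1] for column in input_columns)


# Hypothetical CSV row: three address columns followed by an id column.
row = ["Kendall Sq", "Cambridge", "MA", "some-id"]
columns = parse_input_columns("1,2,3")
print(build_address(row, columns))
print(parse_extra_params("language=fr,no_dedupe=1"))
```

Joining the selected columns with a comma mirrors what `read_one_line` in `opencage/batch.py` does before the address is sent to the geocoder.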

<img src="batch-progress.gif"/>

4 changes: 2 additions & 2 deletions examples/batch.py
@@ -3,7 +3,7 @@
# Background tutorial on async programming with Python
# https://realpython.com/async-io-python/

# Requires Python 3.7 or newer. Tested with 3.8/3.9/3.10/3.11.
# Requires Python 3.8 or newer. Tested with 3.8/3.9/3.10/3.11.

# Installation:
# pip3 install --upgrade opencage asyncio aiohttp backoff tqdm
@@ -213,7 +213,7 @@ async def run_worker(worker_name, queue):

async def main():
global PROGRESS_BAR
assert sys.version_info >= (3, 7), "Script requires Python 3.7 or newer"
assert sys.version_info >= (3, 8), "Script requires Python 3.8 or newer"

## 1. Read CSV into a Queue
## Each work_item is an address and id. The id will be part of the output,
207 changes: 207 additions & 0 deletions opencage/batch.py
@@ -0,0 +1,207 @@
import sys
import ssl
import asyncio
import traceback
import threading
import backoff
import certifi
import random

from tqdm import tqdm
from urllib.parse import urlencode
from contextlib import suppress
from opencage.geocoder import OpenCageGeocode, OpenCageGeocodeError

class OpenCageBatchGeocoder():
    def __init__(self, options):
        self.options = options
        self.sslcontext = ssl.create_default_context(cafile=certifi.where())
        # Row id that is allowed to be written next (rows are written in input order)
        self.write_counter = 1

    def __call__(self, *args, **kwargs):
        asyncio.run(self.geocode(*args, **kwargs))

    async def geocode(self, input, output):
        if not self.options.dry_run:
            # Fail early, e.g. on an invalid API key, before reading the whole input
            test = await self.test_request()
            if test['error']:
                self.log(test['error'])
                return

        if self.options.headers:
            header_columns = next(input, None)
            if header_columns is None:
                return

        queue = asyncio.Queue(maxsize=self.options.limit)

        await self.read_input(input, queue)

        if self.options.dry_run:
            return

        if self.options.headers:
            output.writerow(header_columns + self.options.add_columns)

        progress_bar = not (self.options.no_progress or self.options.quiet) and \
            tqdm(total=queue.qsize(), position=0, desc="Addresses geocoded", dynamic_ncols=True)

        tasks = []
        for _ in range(self.options.workers):
            task = asyncio.create_task(self.worker(output, queue, progress_bar))
            tasks.append(task)

        # This starts the workers and waits until all are finished
        await queue.join()

        # All tasks done; the workers loop forever, so cancel them and wait for that
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

        if progress_bar:
            progress_bar.close()

    async def test_request(self):
        try:
            async with OpenCageGeocode(self.options.api_key, domain=self.options.api_domain, sslcontext=self.sslcontext) as geocoder:
                result = await geocoder.geocode_async('Kendall Sq, Cambridge, MA', raw_response=True)

                free = False
                with suppress(KeyError):
                    free = result['rate']['limit'] == 2500

                return { 'error': None, 'free': free }
        except Exception as exc:
            return { 'error': exc }

    async def read_input(self, input, queue):
        for index, row in enumerate(input):
            line_number = index + 1

            if len(row) == 0:
                raise Exception(f"Empty line in input file at line number {line_number}, aborting")

            item = await self.read_one_line(row, line_number)
            await queue.put(item)

            if queue.full():
                break

    async def read_one_line(self, row, row_id):
        if self.options.command == 'reverse':
            input_columns = [1, 2]
        elif self.options.input_columns:
            input_columns = self.options.input_columns
        else:
            input_columns = None

        if input_columns:
            address = []
            try:
                for column in input_columns:
                    # input_columns option uses 1-based indexing
                    address.append(row[column - 1])
            except IndexError:
                self.log(f"Missing input column {column} in {row}")
        else:
            address = row

        if self.options.command == 'reverse' and len(address) != 2:
            self.log(f"Expected two comma-separated values for reverse geocoding, got {address}")

        return { 'row_id': row_id, 'address': ','.join(address), 'original_columns': row }

    async def worker(self, output, queue, progress):
        while True:
            item = await queue.get()

            try:
                await self.geocode_one_address(output, item['row_id'], item['address'], item['original_columns'])

                if progress:
                    progress.update(1)
            except Exception as exc:
                # Three-argument form: print_exception(exc) alone requires Python 3.10+
                traceback.print_exception(type(exc), exc, exc.__traceback__, file=sys.stderr)
            finally:
                queue.task_done()

    async def geocode_one_address(self, output, row_id, address, original_columns):
        def on_backoff(details):
            if not self.options.quiet:
                sys.stderr.write("Backing off {wait:0.1f} seconds after {tries} tries "
                                 "calling function {target} with args {args} and kwargs "
                                 "{kwargs}\n".format(**details))

        @backoff.on_exception(backoff.expo,
                              asyncio.TimeoutError,
                              max_time=self.options.timeout,
                              max_tries=self.options.retries,
                              on_backoff=on_backoff)
        async def _geocode_one_address():
            async with OpenCageGeocode(self.options.api_key, domain=self.options.api_domain, sslcontext=self.sslcontext) as geocoder:
                geocoding_results = None
                params = { 'no_annotations': 1, **self.options.extra_params }

                try:
                    if self.options.command == 'reverse':
                        # The two values are passed positionally as (lat, lng)
                        lat, lng = address.split(',')
                        geocoding_results = await geocoder.reverse_geocode_async(lat, lng, **params)
                    else:
                        geocoding_results = await geocoder.geocode_async(address, **params)
                except OpenCageGeocodeError as exc:
                    self.log(str(exc))
                except Exception as exc:
                    traceback.print_exception(type(exc), exc, exc.__traceback__, file=sys.stderr)

                try:
                    if geocoding_results is not None and len(geocoding_results):
                        geocoding_result = geocoding_results[0]
                    else:
                        geocoding_result = None

                    if self.options.verbose:
                        self.log({
                            'row_id': row_id,
                            'thread_id': threading.get_native_id(),
                            'request': geocoder.url + '?' + urlencode(geocoder._parse_request(address, params)),
                            'response': geocoding_result
                        })

                    await self.write_one_geocoding_result(output, row_id, address, geocoding_result, original_columns)
                except Exception as exc:
                    traceback.print_exception(type(exc), exc, exc.__traceback__, file=sys.stderr)

        await _geocode_one_address()

    async def write_one_geocoding_result(self, output, row_id, address, geocoding_result, original_columns=None):
        # Copy the input columns: avoids a shared mutable default argument and
        # keeps the caller's list unchanged when result columns are appended.
        row = list(original_columns or [])

        for column in self.options.add_columns:
            if geocoding_result is None:
                row.append('')
            elif column in geocoding_result:
                row.append(geocoding_result[column])
            elif column in geocoding_result['components']:
                row.append(geocoding_result['components'][column])
            elif column in geocoding_result['geometry']:
                row.append(geocoding_result['geometry'][column])
            else:
                row.append('')

        # Enforce that rows are written in order. That means we might wait for other workers
        # to finish a task and make the overall process slower. Alternative would be to
        # use a second queue, or keep some results in memory.
        while row_id > self.write_counter:
            if self.options.verbose:
                self.log(f"Want to write row {row_id}, but write_counter is at {self.write_counter}")
            await asyncio.sleep(random.uniform(0.01, 0.1))

        if self.options.verbose:
            self.log(f"Writing row {row_id}")
        output.writerow(row)
        self.write_counter = self.write_counter + 1

    def log(self, message):
        if not self.options.quiet:
            sys.stderr.write(f"{message}\n")
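
The comment in `write_one_geocoding_result` notes that, instead of sleeping until `write_counter` catches up, results could be kept in memory. Here is a minimal sketch of that alternative, assuming row ids are consecutive and start at 1 as in `read_input`. `OrderedWriter` and `fake_geocode` are hypothetical names used only for illustration; they are not part of this commit, and the simulated delay stands in for a real geocoding request.

```python
import asyncio
import random


class OrderedWriter:
    """Buffer rows that arrive early and flush them in row_id order."""

    def __init__(self, write_row, first_row_id=1):
        self.write_row = write_row    # callable taking one row, e.g. a csv writer's writerow
        self.next_row_id = first_row_id
        self.pending = {}             # row_id -> row, waiting for earlier rows

    def add(self, row_id, row):
        self.pending[row_id] = row
        # Flush every row that is now contiguous with what was already written.
        while self.next_row_id in self.pending:
            self.write_row(self.pending.pop(self.next_row_id))
            self.next_row_id += 1


async def fake_geocode(writer, row_id):
    # Stand-in for a network request that finishes after a random delay.
    await asyncio.sleep(random.uniform(0.001, 0.01))
    writer.add(row_id, [f"address {row_id}"])


async def demo():
    written = []
    writer = OrderedWriter(written.append)
    # Requests complete in arbitrary order; the writer restores input order.
    await asyncio.gather(*(fake_geocode(writer, i) for i in range(1, 6)))
    return written


rows = asyncio.run(demo())
print(rows)
```

The trade-off is memory for latency: no task polls or sleeps, but rows that finish early stay buffered until every earlier row has arrived, so a row that never arrives would hold back everything after it.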
