-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgeocoder.py
127 lines (118 loc) · 5.4 KB
/
geocoder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import concurrent
import datetime
import time
import traceback
import sys
import warnings
from concurrent.futures import ThreadPoolExecutor
import censusbatchgeocoder
import pandas as pd
import psycopg2
import config
# Name of the table to work on, asked for interactively. It is interpolated
# verbatim into the SQL statements this script builds.
table = input("Enter table name: ")

# Sizing knobs: rows handed to the geocoder per call, and rows fetched from
# the database per pass of the main loop (used as the SELECT's LIMIT).
batch_size = 25
db_chunk_size = 10000

# Mutable run state updated by the main loop.
thereExistRowToGeocode = True
n_coded = db_chunk_idx = 0
def gen_update_q(d):
    """Build the SQL UPDATE statement that stores one geocoding result.

    The statement targets the module-level ``table`` and always sets
    ``is_geocoded`` to True. For a record whose ``is_match`` equals
    ``'Match'`` every geocoder field is written; otherwise only
    ``geocoded_address`` and ``is_match`` are.

    Args:
        d: Mapping holding one geocoder result. Keys read: ``id``,
            ``is_match``, ``geocoded_address`` and, for matches,
            ``is_exact``, ``returned_address``, ``coordinates``,
            ``tiger_line``, ``side``, ``state_fips``, ``county_fips``,
            ``tract``, ``block``, ``longitude`` and ``latitude``.
            Missing keys and ``None`` values are tolerated.

    Returns:
        str: A single UPDATE statement without a trailing semicolon.
    """
    def text(key):
        # Every quoted value gets its single quotes doubled, so an address
        # such as "1 O'Brien St" cannot terminate the SQL literal early.
        value = d.get(key)
        if value is None:
            return ''
        return str(value).replace("'", "''")

    def number_or_null(key):
        # Unquoted columns: a missing, None or blank value becomes SQL NULL
        # instead of raising or producing an empty token in the statement.
        value = d.get(key)
        value = '' if value is None else str(value).strip()
        return value or 'NULL'

    # NOTE(review): the caller joins these strings with ';' and executes them
    # as one query, so the values are escaped here rather than bound as
    # parameters. `table` and `id` are still interpolated verbatim.
    if d.get('is_match') == 'Match':
        return (
            "UPDATE {} SET is_geocoded = {}, geocoded_address = '{}', "
            "is_match = '{}', is_exact = '{}', returned_address = '{}', "
            "coordinates = '{}', tiger_line = {}, side= '{}', "
            "state_fips= {}, county_fips= {}, tract= '{}', block= '{}', "
            "longitude= '{}', latitude= '{}' WHERE id = {}"
        ).format(
            table,
            True,
            text('geocoded_address'),
            text('is_match'),
            text('is_exact'),
            text('returned_address'),
            text('coordinates'),
            number_or_null('tiger_line'),
            text('side'),
            number_or_null('state_fips'),
            number_or_null('county_fips'),
            text('tract'),
            text('block'),
            text('longitude'),
            text('latitude'),
            d.get('id', ''),
        )

    return (
        "UPDATE {} SET is_geocoded = {}, geocoded_address = '{}', "
        "is_match = '{}' WHERE id = {}"
    ).format(
        table,
        True,
        text('geocoded_address'),
        text('is_match'),
        d.get('id', ''),
    )
def geocode_batch(start_idx, batch_size=batch_size):
    """Geocode one slice of the current chunk and store the results.

    Rows ``start_idx`` to ``start_idx + batch_size`` (exclusive) of the
    module-level ``df_raw`` frame are sent to ``censusbatchgeocoder`` and one
    UPDATE per returned record is executed on the shared database connection.

    Args:
        start_idx: Positional index of the first row of the slice.
        batch_size: Number of rows in the slice.

    Returns:
        bool: True if the slice was processed (or was empty), False if any
        step raised; the traceback is then printed to stdout.
    """
    try:
        start_time = time.time()
        end_idx = start_idx + batch_size
        records = df_raw.iloc[start_idx:end_idx].to_dict('records')
        if not records:
            # A slice starting past the end of the chunk has nothing to do;
            # report success instead of failing on an empty request/query.
            return True
        result_dicts = censusbatchgeocoder.geocode(records, pooling=False)
        update_query = ';'.join(gen_update_q(d) for d in result_dicts)
        if update_query:
            # psycopg2 cursors are not thread-safe while connections are, so
            # each call uses its own short-lived cursor.
            cursor = myConnection.cursor()
            try:
                cursor.execute(update_query)
            finally:
                cursor.close()
        print('thread finished for batch {} size: {} in {} seconds'.format((start_idx,end_idx), batch_size, time.time() - start_time))
        return True
    except Exception:
        # Exception rather than a bare except, so KeyboardInterrupt and
        # SystemExit are not swallowed.
        traceback.print_exc(file=sys.stdout)
        return False
# Main script: repeatedly fetch a chunk of not-yet-geocoded rows, geocode it
# in parallel batches, retry failed batches one row at a time, and report
# progress until no rows remain or a chunk makes no progress.
warnings.filterwarnings("ignore")

print("initiating DB connection...")
myConnection = psycopg2.connect(
    host=config.db['hostname'],
    user=config.db['username'],
    password=config.db['password'],
    dbname=config.db['database'],
)
# Autocommit makes every batch UPDATE its own transaction. Without it, one
# failing statement aborts the shared transaction, every later statement in
# the chunk is rejected, and the final commit discards the chunk's work.
myConnection.autocommit = True
curr = myConnection.cursor()
print("connected... ")
print("ENTERING MAIN LOOP - db chunk size: {}".format(db_chunk_size))
try:
    print("Starting the geocoding script - All states remaining")
    start = time.time()
    while thereExistRowToGeocode:
        chunk_start = time.time()
        query = "SELECT id, address, state, zipcode FROM {} WHERE NOT is_geocoded LIMIT {};".format(table, db_chunk_size)
        print("Executing Query: ", query)
        curr.execute(query)
        df_raw = pd.DataFrame(curr.fetchall(), columns=['id', 'address', 'state', 'zipcode'])
        n_entries_chunk = len(df_raw)
        if n_entries_chunk == 0:
            thereExistRowToGeocode = False
            break
        # The query does not select a city, so an empty column is added.
        df_raw['city'] = ''
        indices = range(0, n_entries_chunk, batch_size)
        retry_set = set()      # start indices of batches that failed
        retry_entries = set()  # row indices that failed even when sent alone
        print("starting threadpool for chunk[{}]".format(db_chunk_idx))
        with ThreadPoolExecutor(max_workers=50) as executor:
            future_to_idx = {executor.submit(geocode_batch, idx): idx for idx in indices}
            for future in concurrent.futures.as_completed(future_to_idx):
                idx = future_to_idx[future]
                try:
                    success = future.result()
                    if not success:
                        retry_set.add(idx)
                except Exception as exc:
                    print("error for idx: ", idx, " execption: ", exc)
                    retry_set.add(idx)
            print("Before individual retries | need to retry for the following indices: ", retry_set)
            # The retries need the executor, so they stay inside the `with`.
            for idx in retry_set:
                # Clamp to the chunk size: the last batch may be shorter.
                batch_end = min(idx + batch_size, n_entries_chunk)
                future_to_idx = {executor.submit(geocode_batch, i, batch_size=1): i for i in range(idx, batch_end)}
                for future in concurrent.futures.as_completed(future_to_idx):
                    i = future_to_idx[future]
                    try:
                        success = future.result()
                        if not success:
                            # Record the failing row, not its batch index.
                            retry_entries.add(i)
                    except Exception as exc:
                        print("error for idx: ", i, " execption: ", exc)
                        retry_entries.add(i)
        print("After individual retries | was unable to geocode these entries: ", retry_entries)
        # Only rows that were actually processed count as geocoded.
        n_failed = len(retry_entries)
        n_succeeded = n_entries_chunk - n_failed
        n_coded += n_succeeded
        print("Threads done for db chunk[{}], geocoded {} entries in {} seconds".format(db_chunk_idx, n_succeeded, time.time() - chunk_start))
        print("Commiting Transaction")
        # No-op under autocommit; kept so the log and flow stay the same.
        myConnection.commit()
        seconds = time.time() - start
        print("Geocoded {} entries in {} seconds".format(n_coded, seconds))
        db_chunk_idx += db_chunk_size
        if n_failed == n_entries_chunk:
            # Failed rows stay NOT is_geocoded and would be selected again;
            # stop instead of looping forever on the same rows.
            print("No entry of this chunk could be geocoded, stopping")
            break
except Exception as err:
    print("Exception while geocoding: {}".format(err))
    # print_exc needs a file object, not a file name.
    with open('traceback_all_geo', 'a') as traceback_file:
        traceback.print_exc(file=traceback_file)
finally:
    # Release the database resources even on an interrupt.
    curr.close()
    print("closing connection...")
    myConnection.close()