# socrata_upload.py
import contextlib
import io
import json
import logging
import os
import time

import pandas as pd
import requests
from dbt.cli.main import dbtRunner
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

logger = logging.getLogger(__name__)

# Allow pandas to print full-length dataframes for logging
pd.set_option("display.max_rows", None)

# Create a session object so HTTP requests can be pooled
session = requests.Session()
session.auth = (
    str(os.getenv("SOCRATA_USERNAME")),
    str(os.getenv("SOCRATA_PASSWORD")),
)

# Connect to Athena
cursor = connect(
    s3_staging_dir="s3://ccao-athena-results-us-east-1/",
    region_name="us-east-1",
    cursor_class=PandasCursor,
).cursor(unload=True)
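# Note: `unload=True` enables pyathena's UNLOAD mode, which stages query
# results in S3 as Parquet before reading them back into a dataframe; for
# large result sets this is typically much faster than the default CSV path.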
def parse_years(years):
    """
    Make sure the YEARS environment variable is formatted correctly by
    splitting a comma-separated string into a list of year strings.
    """
    if years is not None:
        years = str(years).replace(" ", "").split(",")

    return years
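# For example (a minimal sketch of the expected inputs):
#   parse_years("2022, 2023")  # -> ["2022", "2023"]
#   parse_years("all")         # -> ["all"]
#   parse_years(None)          # -> None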
def get_asset_info(socrata_asset):
    """
    Simple helper function to retrieve asset-specific information from dbt.
    """
    os.chdir("./dbt")

    DBT = dbtRunner()
    dbt_list_args = [
        "--quiet",
        "list",
        "--resource-types",
        "exposure",
        "--output",
        "json",
        "--output-keys",
        "label",
        "meta",
        "depends_on",
    ]
    print(f"> dbt {' '.join(dbt_list_args)}")
    dbt_output = io.StringIO()
    with contextlib.redirect_stdout(dbt_output):
        DBT.invoke(dbt_list_args)

    model = [
        json.loads(model_dict_str)
        for model_dict_str in dbt_output.getvalue().split("\n")
        # Filter out empty strings caused by trailing newlines
        if model_dict_str
    ]

    os.chdir("..")

    model = pd.json_normalize(model)
    model = model[model["label"] == socrata_asset]

    athena_asset = model.iloc[0]["depends_on.nodes"][0].split(".", 2)[-1]
    asset_id = model.iloc[0]["meta.asset_id"]

    return athena_asset, asset_id
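# For reference, each line of the dbt output above is a JSON object along
# these lines (a sketch; the node and asset names are hypothetical):
#   {
#       "label": "Parcel Universe",
#       "meta": {"asset_id": "abcd-1234"},
#       "depends_on": {"nodes": ["model.ccao_data.default.vw_pin_universe"]}
#   }
# from which `athena_asset` would be "default.vw_pin_universe" and
# `asset_id` would be "abcd-1234".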
def build_query(athena_asset, asset_id, years=None, township=None):
    """
    Build an Athena-compatible SQL query. Appends a year conditional if
    `years` is non-empty, and a township conditional if `township` is also
    set. Many of the CCAO's open data assets are too large to pass to
    Socrata without chunking.
    """
    # Retrieve column names and types from Athena
    columns = cursor.execute("show columns from " + athena_asset).as_pandas()
    athena_columns = columns["column"].tolist()
    athena_columns.sort()

    # Retrieve column names from Socrata
    asset_columns = (
        session.get(
            f"https://datacatalog.cookcountyil.gov/resource/{asset_id}"
        )
        .headers["X-SODA2-Fields"]
        .replace('"', "")
        .strip("[")
        .strip("]")
        .split(",")
    )
    # `row_id` won't show up here since it's hidden on the open data portal
    # assets
    asset_columns += ["row_id"]
    asset_columns.sort()

    # If there are columns on Socrata that are not in Athena, abort the
    # upload and inform the user of the discrepancies. The script should not
    # run at all in this circumstance since it would update some but not all
    # columns in the open data asset.
    # If there are columns in Athena but not on Socrata, it may be the case
    # that they should be added, but there are also cases when not all
    # columns of an Athena view that feeds an open data asset need to be
    # part of that asset, so only warn.
    if athena_columns != asset_columns:
        columns_not_on_socrata = set(athena_columns) - set(asset_columns)
        columns_not_in_athena = set(asset_columns) - set(athena_columns)
        exception_message = (
            f"Columns on Socrata and in Athena do not match for {athena_asset}"
        )
        if len(columns_not_on_socrata) > 0:
            exception_message += (
                "\nColumns in Athena but not on Socrata: "
                f"{columns_not_on_socrata}"
            )
            logger.warning(exception_message)
        if len(columns_not_in_athena) > 0:
            exception_message += (
                "\nColumns on Socrata but not in Athena: "
                f"{columns_not_in_athena}"
            )
            raise Exception(exception_message)

    # Limit the pull to columns present in the open data asset
    columns = columns[columns["column"].isin(asset_columns)]
    print("The following columns will be updated:")
    print(columns)

    query = f"SELECT {', '.join(columns['column'])} FROM {athena_asset}"
    if years:
        query += " WHERE year = %(year)s"
        if township:
            query += " AND township_code = %(township)s"

    return query
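# With a hypothetical asset `default.vw_pin_universe`, chunking by both year
# and township would yield a query like:
#   SELECT pin, township_code, year, ... FROM default.vw_pin_universe
#   WHERE year = %(year)s AND township_code = %(township)s
# The %(...)s placeholders are pyathena's "pyformat" parameters, filled in at
# execution time by the `query_conditionals` dict passed to
# `cursor.execute()` in `upload()` below.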
def upload(
    method, asset_id, sql_query, overwrite, count, year=None, township=None
):
    """
    Perform the upload to Socrata. Uses `put` (replace) or `post` (upsert)
    depending on the user's choice to overwrite existing data. Returns the
    running request count so that callers iterating over groups can switch
    from `put` to `post` after the first request.
    """
    # Load environment variables
    app_token = os.getenv("SOCRATA_APP_TOKEN")
    url = "https://datacatalog.cookcountyil.gov/resource/" + asset_id + ".json"

    print_message = "Overwriting" if overwrite else "Updating"
    if not year:
        query_conditionals = {}
        print_message += f" all years for asset {asset_id}"
    elif not township:
        query_conditionals = {"year": year}
        print_message += f" year: {year} for asset {asset_id}"
    else:
        query_conditionals = {"year": year, "township": township}
        print_message += (
            f" township: {township}, year: {year} for asset {asset_id}"
        )

    # We grab the data before uploading it so we can make sure timestamps
    # are properly formatted
    input_data = cursor.execute(sql_query, query_conditionals).as_pandas()
    date_columns = input_data.select_dtypes(include="datetime").columns
    for i in date_columns:
        # Format datetimes first, then fill NaT values with empty strings;
        # calling `.fillna("")` on a datetime column first would break the
        # `.dt` accessor
        input_data[i] = (
            input_data[i].dt.strftime("%Y-%m-%dT%H:%M:%S").fillna("")
        )

    # Raise if the asset URL returns a bad status
    session.get(
        url + "?$limit=1",
        headers={"X-App-Token": app_token},
    ).raise_for_status()

    # Upload in chunks of 10,000 rows to keep request payloads small
    for i in range(0, input_data.shape[0], 10000):
        print(print_message)
        print(f"Rows {i + 1}-{min(i + 10000, input_data.shape[0])}")
        # Only the first request of an overwrite should `put`; subsequent
        # chunks must `post` or they would wipe out earlier chunks
        if count > 0:
            method = "post"
        response = getattr(session, method)(
            url=url,
            data=input_data.iloc[i : i + 10000].to_json(orient="records"),
            headers={"X-App-Token": app_token},
        )
        count += 1
        print(response.content)

    return count
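# Usage sketch (hypothetical asset ID): overwrite one year, then upsert the
# next, threading `count` through so only the very first request uses `put`:
#   count = upload("put", "abcd-1234", sql_query, True, count=0, year="2022")
#   count = upload(
#       "post", "abcd-1234", sql_query, True, count=count, year="2023"
#   )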
def generate_groups(athena_asset, years=None, by_township=False):
    """
    Helper function to determine which groups need to be iterated over for
    upload.
    """
    if not years and by_township:
        raise ValueError("Cannot set 'by_township' when 'years' is None")

    if years == ["all"]:
        years = (
            cursor.execute(
                "SELECT DISTINCT year FROM " + athena_asset + " ORDER BY year"
            )
            .as_pandas()["year"]
            .to_list()
        )

    if by_township:
        township_codes = (
            cursor.execute(
                "SELECT DISTINCT township_code FROM spatial.township"
            )
            .as_pandas()["township_code"]
            .to_list()
        )
        # Pair every year with every township code
        groups = [
            (year, township) for year in years for township in township_codes
        ]
        flag = "both"
    elif years:
        groups = years
        flag = "years"
    else:
        groups = None
        flag = None

    return flag, groups
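# For example (a sketch; the asset name and township codes are hypothetical):
#   generate_groups("vw_foo", years=["2022", "2023"])
#       -> ("years", ["2022", "2023"])
#   generate_groups("vw_foo", years=["2023"], by_township=True)
#       -> ("both", [("2023", "10"), ("2023", "11"), ...])
#   generate_groups("vw_foo")
#       -> (None, None)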
def socrata_upload(
    socrata_asset, overwrite=False, years=None, by_township=False
):
    """
    Wrapper function for building the SQL query, retrieving data from
    Athena, and uploading it to Socrata. Allows users to specify target
    Athena and Socrata assets, whether the data on Socrata should be
    overwritten or updated, and whether or not to chunk the upload by year
    and township. By default the function will query a given Athena asset
    for all years and upload via `post` (update rather than overwrite).
    """
    # GitHub inputs are passed as strings rather than booleans
    if isinstance(overwrite, str):
        overwrite = overwrite == "true"
    if isinstance(by_township, str):
        by_township = by_township == "true"

    athena_asset, asset_id = get_asset_info(socrata_asset)
    flag, groups = generate_groups(
        years=years, by_township=by_township, athena_asset=athena_asset
    )

    tic = time.perf_counter()
    count = 0

    if not flag:
        sql_query = build_query(
            athena_asset=athena_asset,
            asset_id=asset_id,
        )
        upload_args = {
            "asset_id": asset_id,
            "sql_query": sql_query,
            "overwrite": overwrite,
            "count": count,
        }
        if overwrite:
            upload("put", **upload_args)
        else:
            upload("post", **upload_args)
    else:
        sql_query = build_query(
            athena_asset=athena_asset,
            asset_id=asset_id,
            years=years,
            township=by_township if flag == "both" else None,
        )
        for item in groups:
            if flag == "both":
                upload_args = {
                    "asset_id": asset_id,
                    "sql_query": sql_query,
                    "overwrite": overwrite,
                    "count": count,
                    "year": item[0],
                    "township": item[1],
                }
            elif flag == "years":
                upload_args = {
                    "asset_id": asset_id,
                    "sql_query": sql_query,
                    "overwrite": overwrite,
                    "count": count,
                    "year": item,
                }
            # Thread `count` through so that only the very first request of
            # an overwrite uses `put`; everything after must `post`
            if count == 0 and overwrite:
                count = upload("put", **upload_args)
            else:
                count = upload("post", **upload_args)

    toc = time.perf_counter()
    print(f"Total upload in {toc - tic:0.4f} seconds")
if __name__ == "__main__":
    socrata_upload(
        socrata_asset=os.getenv("SOCRATA_ASSET"),
        overwrite=os.getenv("OVERWRITE"),
        years=parse_years(os.getenv("YEARS")),
        by_township=os.getenv("BY_TOWNSHIP"),
    )