source("1_fetch/src/get_nwis_data.R")
source("1_fetch/src/get_feature_vars.R")
source("1_fetch/src/moving_window_functions.R")
##p1-only parameters
gagesii_path <- "Gages2.1_RefSiteList.xlsx"
NWIS_parameter <- '00060' #NWIS parameter code for discharge (cfs)
startDate <- as.Date("1900-10-01")
endDate <- as.Date("2022-09-30")
##minimum number of complete years required for a site to be used
complete_years <- 20
##percentile for flood threshold in EflowStats. 0.6 is the default
perc <- 0.6
#Drop the following gages from the dataset because they are not representative:
#pipelines, ditches, springs, duplicate comids in Gages 2.1, etc.
drop_gages <- c('02084557', '09406300', '09512200', '10143500', '10172200',
'01349711', '01362198', '01362380', '02322698', '04127918',
'10336674', '10336675', '06190540')
#Combine the following gages from the dataset because they are located on the
#same comid with unique periods of record
combine_gages <- list(c('03584000', '06037000', '12209500'),
c('03584020', '06037100', '12209490'))
names(combine_gages) <- c("to_be_combined", "assigned_rep")
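#Sanity check (a lightweight guard): each gage slated for combination must
#pair with a replacement ID at the same position in the two vectors.
stopifnot(length(combine_gages$to_be_combined) == length(combine_gages$assigned_rep))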
# list of ScienceBase item identifiers containing the feature variables of interest
sb_var_ids_path <- "1_fetch/in/sb_var_ids.csv"
p1_targets_list <- list(
#file target for gagesii (g2) sites
tar_target(p1_sites_g2_xlsx,
gagesii_path,
deployment = 'main',
format = "file"
),
#all gagesii (g2) sites
tar_target(p1_sites_g2,
read_xlsx(p1_sites_g2_xlsx) %>%
             mutate(ID = substr(ID, start=2, stop=nchar(ID))) %>% #strip the leading character to get the gage number
             #drop sites that are not representative (ditch, pipeline, spring)
             #and sites that are duplicates on the same comid
             filter(!(ID %in% drop_gages)),
deployment = 'main'
),
#create a spatial object
tar_target(p1_sites_g2_sf,
st_as_sf(x = p1_sites_g2, coords = c('LON', 'LAT'),
                      crs = st_crs(4326), remove = FALSE, dim = 'XY', na.fail = TRUE),
deployment = 'main'
),
#ID numbers for sites to use
tar_target(
p1_sites_list,
p1_sites_g2$ID,
deployment = 'main'
),
##check to make sure peak and daily flow are actually available for all sites
tar_target(p1_has_data,
has_data_check(p1_sites_list, NWIS_parameter, endDate),
deployment = 'main'
),
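  #A minimal sketch (an assumption, not the project's code; the real check
  #lives in 1_fetch/src/get_nwis_data.R) of the kind of availability query
  #has_data_check() could perform with dataRetrieval::whatNWISdata():
  # avail <- dataRetrieval::whatNWISdata(siteNumber = site, service = "dv",
  #                                      parameterCd = NWIS_parameter)
  # has_daily <- nrow(avail) > 0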
##fetch daily streamflow
#this is deployed on main to avoid overloading the NWIS server with download requests
#note that the download occasionally hangs even with the timeout;
#stop and restart the pipeline when this happens
tar_target(p1_daily_flow_csv,
get_nwis_daily_data(p1_has_data, outdir="./1_fetch/out",
NWIS_parameter, startDate, endDate,
timeout = 60),
map(p1_has_data),
deployment = 'main',
format = "file"
),
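  #A hedged sketch of one way to guard against a hung request (illustrative
  #only; fetch_with_retry is hypothetical and not part of this pipeline, which
  #instead relies on the timeout argument above plus a manual restart):
  # fetch_with_retry <- function(site, tries = 3) {
  #   for (i in seq_len(tries)) {
  #     out <- tryCatch(
  #       dataRetrieval::readNWISdv(site, parameterCd = NWIS_parameter,
  #                                 startDate = startDate, endDate = endDate),
  #       error = function(e) NULL)
  #     if (!is.null(out)) return(out)
  #   }
  #   stop("download failed after ", tries, " attempts for site ", site)
  # }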
##generate log file to track changes to dataRetrieval daily flow request
tar_target(p1_daily_flow_log,
get_daily_flow_log(files_in = p1_daily_flow_csv,
file_out = "./1_fetch/out/logs/daily_flow_log.csv"),
deployment = 'main',
format = "file"
),
##prescreen data to remove provisional data and handle odd column names
##(records from gages with unique periods of record on the same comid are
##combined downstream using combine_gages)
tar_target(p1_prescreen_daily_data,
prescreen_daily_data(p1_daily_flow_csv, prov_rm = TRUE),
map(p1_daily_flow_csv),
deployment = 'worker'
),
##compute the number of complete years based on when the year starts
#These run on main because parallel processing was taking too long,
#likely because the mapped branches build quickly while the prescreened data are large
tar_target(p1_screen_daily_flow,
screen_daily_data(p1_has_data, p1_prescreen_daily_data, year_start),
map(p1_has_data),
deployment = 'main'
),
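  #A minimal sketch (assumed logic, not the project's screen_daily_data())
  #of counting complete years for one site's daily record, where year_start
  #is the month the analysis year begins (defined outside this file):
  # n_complete <- daily %>%
  #   dplyr::mutate(yr = lubridate::year(Date) +
  #                   (lubridate::month(Date) >= year_start)) %>%
  #   dplyr::count(yr) %>%
  #   dplyr::filter(n >= 365) %>%
  #   nrow()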
##For seasonal analysis
tar_target(p1_screen_daily_flow_season,
screen_daily_data(p1_has_data, p1_prescreen_daily_data, season_year_start),
map(p1_has_data),
deployment = 'main'
),
# tar_target(p1_screen_daily_flow_season_high,
# screen_daily_data(p1_has_data, p1_prescreen_daily_data, season_year_start_high),
# map(p1_has_data),
# deployment = 'main'
# ),
##select sites with enough complete years
tar_target(p1_screened_site_list,
filter_complete_years(p1_screen_daily_flow, combine_gages, complete_years),
deployment = 'main'
),
##seasonal
tar_target(p1_screened_site_list_season,
filter_complete_years(p1_screen_daily_flow_season, combine_gages, complete_years),
deployment = 'main'
),
# tar_target(p1_screened_site_list_season_high,
# filter_complete_years(p1_screen_daily_flow_season_high, combine_gages, complete_years),
# deployment = 'main'
# ),
##clean and format daily data so it can be used in EflowStats
tar_target(p1_clean_daily_flow,
clean_daily_data(p1_screened_site_list, p1_prescreen_daily_data,
p1_screen_daily_flow, yearType, year_start),
map(p1_screened_site_list),
deployment = 'main'
),
##seasonal
tar_target(p1_clean_daily_flow_season,
clean_daily_data(p1_screened_site_list_season, p1_prescreen_daily_data,
p1_screen_daily_flow_season, yearType, season_year_start),
map(p1_screened_site_list_season),
deployment = 'main'
),
# tar_target(p1_clean_daily_flow_season_high,
# clean_daily_data(p1_screened_site_list_season_high, p1_prescreen_daily_data,
# p1_screen_daily_flow_season_high, yearType,
# season_year_start_high),
# map(p1_screened_site_list_season_high),
# deployment = 'main'
# ),
#get drainage area from NWIS
#this is deployed on main to avoid overloading the NWIS server with download requests
tar_target(p1_drainage_area,
get_NWIS_drainArea(p1_screened_site_list),
map(p1_screened_site_list),
deployment = 'main'
),
##generate log file to track changes to dataRetrieval drainage area request
tar_target(p1_drainage_area_log,
get_drainage_area_log(file_in = p1_drainage_area,
file_out = "./1_fetch/out/logs/drainage_area_log.csv"),
deployment = 'main',
format = "file"
),
##fetch peak flow from NWIS and save to file for EflowStats
#this is deployed on main to avoid overloading the NWIS server with download requests
tar_target(p1_peak_flow_csv,
get_nwis_peak_data(p1_screened_site_list, outdir="./1_fetch/out",
startDate, endDate, timeout = 60),
map(p1_screened_site_list),
deployment = 'main',
format="file"
),
##generate log file to track changes to dataRetrieval peak flow request
tar_target(p1_peak_flow_log,
get_peak_flow_log(files_in = p1_peak_flow_csv,
file_out = "./1_fetch/out/logs/peak_flow_log.csv"),
deployment = 'main',
format = "file"
),
##get flood threshold from NWIS peak flows for EflowStats
#this is deployed on main to avoid overloading the NWIS server with download requests
tar_target(p1_flood_threshold,
get_floodThreshold(p1_screened_site_list, p1_clean_daily_flow,
p1_peak_flow_csv, perc, yearType),
map(p1_screened_site_list),
deployment = 'main'
),
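  #A hedged sketch of the EflowStats call that get_floodThreshold() presumably
  #wraps (the wrapper lives in 1_fetch/src/; the column selections here are
  #assumptions based on the EflowStats documentation):
  # thresh <- EflowStats::get_peakThreshold(dailyQ[c("date", "discharge")],
  #                                         peaks[c("peak_dt", "peak_va")],
  #                                         perc = perc, yearType = yearType)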
#file target for sciencebase variable list csv
tar_target(p1_sb_var_ids_csv,
sb_var_ids_path,
deployment = 'main',
format = "file"
),
#read in sciencebase variable list csv
tar_target(p1_sb_var_ids,
read_csv(file = p1_sb_var_ids_csv, show_col_types = FALSE),
deployment = 'main'
),
##generate tables of feature variables from gagesii list
tar_target(p1_sb_data_g2_csv,
get_sb_data(sites = p1_sites_g2,
sb_var_ids = p1_sb_var_ids,
dldir = "./1_fetch/out/sb/dldir",
workdir = "./1_fetch/out/sb/workdir",
outdir = "./1_fetch/out/sb/data",
out_file_label = "raw_g2"),
map(p1_sb_var_ids),
iteration = "list",
deployment = 'main',
format = "file"
),
##generate log file to track updates to sb variables
tar_target(p1_sb_data_g2_log,
get_sb_data_log(sb_var_ids = p1_sb_var_ids,
file_out = "1_fetch/out/logs/sb_update_log.csv"),
deployment = "main",
format = "file"
),
##download and unzip the NHD CONUS geodatabase
#(seven_zip is the path to the 7-Zip executable used for extraction)
tar_target(p1_nhd_conus_gdb,
get_nhd_conus_gdb(outdir = "./1_fetch/out/nhd_plus/",
seven_zip = "/caldera/projects/usgs/water/impd/fhwa/seven_zip/7zz"),
deployment = 'main',
format = "file"
),
##filter NHD CONUS COMIDs to non-tidal flowlines of the desired types, retaining attributes of interest
tar_target(p1_sites_conus_sf,
prep_comid_conus(nhd_conus_gdb = p1_nhd_conus_gdb,
attrib_to_keep =
c("COMID", "GNIS_NAME", "LENGTHKM", "FTYPE",
"StreamOrde", "Divergence", "StartFlag",
"TerminalFl", "AreaSqKM", "TotDASqKM", "DivDASqKM",
"Tidal", "SLOPE", "LakeFract", "SurfArea"),
outdir = "./1_fetch/out/nhd_plus/"),
deployment = 'main'
),
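  #A hedged sketch of the kind of filter prep_comid_conus() presumably applies
  #with the attributes retained above (the exact criteria are an assumption):
  # flowlines %>% dplyr::filter(Tidal == 0, FTYPE != "Coastline")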
##list of COMIDs for CONUS-wide predictions
tar_target(p1_sites_conus,
st_drop_geometry(p1_sites_conus_sf),
deployment = 'main'
),
##merge and select feature variables from gagesii list
tar_target(p1_feature_vars_g2,
prep_feature_vars_g2(sb_var_data = p1_sb_data_g2_csv,
sites_all = p1_sites_g2,
sites_screened = p1_screened_site_list,
combine_gages = combine_gages,
years_by_site = p1_clean_daily_flow,
nhd_conus = p1_sites_conus,
retain_vars = c("ID", "LAT", "LON",
"npdes", "fwwd", "strg",
"devl", "cndp")),
deployment = "main"
),
#create a spatial object for remaining feature variables
tar_target(p1_feature_vars_g2_sf,
             st_as_sf(x = p1_feature_vars_g2, coords = c('LON', 'LAT'),
                      crs = st_crs(4326), #set WGS84, matching p1_sites_g2_sf
                      remove = FALSE, dim = 'XY', na.fail = TRUE),
deployment = 'main'
),
##generate tables of feature variables from conus-wide comid list
tar_target(p1_sb_data_conus_csv,
get_sb_data(sites = p1_sites_conus,
sb_var_ids = p1_sb_var_ids,
dldir = "./1_fetch/out/sb/dldir",
workdir = "./1_fetch/out/sb/workdir",
outdir = "./1_fetch/out/sb/data",
out_file_label = "raw_conus"),
map(p1_sb_var_ids),
iteration = "list",
deployment = 'main',
format = "file"
),
##merge and select feature variables from conus-wide comid list
tar_target(p1_prep1_feature_vars_conus,
prep1_vars_conus(sb_var_data = p1_sb_data_conus_csv,
sites = p1_sites_conus,
retain_attr = c("StreamOrde", "Divergence",
"TotDASqKM", "DivDASqKM", "SLOPE",
"LakeFract", "SurfArea"),
outdir = "./1_fetch/out/sb/data",
out_file_label = "prep1_conus"),
map(p1_sb_data_conus_csv),
deployment = "main",
format = "file"
),
##finalize comid selection and prepare variables used by the predictive models
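  #note: positions 35 and 36 are assumed to correspond to the early- and
  #late-period SOHL items in the order given by sb_var_ids.csv;
  #update these indices if that file changes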
tar_target(p1_prep2_feature_vars_conus,
prep2_vars_conus(sohl_early = p1_sb_data_conus_csv[35],
sohl_late = p1_sb_data_conus_csv[36],
outdir = "./1_fetch/out/sb/data",
out_file_label = "prep2_conus"),
deployment = "main",
format = "file"
),
##finalize selection and formatting of conus-wide feature variables
tar_target(p1_feature_vars_conus,
finalize_vars_conus(prep1_conus = p1_prep1_feature_vars_conus,
prep2_conus = p1_prep2_feature_vars_conus,
drop_attr = c("StreamOrde", "Divergence",
"TotDASqKM", "DivDASqKM", "SLOPE",
"LakeFract", "SurfArea")),
deployment = "main"
),
##screen out rows with NA or -9998/-9999 (no-data) values
tar_target(p1_feature_vars_conus_screen,
p1_feature_vars_conus %>%
drop_na() %>%
               filter(if_all(where(is.numeric), ~ .x > -990)))
)
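#A self-contained toy illustration (not part of the pipeline) of how the
#screen above behaves: the > -990 cutoff drops the -9998/-9999 no-data codes
#while keeping legitimate values.
# toy <- tibble::tibble(COMID = 1:3, slope = c(0.01, -9999, 0.02))
# toy %>%
#   tidyr::drop_na() %>%
#   dplyr::filter(dplyr::if_all(dplyr::where(is.numeric), ~ .x > -990))
# #> keeps COMIDs 1 and 3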