-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Updated timeseries generation with GenTS #143
base: main
Are you sure you want to change the base?
Changes from all commits
4e7753d
d556170
3bcc2cd
8413466
ec17e2e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,8 +25,11 @@ | |
import os | ||
|
||
import click | ||
import gents | ||
import intake | ||
import ploomber | ||
from dask.distributed import Client | ||
from dask.distributed import LocalCluster | ||
|
||
import cupid.timeseries | ||
import cupid.util | ||
|
@@ -83,110 +86,75 @@ def run( | |
"glc": landice, | ||
} | ||
|
||
# Automatically run all if no components specified | ||
|
||
if True not in [atmosphere, ocean, land, seaice, landice]: | ||
all = True | ||
for key in component_options.keys(): | ||
component_options[key] = True | ||
|
||
##################################################################### | ||
# Managing global parameters | ||
|
||
global_params = dict() | ||
|
||
if "global_params" in control: | ||
global_params = control["global_params"] | ||
|
||
global_params["serial"] = serial | ||
|
||
#################################################################### | ||
|
||
if time_series: | ||
timeseries_params = control["timeseries"] | ||
client = None | ||
if not global_params["serial"]: | ||
cluster = LocalCluster( | ||
n_workers=1, | ||
processes=1, | ||
threads_per_worker=1, | ||
memory_limit="2GB", | ||
) | ||
|
||
# general timeseries arguments for all components | ||
num_procs = timeseries_params["num_procs"] | ||
|
||
for component, comp_bool in component_options.items(): | ||
if comp_bool: | ||
|
||
# set time series input and output directory: | ||
# ----- | ||
if isinstance(timeseries_params["case_name"], list): | ||
ts_input_dirs = [] | ||
for cname in timeseries_params["case_name"]: | ||
ts_input_dirs.append(global_params["CESM_output_dir"]+"/"+cname+f"/{component}/hist/") | ||
cluster.scale(timeseries_params["num_procs"]) | ||
client = Client(cluster) | ||
|
||
if "ts_output_dir" not in timeseries_params: | ||
timeseries_params["ts_output_dir"] = "./" | ||
|
||
for component in component_options: | ||
if component not in timeseries_params: | ||
continue | ||
include_vars = None | ||
if "vars" in timeseries_params[component]: | ||
include_vars = timeseries_params[component]["vars"] | ||
if include_vars == [] or include_vars == ["process_all"]: | ||
include_vars = None | ||
|
||
year_start = None | ||
if "start_years" in timeseries_params[component]: | ||
year_start = timeseries_params[component]["start_years"] | ||
if year_start == []: | ||
year_start = None | ||
else: | ||
ts_input_dirs = [ | ||
global_params["CESM_output_dir"] + "/" + | ||
timeseries_params["case_name"] + f"/{component}/hist/", | ||
] | ||
|
||
if "ts_output_dir" in timeseries_params: | ||
if isinstance(timeseries_params["ts_output_dir"], list): | ||
ts_output_dirs = [] | ||
for ts_outdir in timeseries_params["ts_output_dir"]: | ||
ts_output_dirs.append([ | ||
os.path.join( | ||
ts_outdir, | ||
f"{component}", "proc", "tseries", | ||
), | ||
]) | ||
else: | ||
ts_output_dirs = [ | ||
os.path.join( | ||
timeseries_params["ts_output_dir"], | ||
f"{component}", "proc", "tseries", | ||
), | ||
] | ||
year_start = int(year_start[0]) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The current implementation is that |
||
|
||
year_end = None | ||
if "end_years" in timeseries_params[component]: | ||
year_end = timeseries_params[component]["end_years"] | ||
if year_end == []: | ||
year_end = None | ||
else: | ||
if isinstance(timeseries_params["case_name"], list): | ||
ts_output_dirs = [] | ||
for cname in timeseries_params["case_name"]: | ||
ts_output_dirs.append( | ||
os.path.join( | ||
global_params["CESM_output_dir"], | ||
cname, | ||
f"{component}", "proc", "tseries", | ||
), | ||
) | ||
else: | ||
ts_output_dirs = [ | ||
os.path.join( | ||
global_params["CESM_output_dir"], | ||
timeseries_params["case_name"], | ||
f"{component}", "proc", "tseries", | ||
), | ||
] | ||
# ----- | ||
|
||
# fmt: off | ||
# pylint: disable=line-too-long | ||
cupid.timeseries.create_time_series( | ||
component, | ||
timeseries_params[component]["vars"], | ||
timeseries_params[component]["derive_vars"], | ||
timeseries_params["case_name"], | ||
timeseries_params[component]["hist_str"], | ||
ts_input_dirs, | ||
ts_output_dirs, | ||
# Note that timeseries output will eventually go in | ||
# /glade/derecho/scratch/${USER}/archive/${CASE}/${component}/proc/tseries/ | ||
timeseries_params["ts_done"], | ||
timeseries_params["overwrite_ts"], | ||
timeseries_params[component]["start_years"], | ||
timeseries_params[component]["end_years"], | ||
timeseries_params[component]["level"], | ||
num_procs, | ||
serial, | ||
logger, | ||
) | ||
# fmt: on | ||
# pylint: enable=line-too-long | ||
year_end = int(year_end[0]) | ||
|
||
modb = gents.ModelOutputDatabase( | ||
hf_head_dir=global_params["CESM_output_dir"] + "/" + timeseries_params["case_name"], | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What happens if |
||
ts_head_dir=timeseries_params["ts_output_dir"], | ||
dir_name_swaps={"hist": "proc/tseries"}, | ||
dir_exclusions=[comp for comp in component_options if comp != component], | ||
timeseries_year_length=10, | ||
overwrite=timeseries_params["overwrite_ts"][0], | ||
include_variables=include_vars, | ||
compression_level=0, | ||
year_start=year_start, | ||
year_end=year_end, | ||
verbosity_level=1, | ||
) | ||
modb.run(client=client, serial=global_params["serial"]) | ||
client.shutdown() | ||
|
||
# Grab paths | ||
|
||
run_dir = os.path.realpath(os.path.expanduser(control["data_sources"]["run_dir"])) | ||
output_dir = run_dir + "/computed_notebooks/" + control["data_sources"]["sname"] | ||
temp_data_path = run_dir + "/temp_data" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,4 +18,5 @@ dependencies: | |
- pyyaml | ||
- xarray | ||
- pip: | ||
- gents | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we want to install a static version of |
||
- -e ../ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason to specify `memory_limit` rather than letting the `LocalCluster` object figure it out based on available resources? It seems like it could be problematic (if less than 2 GB / core is available) or unnecessarily restrictive (if more than 2 GB / core is available).