Skip to content

Commit

Permalink
Compatibility update for the recent changes in distributed library
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexeyPechnikov committed Aug 8, 2024
1 parent f9c732c commit c3f3ede
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 24 deletions.
16 changes: 12 additions & 4 deletions pygmtsar/pygmtsar/IO.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,12 @@ def save_cube(self, data, name=None, caption='Saving NetCDF 2D/3D Dataset'):
# prevent warnings "RuntimeWarning: All-NaN slice encountered"
logging.getLogger('distributed.nanny').setLevel(logging.ERROR)
# disable "distributed.utils_perf - WARNING - full garbage collections ..."
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
try:
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
except ImportError:
from distributed.gc import disable_gc_diagnosis
disable_gc_diagnosis()

if name is None and isinstance(data, xr.DataArray):
assert data.name is not None, 'Define data name or use "name" argument for the NetCDF filename'
Expand Down Expand Up @@ -705,8 +709,12 @@ def save_stack(self, data, name, caption='Saving 2D Stack', queue=None, timeout=
# Suppress Dask "Restarting worker" warnings
logging.getLogger('distributed.nanny').setLevel(logging.ERROR)
# disable "distributed.utils_perf - WARNING - full garbage collections ..."
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
try:
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
except ImportError:
from distributed.gc import disable_gc_diagnosis
disable_gc_diagnosis()

# Dask cluster client
client = get_client()
Expand Down
26 changes: 19 additions & 7 deletions pygmtsar/pygmtsar/Stack_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,12 @@ def export_geojson(self, data, name, caption='Exporting WGS84 GeoJSON', pivotal=
import numpy as np
from tqdm.auto import tqdm
# disable "distributed.utils_perf - WARNING - full garbage collections ..."
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
try:
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
except ImportError:
from distributed.gc import disable_gc_diagnosis
disable_gc_diagnosis()

assert isinstance(data, xr.DataArray), 'Argument data is not an xr.DataArray object'

Expand Down Expand Up @@ -291,9 +295,13 @@ def export_csv(self, data, name, caption='Exporting WGS84 CSV', delimiter=',', d
import numpy as np
from tqdm.auto import tqdm
# disable "distributed.utils_perf - WARNING - full garbage collections ..."
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()

try:
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
except ImportError:
from distributed.gc import disable_gc_diagnosis
disable_gc_diagnosis()

assert isinstance(data, xr.DataArray), 'Argument data is not an xr.DataArray object'

# convert the data to geographic coordinates if necessary
Expand Down Expand Up @@ -381,8 +389,12 @@ def export_netcdf(self, data, name, caption='Exporting WGS84 NetCDF', engine='ne
import dask
import os
# disable "distributed.utils_perf - WARNING - full garbage collections ..."
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
try:
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
except ImportError:
from distributed.gc import disable_gc_diagnosis
disable_gc_diagnosis()

assert isinstance(data, xr.DataArray), 'Argument data is not an xr.DataArray object'

Expand Down
8 changes: 6 additions & 2 deletions pygmtsar/pygmtsar/Stack_phasediff.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,12 @@ def calc_drho(rho, topo, earth_radius, height, b, alpha, Bx):
def block_phasediff(date1, date2, prm1, prm2, ylim, xlim):
# use outer variables date, stack_prm
# disable "distributed.utils_perf - WARNING - full garbage collections ..."
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
try:
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
except ImportError:
from distributed.gc import disable_gc_diagnosis
disable_gc_diagnosis()
import warnings
# suppress Dask warning "RuntimeWarning: invalid value encountered in divide"
warnings.filterwarnings('ignore')
Expand Down
8 changes: 6 additions & 2 deletions pygmtsar/pygmtsar/Stack_stl.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,12 @@ def stl(self, data, freq='W', periods=52, robust=False):
import pandas as pd
import dask
# disable "distributed.utils_perf - WARNING - full garbage collections ..."
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
try:
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
except ImportError:
from distributed.gc import disable_gc_diagnosis
disable_gc_diagnosis()

assert data.dims[0] == 'date', 'The first data dimension should be date'

Expand Down
8 changes: 6 additions & 2 deletions pygmtsar/pygmtsar/Stack_topo.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,12 @@ def calc_drho(rho, topo, earth_radius, height, b, alpha, Bx):
def block_phase(prm1, prm2, ylim, xlim):
# use outer variables date, stack_prm
# disable "distributed.utils_perf - WARNING - full garbage collections ..."
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
try:
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
except ImportError:
from distributed.gc import disable_gc_diagnosis
disable_gc_diagnosis()
import warnings
# suppress Dask warning "RuntimeWarning: invalid value encountered in divide"
warnings.filterwarnings('ignore')
Expand Down
8 changes: 6 additions & 2 deletions pygmtsar/pygmtsar/Stack_trans.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,12 @@ def SAT_llt2rat(lats, lons, zs):
# exclude latitude and longitude columns as redundant
def trans_block(lats, lons, amin=-np.inf, amax=np.inf, rmin=-np.inf, rmax=np.inf, filename=None):
# disable "distributed.utils_perf - WARNING - full garbage collections ..."
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
try:
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
except ImportError:
from distributed.gc import disable_gc_diagnosis
disable_gc_diagnosis()
import warnings
warnings.filterwarnings('ignore')

Expand Down
8 changes: 6 additions & 2 deletions pygmtsar/pygmtsar/Stack_trans_inv.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ def compute_trans_inv(self, coarsen, trans='auto', interactive=False):
def trans_inv_block(azis, rngs, tolerance, chunksize):
from scipy.spatial import cKDTree
# disable "distributed.utils_perf - WARNING - full garbage collections ..."
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
try:
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
except ImportError:
from distributed.gc import disable_gc_diagnosis
disable_gc_diagnosis()
import warnings
warnings.filterwarnings('ignore')

Expand Down
8 changes: 6 additions & 2 deletions pygmtsar/pygmtsar/Stack_unwrap_snaphu.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ def snaphu(self, phase, corr=None, conf=None, conncomp=False, debug=False):
from datetime import datetime
import uuid
# disable "distributed.utils_perf - WARNING - full garbage collections ..."
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
try:
from dask.distributed import utils_perf
utils_perf.disable_gc_diagnosis()
except ImportError:
from distributed.gc import disable_gc_diagnosis
disable_gc_diagnosis()

if conf is None:
conf = self.snaphu_config()
Expand Down
2 changes: 1 addition & 1 deletion pygmtsar/pygmtsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#
# Licensed under the BSD 3-Clause License (see LICENSE for details)
# ----------------------------------------------------------------------------
__version__ = '2024.8.2'
__version__ = '2024.8.2.post3'

# unified progress indicators
from .tqdm_joblib import tqdm_joblib
Expand Down

0 comments on commit c3f3ede

Please sign in to comment.