From c3f3ede019f8708c425ec7c3a1bcd96bdb360a86 Mon Sep 17 00:00:00 2001 From: Alexey Pechnikov Date: Thu, 8 Aug 2024 16:00:49 +0700 Subject: [PATCH] Compatibility update for the recent changes in distributed library --- pygmtsar/pygmtsar/IO.py | 16 +++++++++++---- pygmtsar/pygmtsar/Stack_export.py | 26 +++++++++++++++++------- pygmtsar/pygmtsar/Stack_phasediff.py | 8 ++++++-- pygmtsar/pygmtsar/Stack_stl.py | 8 ++++++-- pygmtsar/pygmtsar/Stack_topo.py | 8 ++++++-- pygmtsar/pygmtsar/Stack_trans.py | 8 ++++++-- pygmtsar/pygmtsar/Stack_trans_inv.py | 8 ++++++-- pygmtsar/pygmtsar/Stack_unwrap_snaphu.py | 8 ++++++-- pygmtsar/pygmtsar/__init__.py | 2 +- 9 files changed, 68 insertions(+), 24 deletions(-) diff --git a/pygmtsar/pygmtsar/IO.py b/pygmtsar/pygmtsar/IO.py index ca8f2e06..b076596c 100644 --- a/pygmtsar/pygmtsar/IO.py +++ b/pygmtsar/pygmtsar/IO.py @@ -451,8 +451,12 @@ def save_cube(self, data, name=None, caption='Saving NetCDF 2D/3D Dataset'): # prevent warnings "RuntimeWarning: All-NaN slice encountered" logging.getLogger('distributed.nanny').setLevel(logging.ERROR) # disable "distributed.utils_perf - WARNING - full garbage collections ..." - from dask.distributed import utils_perf - utils_perf.disable_gc_diagnosis() + try: + from dask.distributed import utils_perf + utils_perf.disable_gc_diagnosis() + except ImportError: + from distributed.gc import disable_gc_diagnosis + disable_gc_diagnosis() if name is None and isinstance(data, xr.DataArray): assert data.name is not None, 'Define data name or use "name" argument for the NetCDF filename' @@ -705,8 +709,12 @@ def save_stack(self, data, name, caption='Saving 2D Stack', queue=None, timeout= # Suppress Dask "Restarting worker" warnings logging.getLogger('distributed.nanny').setLevel(logging.ERROR) # disable "distributed.utils_perf - WARNING - full garbage collections ..." - from dask.distributed import utils_perf - utils_perf.disable_gc_diagnosis() + try: + from dask.distributed import utils_perf + utils_perf.disable_gc_diagnosis() + except ImportError: + from distributed.gc import disable_gc_diagnosis + disable_gc_diagnosis() # Dask cluster client client = get_client() diff --git a/pygmtsar/pygmtsar/Stack_export.py b/pygmtsar/pygmtsar/Stack_export.py index c7d12d3d..08572a9f 100644 --- a/pygmtsar/pygmtsar/Stack_export.py +++ b/pygmtsar/pygmtsar/Stack_export.py @@ -207,8 +207,12 @@ def export_geojson(self, data, name, caption='Exporting WGS84 GeoJSON', pivotal= import numpy as np from tqdm.auto import tqdm # disable "distributed.utils_perf - WARNING - full garbage collections ..." - from dask.distributed import utils_perf - utils_perf.disable_gc_diagnosis() + try: + from dask.distributed import utils_perf + utils_perf.disable_gc_diagnosis() + except ImportError: + from distributed.gc import disable_gc_diagnosis + disable_gc_diagnosis() assert isinstance(data, xr.DataArray), 'Argument data is not an xr.DataArray object' @@ -291,9 +295,13 @@ def export_csv(self, data, name, caption='Exporting WGS84 CSV', delimiter=',', d import numpy as np from tqdm.auto import tqdm # disable "distributed.utils_perf - WARNING - full garbage collections ..." - from dask.distributed import utils_perf - utils_perf.disable_gc_diagnosis() - + try: + from dask.distributed import utils_perf + utils_perf.disable_gc_diagnosis() + except ImportError: + from distributed.gc import disable_gc_diagnosis + disable_gc_diagnosis() + assert isinstance(data, xr.DataArray), 'Argument data is not an xr.DataArray object' # convert the data to geographic coordinates if necessary @@ -381,8 +389,12 @@ def export_netcdf(self, data, name, caption='Exporting WGS84 NetCDF', engine='ne import dask import os # disable "distributed.utils_perf - WARNING - full garbage collections ..." - from dask.distributed import utils_perf - utils_perf.disable_gc_diagnosis() + try: + from dask.distributed import utils_perf + utils_perf.disable_gc_diagnosis() + except ImportError: + from distributed.gc import disable_gc_diagnosis + disable_gc_diagnosis() assert isinstance(data, xr.DataArray), 'Argument data is not an xr.DataArray object' diff --git a/pygmtsar/pygmtsar/Stack_phasediff.py b/pygmtsar/pygmtsar/Stack_phasediff.py index c0407fa9..c2cd32b0 100644 --- a/pygmtsar/pygmtsar/Stack_phasediff.py +++ b/pygmtsar/pygmtsar/Stack_phasediff.py @@ -492,8 +492,12 @@ def calc_drho(rho, topo, earth_radius, height, b, alpha, Bx): def block_phasediff(date1, date2, prm1, prm2, ylim, xlim): # use outer variables date, stack_prm # disable "distributed.utils_perf - WARNING - full garbage collections ..." - from dask.distributed import utils_perf - utils_perf.disable_gc_diagnosis() + try: + from dask.distributed import utils_perf + utils_perf.disable_gc_diagnosis() + except ImportError: + from distributed.gc import disable_gc_diagnosis + disable_gc_diagnosis() import warnings # suppress Dask warning "RuntimeWarning: invalid value encountered in divide" warnings.filterwarnings('ignore') diff --git a/pygmtsar/pygmtsar/Stack_stl.py b/pygmtsar/pygmtsar/Stack_stl.py index 7ddaae47..a7fbc78c 100644 --- a/pygmtsar/pygmtsar/Stack_stl.py +++ b/pygmtsar/pygmtsar/Stack_stl.py @@ -125,8 +125,12 @@ def stl(self, data, freq='W', periods=52, robust=False): import pandas as pd import dask # disable "distributed.utils_perf - WARNING - full garbage collections ..." - from dask.distributed import utils_perf - utils_perf.disable_gc_diagnosis() + try: + from dask.distributed import utils_perf + utils_perf.disable_gc_diagnosis() + except ImportError: + from distributed.gc import disable_gc_diagnosis + disable_gc_diagnosis() assert data.dims[0] == 'date', 'The first data dimension should be date' diff --git a/pygmtsar/pygmtsar/Stack_topo.py b/pygmtsar/pygmtsar/Stack_topo.py index cae34d95..8d8e7236 100644 --- a/pygmtsar/pygmtsar/Stack_topo.py +++ b/pygmtsar/pygmtsar/Stack_topo.py @@ -113,8 +113,12 @@ def calc_drho(rho, topo, earth_radius, height, b, alpha, Bx): def block_phase(prm1, prm2, ylim, xlim): # use outer variables date, stack_prm # disable "distributed.utils_perf - WARNING - full garbage collections ..." - from dask.distributed import utils_perf - utils_perf.disable_gc_diagnosis() + try: + from dask.distributed import utils_perf + utils_perf.disable_gc_diagnosis() + except ImportError: + from distributed.gc import disable_gc_diagnosis + disable_gc_diagnosis() import warnings # suppress Dask warning "RuntimeWarning: invalid value encountered in divide" warnings.filterwarnings('ignore') diff --git a/pygmtsar/pygmtsar/Stack_trans.py b/pygmtsar/pygmtsar/Stack_trans.py index 04b4e45e..4321e85e 100644 --- a/pygmtsar/pygmtsar/Stack_trans.py +++ b/pygmtsar/pygmtsar/Stack_trans.py @@ -109,8 +109,12 @@ def SAT_llt2rat(lats, lons, zs): # exclude latitude and longitude columns as redundant def trans_block(lats, lons, amin=-np.inf, amax=np.inf, rmin=-np.inf, rmax=np.inf, filename=None): # disable "distributed.utils_perf - WARNING - full garbage collections ..." - from dask.distributed import utils_perf - utils_perf.disable_gc_diagnosis() + try: + from dask.distributed import utils_perf + utils_perf.disable_gc_diagnosis() + except ImportError: + from distributed.gc import disable_gc_diagnosis + disable_gc_diagnosis() import warnings warnings.filterwarnings('ignore') diff --git a/pygmtsar/pygmtsar/Stack_trans_inv.py b/pygmtsar/pygmtsar/Stack_trans_inv.py index aa74c785..ea344a98 100644 --- a/pygmtsar/pygmtsar/Stack_trans_inv.py +++ b/pygmtsar/pygmtsar/Stack_trans_inv.py @@ -69,8 +69,12 @@ def compute_trans_inv(self, coarsen, trans='auto', interactive=False): def trans_inv_block(azis, rngs, tolerance, chunksize): from scipy.spatial import cKDTree # disable "distributed.utils_perf - WARNING - full garbage collections ..." - from dask.distributed import utils_perf - utils_perf.disable_gc_diagnosis() + try: + from dask.distributed import utils_perf + utils_perf.disable_gc_diagnosis() + except ImportError: + from distributed.gc import disable_gc_diagnosis + disable_gc_diagnosis() import warnings warnings.filterwarnings('ignore') diff --git a/pygmtsar/pygmtsar/Stack_unwrap_snaphu.py b/pygmtsar/pygmtsar/Stack_unwrap_snaphu.py index 60ea551c..5451642f 100644 --- a/pygmtsar/pygmtsar/Stack_unwrap_snaphu.py +++ b/pygmtsar/pygmtsar/Stack_unwrap_snaphu.py @@ -53,8 +53,12 @@ def snaphu(self, phase, corr=None, conf=None, conncomp=False, debug=False): from datetime import datetime import uuid # disable "distributed.utils_perf - WARNING - full garbage collections ..." - from dask.distributed import utils_perf - utils_perf.disable_gc_diagnosis() + try: + from dask.distributed import utils_perf + utils_perf.disable_gc_diagnosis() + except ImportError: + from distributed.gc import disable_gc_diagnosis + disable_gc_diagnosis() if conf is None: conf = self.snaphu_config() diff --git a/pygmtsar/pygmtsar/__init__.py b/pygmtsar/pygmtsar/__init__.py index 087a3524..83d27d22 100644 --- a/pygmtsar/pygmtsar/__init__.py +++ b/pygmtsar/pygmtsar/__init__.py @@ -7,7 +7,7 @@ # # Licensed under the BSD 3-Clause License (see LICENSE for details) # ---------------------------------------------------------------------------- -__version__ = '2024.8.2' +__version__ = '2024.8.2.post3' # unified progress indicators from .tqdm_joblib import tqdm_joblib