From 2cf94fa5f1aa1d6515493ca8dc9903f3b6d743eb Mon Sep 17 00:00:00 2001
From: Alexey Pechnikov
Date: Fri, 28 Jul 2023 14:56:20 +0700
Subject: [PATCH] Enhance PS processing, rename ps_parallel() to adi_parallel()

---
 pygmtsar/pygmtsar/PRM.py     |  3 ++
 pygmtsar/pygmtsar/SBAS_ps.py | 83 +++++++++++++++++++-----------------
 2 files changed, 46 insertions(+), 40 deletions(-)

diff --git a/pygmtsar/pygmtsar/PRM.py b/pygmtsar/pygmtsar/PRM.py
index 3cca6728..740bc817 100755
--- a/pygmtsar/pygmtsar/PRM.py
+++ b/pygmtsar/pygmtsar/PRM.py
@@ -781,6 +781,7 @@ def read_SLC_block(slc_filename, start, stop, amplitude):
             #print (' size', data.size)
             real_part = data[::2].astype(np.float32)
             imag_part = data[1::2].astype(np.float32)
+            del data
             if not amplitude:
                 # return original complex data
                 return DFACT * (real_part + 1j * imag_part)
@@ -807,9 +808,11 @@
             block = dask.array.from_delayed(read_SLC_block(slc_filename, start, stop, amplitude),
                                             shape=((stop-start),), dtype=np.float32 if amplitude else np.complex64)
             lazy_arrays.append(block)
+            del block
         # Concatenate the chunks together
         # Data file can include additional data outside of the specified dimensions
         data = dask.array.concatenate(lazy_arrays).reshape((-1, xdim))[:ydim,:]
+        del lazy_arrays
         #amp = xr.DataArray(dask.array.flipud(amp), coords={'y': np.arange(ydim) + 0.5, 'x': np.arange(xdim) + 0.5})
         data = xr.DataArray(data, coords={'y': np.arange(ydim) + 0.5, 'x': np.arange(xdim) + 0.5})
         # set the short name instead of autogenerated one
diff --git a/pygmtsar/pygmtsar/SBAS_ps.py b/pygmtsar/pygmtsar/SBAS_ps.py
index 41abc0c9..7ef4b0fb 100644
--- a/pygmtsar/pygmtsar/SBAS_ps.py
+++ b/pygmtsar/pygmtsar/SBAS_ps.py
@@ -12,6 +12,14 @@ class SBAS_ps(SBAS_stl):
 
+    def get_adi(self, subswath=None, threshold=None, chunksize=None, debug=False):
+        """
+        TODO: threshold
+        """
+
+        adis = self.open_grids(subswath, 'adi')
+        return adis[0] if len(adis)==1 else adis
+
     #from pygmtsar import tqdm_dask
     #SBAS.ps_parallel = ps_parallel
     #sbas.ps_parallel(interactive=True)
@@ -22,58 +30,53 @@ class SBAS_ps(SBAS_stl):
     #adi_dec = adi.coarsen({'y': 4, 'x': 16}, boundary='trim').min()
     #adi_dec
     # define PS candidates using Amplitude Dispersion Index (ADI)
-    def ps_parallel(self, dates=None, threshold=None, chunksize=None, interactive=False, debug=False):
+    def adi_parallel(self, dates=None, chunksize=None):
         import xarray as xr
         import numpy as np
         import dask
         import os
 
-        if dates is None:
-            dates = self.df.index
-
-        # scale factor is selected to have the same number of pixels per square chunk
         if chunksize is None:
             chunksize = self.chunksize
-        # use SLC-related chunks for faster processing
-        bigchunk = self.PRM().get('num_rng_bins')
-        minichunksize = int(np.round(chunksize**2/bigchunk))
-        #print (minichunksize, bigchunk)
-
-        amps = []
         for subswath in self.get_subswaths():
-            subamps = []
-            for date in dates:
-                #print (subswath, date)
-                prm = self.PRM(subswath, date)
-                amp = prm.read_SLC_int(amplitude=True, chunksize=minichunksize)
-                amp['date'] = date
-                subamps.append(amp)
+            # select dates for the current subswath only when the caller does not specify them
+            dates_sw = dates if dates is not None else self.df[self.df['subswath']==subswath].index.values
+            #print ('dates', dates_sw)
+            # select radar coordinates extent
+            rng_max = self.PRM(subswath).get('num_rng_bins')
+            #print ('rng_max', rng_max)
+            # use SLC-related chunks for faster processing
+            minichunksize = int(np.round(chunksize**2/rng_max))
+            #print (minichunksize)
+            amps = [self.PRM(subswath, date).read_SLC_int(amplitude=True, chunksize=minichunksize) for date in dates_sw]
             # build stack
-            subamps = xr.concat(subamps, dim='date')
+            amps = xr.concat(amps, dim='date')
             # normalize image amplitudes
-            mean = subamps.mean(dim=['y','x'])
-            norm = mean/mean.mean(dim='date')
+            #mean = amps.mean(dim=['y','x']).compute()
+            tqdm_dask(mean := dask.persist(amps.mean(dim=['y','x'])), desc=f'Amplitude Normalization sw{subswath}')
+            #print ('mean', mean)
+            # dask.persist returns a tuple
+            norm = (mean[0]/mean[0].mean(dim='date'))
+            del mean
+            #print ('norm', norm)
             # compute Amplitude Dispersion Index (ADI)
-            adi = (norm*subamps).std(dim='date')/(norm*subamps).mean(dim='date')
-            # use the specified threshold
-            if threshold is not None:
-                amps.append(adi.where(adi<=threshold))
-            else:
-                amps.append(adi)
-
-        if interactive:
-            return amps[0] if len(amps)==1 else amps
-
-        delayeds = []
-        for (subswath, ps) in zip(self.get_subswaths(), amps):
+            #adi = ((norm*amps).std(dim='date')/(norm*amps).mean(dim='date')).rename('adi')
+            #del norm, amps
+            stats = (norm*amps).pipe(lambda x: (x.mean(dim='date'), x.std(dim='date')))
+            del amps, norm
+            adi = (stats[1] / stats[0]).rename('adi')
+            del stats
             # Define the encoding and choose the appropriate storage scheme
-            ps_filename = self.get_filenames(subswath, None, f'ps')
-            if os.path.exists(ps_filename):
-                os.remove(ps_filename)
-            # save by fastest way using chunks like to (12, 512)
-            delayed = ps.rename('adi').to_netcdf(ps_filename,
+            adi_filename = self.get_filenames(subswath, None, f'adi')
+            if os.path.exists(adi_filename):
+                os.remove(adi_filename)
+            # save the fastest way using SLC-related chunks like (48, 1024)
+            delayed = adi.rename({'y': 'a', 'x': 'r'}).to_netcdf(adi_filename,
                                             engine=self.engine,
-                                            encoding={'adi': self.compression(ps.shape, chunksize=(minichunksize, 512))},
+                                            encoding={'adi': self.compression(adi.shape, chunksize=(minichunksize, chunksize))},
                                             compute=False)
-            delayeds.append(delayed)
-        tqdm_dask(dask.persist(delayeds), desc='Persistent Scatterers')
+            tqdm_dask(dask.persist(delayed), desc=f'Amplitude Dispersion Index (ADI) sw{subswath}')
+            del adi, delayed
+        # cleanup - sometimes NetCDF write handles are not closed immediately and block read access
+        import gc; gc.collect()
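
Below is a minimal usage sketch of the renamed API, not part of the patch itself. It assumes an already initialized SBAS object `sbas` with loaded scenes; the 0.25 threshold is an arbitrary example value, since threshold handling in get_adi() is still marked TODO above.

    # assuming something like: from pygmtsar import SBAS; sbas = SBAS(...)
    # compute the Amplitude Dispersion Index (ADI) and store it
    # as a NetCDF grid named 'adi', one file per subswath
    sbas.adi_parallel()

    # reopen the stored grid(s); a single DataArray is returned for one subswath
    adi = sbas.get_adi()

    # ADI is the temporal standard deviation of the normalized amplitudes
    # divided by their temporal mean, so low values mark stable reflectors;
    # select persistent scatterer (PS) candidates with an example cutoff
    ps_candidates = adi.where(adi <= 0.25)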