Enhance PS processing, rename ps_parallel() to adi_parallel()
AlexeyPechnikov committed Jul 28, 2023
1 parent c327f4c commit 2cf94fa
Showing 2 changed files with 46 additions and 40 deletions.
3 changes: 3 additions & 0 deletions pygmtsar/pygmtsar/PRM.py
@@ -781,6 +781,7 @@ def read_SLC_block(slc_filename, start, stop, amplitude):
             #print (' size', data.size)
             real_part = data[::2].astype(np.float32)
             imag_part = data[1::2].astype(np.float32)
+            del data
             if not amplitude:
                 # return original complex data
                 return DFACT * (real_part + 1j * imag_part)
@@ -807,9 +808,11 @@ def read_SLC_block(slc_filename, start, stop, amplitude):
             block = dask.array.from_delayed(read_SLC_block(slc_filename, start, stop, amplitude),
                                             shape=((stop-start),), dtype=np.float32 if amplitude else np.complex64)
             lazy_arrays.append(block)
+            del block
         # Concatenate the chunks together
         # Data file can include additional data outside of the specified dimensions
         data = dask.array.concatenate(lazy_arrays).reshape((-1, xdim))[:ydim,:]
+        del lazy_arrays
         #amp = xr.DataArray(dask.array.flipud(amp), coords={'y': np.arange(ydim) + 0.5, 'x': np.arange(xdim) + 0.5})
         data = xr.DataArray(data, coords={'y': np.arange(ydim) + 0.5, 'x': np.arange(xdim) + 0.5})
         # set the short name instead of autogenerated one
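
Note: the PRM.py change is a memory hygiene pass. Each intermediate buffer is dropped with del as soon as it has been consumed, both inside the delayed reader (del data) and while the task graph is assembled (del block, del lazy_arrays). Below is a minimal self-contained sketch of the same lazy block-reading pattern; the read_block helper, file name, and block layout are hypothetical placeholders that mirror the approach above, not the library's exact code:

    import dask
    import dask.array
    import numpy as np

    @dask.delayed
    def read_block(filename, start, stop):
        # read interleaved int16 I/Q samples for one block (offset is in bytes)
        data = np.fromfile(filename, dtype=np.int16, count=2*(stop-start), offset=4*start)
        real_part = data[::2].astype(np.float32)
        imag_part = data[1::2].astype(np.float32)
        # release the raw buffer as early as possible, as in the commit
        del data
        return (real_part + 1j*imag_part).astype(np.complex64)

    # hypothetical layout: 10 blocks of 4096 complex samples each
    blocks = [dask.array.from_delayed(read_block('slc.raw', i*4096, (i+1)*4096),
                                      shape=(4096,), dtype=np.complex64)
              for i in range(10)]
    lazy = dask.array.concatenate(blocks)

Nothing touches the disk until lazy is computed: del data runs inside each task at compute time, while the commit's del block and del lazy_arrays drop references that were only needed for graph construction.
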
83 changes: 43 additions & 40 deletions pygmtsar/pygmtsar/SBAS_ps.py
@@ -12,6 +12,14 @@

 class SBAS_ps(SBAS_stl):
 
+    def get_adi(self, subswath=None, threshold=None, chunksize=None, debug=False):
+        """
+        TODO: threshold
+        """
+
+        adis = self.open_grids(None, 'adi')
+        return adis[0] if len(adis)==1 else adis
+
     #from pygmtsar import tqdm_dask
     #SBAS.ps_parallel = ps_parallel
     #sbas.ps_parallel(interactive=True)
@@ -22,58 +30,53 @@ class SBAS_ps(SBAS_stl):
     #adi_dec = adi.coarsen({'y': 4, 'x': 16}, boundary='trim').min()
     #adi_dec
     # define PS candidates using Amplitude Dispersion Index (ADI)
-    def ps_parallel(self, dates=None, threshold=None, chunksize=None, interactive=False, debug=False):
+    def adi_parallel(self, dates=None, chunksize=None):
         import xarray as xr
         import numpy as np
         import dask
         import os
 
-        if dates is None:
-            dates = self.df.index
-
-        # scale factor is selected to have the same number of pixels per square chunk
         if chunksize is None:
             chunksize = self.chunksize
-        # use SLC-related chunks for faster processing
-        bigchunk = self.PRM().get('num_rng_bins')
-        minichunksize = int(np.round(chunksize**2/bigchunk))
-        #print (minichunksize, bigchunk)
 
-        amps = []
         for subswath in self.get_subswaths():
-            subamps = []
-            for date in dates:
-                #print (subswath, date)
-                prm = self.PRM(subswath, date)
-                amp = prm.read_SLC_int(amplitude=True, chunksize=minichunksize)
-                amp['date'] = date
-                subamps.append(amp)
+            if dates is None:
+                dates = self.df[self.df['subswath']==subswath].index.values
+            #print ('dates', dates)
+            # select radar coordinates extent
+            rng_max = self.PRM(subswath).get('num_rng_bins')
+            #print ('azi_max', azi_max, 'rng_max', rng_max)
+            # use SLC-related chunks for faster processing
+            minichunksize = int(np.round(chunksize**2/rng_max))
+            #print (minichunksize)
+            amps = [self.PRM(subswath, date).read_SLC_int(amplitude=True, chunksize=minichunksize) for date in dates]
             # build stack
-            subamps = xr.concat(subamps, dim='date')
+            amps = xr.concat(amps, dim='date')
             # normalize image amplitudes
-            mean = subamps.mean(dim=['y','x'])
-            norm = mean/mean.mean(dim='date')
+            #mean = amps.mean(dim=['y','x']).compute()
+            tqdm_dask(mean := dask.persist(amps.mean(dim=['y','x'])), desc=f'Amplitude Normalization sw{subswath}')
+            #print ('mean', mean)
+            # dask.persist returns tuple
+            norm = (mean[0]/mean[0].mean(dim='date'))
+            del mean
+            #print ('norm', norm)
             # compute Amplitude Dispersion Index (ADI)
-            adi = (norm*subamps).std(dim='date')/(norm*subamps).mean(dim='date')
-            # use the specified threshold
-            if threshold is not None:
-                amps.append(adi.where(adi<=threshold))
-            else:
-                amps.append(adi)
-
-        if interactive:
-            return amps[0] if len(amps)==1 else amps
-
-        delayeds = []
-        for (subswath, ps) in zip(self.get_subswaths(), amps):
+            #adi = ((norm*amps).std(dim='date')/(norm*amps).mean(dim='date')).rename('adi')
+            #del norm, amps
+            stats = (norm*amps).pipe(lambda x: (x.mean(dim='date'), x.std(dim='date')))
+            del amps, norm
+            adi = (stats[1] / stats[0]).rename('adi')
+            del stats
             # Define the encoding and choose the appropriate storage scheme
-            ps_filename = self.get_filenames(subswath, None, f'ps')
-            if os.path.exists(ps_filename):
-                os.remove(ps_filename)
-            # save by fastest way using chunks like to (12, 512)
-            delayed = ps.rename('adi').to_netcdf(ps_filename,
+            adi_filename = self.get_filenames(subswath, None, f'adi')
+            if os.path.exists(adi_filename):
+                os.remove(adi_filename)
+            # save by fastest way using SLC-related chunks like (48, 1024)
+            delayed = adi.rename({'y': 'a', 'x': 'r'}).to_netcdf(adi_filename,
                                 engine=self.engine,
-                                encoding={'adi': self.compression(ps.shape, chunksize=(minichunksize, 512))},
+                                encoding={'adi': self.compression(adi.shape, chunksize=(minichunksize, chunksize))},
                                 compute=False)
-            delayeds.append(delayed)
-        tqdm_dask(dask.persist(delayeds), desc='Persistent Scatterers')
+            tqdm_dask(dask.persist(delayed), desc=f'Amplitude Dispersion Index (ADI) sw{subswath}')
+            del adi, delayed
+            # cleanup - sometimes writing NetCDF handlers are not closed immediately and block reading access
+            import gc; gc.collect()
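
Note: the rename also splits computation from retrieval. adi_parallel() streams one 'adi' grid per subswath to NetCDF with a per-subswath progress bar, while the new get_adi() reads the stored grids back (its threshold argument is still a TODO in this commit). A minimal usage sketch, assuming an initialized SBAS object named sbas; the 0.25 cutoff is only an illustrative value:

    # compute and store the Amplitude Dispersion Index (ADI) grids per subswath
    sbas.adi_parallel()
    # read the stored grid(s) back; a single grid is returned for one subswath
    adi = sbas.get_adi()
    # ADI = std(amplitude) / mean(amplitude) over dates, so persistent
    # scatterer candidates are the low-dispersion pixels (illustrative cutoff)
    ps_candidates = adi.where(adi <= 0.25)

Compared to the removed interactive and threshold arguments of ps_parallel(), candidate selection now happens after the grids are on disk, which keeps the expensive amplitude stacking out of the interactive path.
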
