Skip to content

Commit

Permalink
updated ScanData class
Browse files Browse the repository at this point in the history
  • Loading branch information
bingli621 committed Oct 28, 2024
1 parent c79d31d commit e05f56a
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 101 deletions.
7 changes: 7 additions & 0 deletions scripts/tavi_loader.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
using HDF5

filename = "/Users/x4l/Documents/GitHub/TAVI/test_data/tavi_rez.h5"
fid = h5open(filename, "r")
pdata = fid["processed_data"]
scan0042 = pdata["scan0042"]
rez_mat = read(scan0042, "rez_mat")
170 changes: 96 additions & 74 deletions src/tavi/data/scan_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ def __init__(
self,
x: np.ndarray,
y: np.ndarray,
norm: Optional[np.ndarray] = None,
norm_col: Optional[np.ndarray] = None,
) -> None:

# ind = np.argsort(x)
# self.x = x[ind]
# self.y = y[ind]
self.x = x
self.y = y
self.norm = norm
self.norm_col = norm_col
self.err = np.sqrt(y)
# self._ind = ind
self.label = ""
Expand All @@ -45,74 +45,89 @@ def make_labels(self, axes: tuple[str, str], norm_to: tuple[float, str], label:
self.label = label
self.title = title

def __add__(self, other):
# check x length, rebin other if do not match
if len(self.x) != len(other.x):
rebin_intervals = np.diff(self.x)
rebin_intervals = np.append(rebin_intervals, rebin_intervals[-1])
rebin_boundary = self.x + rebin_intervals / 2

y = np.zeros_like(rebin_boundary)
counts = np.zeros_like(rebin_boundary)
err = np.zeros_like(rebin_boundary)
(x_min, x_max) = (self.x[0] - rebin_intervals[0] / 2, self.x[-1] + rebin_intervals[-1] / 2)

for i, x0 in enumerate(other.x):
if x0 > x_max or x0 < x_min:
continue
idx = np.nanargmax(rebin_boundary + ScanData1D.ZERO >= x0)
y[idx] += other.y[i]
err[idx] += other.err[i] ** 2
counts[idx] += 1

other.err = err / counts
other.y = y / counts

scan_data_1d = ScanData1D(self.x, self.y + other.y)
scan_data_1d.err = np.sqrt(self.err**2 + other.err**2)
return scan_data_1d

# def __add__(self, other):
# # check x length, rebin other if do not match
# if len(self.x) != len(other.x):
# rebin_intervals = np.diff(self.x)
# rebin_intervals = np.append(rebin_intervals, rebin_intervals[-1])
# rebin_boundary = self.x + rebin_intervals / 2

# y = np.zeros_like(rebin_boundary)
# counts = np.zeros_like(rebin_boundary)
# err = np.zeros_like(rebin_boundary)
# (x_min, x_max) = (self.x[0] - rebin_intervals[0] / 2, self.x[-1] + rebin_intervals[-1] / 2)

# for i, x0 in enumerate(other.x):
# if x0 > x_max or x0 < x_min:
# continue
# idx = np.nanargmax(rebin_boundary + ScanData1D.ZERO >= x0)
# y[idx] += other.y[i]
# err[idx] += other.err[i] ** 2
# counts[idx] += 1

# other.err = err / counts
# other.y = y / counts

# scan_data_1d = ScanData1D(self.x, self.y + other.y)
# scan_data_1d.err = np.sqrt(self.err**2 + other.err**2)
# return scan_data_1d

# TODO how to normalize?
def __sub__(self, other):
# check x length, rebin other if do not match
# check x length, rebin other
if len(self.x) != len(other.x):
rebin_intervals = np.diff(self.x)
rebin_intervals = np.append(rebin_intervals, rebin_intervals[-1])
rebin_boundary = self.x + rebin_intervals / 2

y = np.zeros_like(rebin_boundary)
counts = np.zeros_like(rebin_boundary)
err = np.zeros_like(rebin_boundary)
(x_min, x_max) = (self.x[0] - rebin_intervals[0] / 2, self.x[-1] + rebin_intervals[-1] / 2)

for i, x0 in enumerate(other.x):
if x0 > x_max or x0 < x_min:
continue
idx = np.nanargmax(rebin_boundary + ScanData1D.ZERO >= x0)
y[idx] += other.y[i]
err[idx] += other.err[i] ** 2
counts[idx] += 1

other.err = err / counts
other.y = y / counts
pass
# rebin_intervals = np.diff(self.x)
# rebin_intervals = np.append(rebin_intervals, rebin_intervals[-1])
# rebin_boundary = self.x + rebin_intervals / 2

# y = np.zeros_like(rebin_boundary)
# counts = np.zeros_like(rebin_boundary)
# err = np.zeros_like(rebin_boundary)
# (x_min, x_max) = (self.x[0] - rebin_intervals[0] / 2, self.x[-1] + rebin_intervals[-1] / 2)

# for i, x0 in enumerate(other.x):
# if x0 > x_max or x0 < x_min:
# continue
# idx = np.nanargmax(rebin_boundary + ScanData1D.ZERO >= x0)
# y[idx] += other.y[i]
# err[idx] += other.err[i] ** 2
# counts[idx] += 1

# other.err = err / counts
# other.y = y / counts

scan_data_1d = ScanData1D(self.x, self.y - other.y)
scan_data_1d.err = np.sqrt(self.err**2 + other.err**2)
return scan_data_1d

def renorm(self, norm: Optional[np.ndarray] = None, val: float = 1.0):
"""Renormalized to norm_val"""
"""Renormalized to norm_val
Use norm if given, otherwise use self.norm
"""

if norm is not None:
norm_col = norm
elif self.norm is not None:
norm_col = self.norm
elif self.norm_col is not None:
norm_col = self.norm_col
else:
raise ValueError("Normalizaion collumns cannot be None.")
raise ValueError("Normalizaion columns cannot be None.")
self.y = self.y / norm_col * val
self.err = self.err / norm_col * val

def rebin_tol(self, rebin_params: tuple, weight_col: np.ndarray):
"""Rebin with tolerance"""
def rebin_tol(self, rebin_params: tuple, weight_col: Optional[np.ndarray] = None):
"""Rebin with tolerance
Note:
This should be used only if all data are measured with the same weight/counting time"""

if weight_col is not None:
norm_col = weight_col
elif self.norm_col is not None:
norm_col = self.norm_col
else:
raise ValueError("Normalizaion columns cannot be None.")

rebin_min, rebin_max, rebin_step = rebin_params
rebin_min = np.min(self.x) if rebin_min is None else rebin_min
Expand All @@ -122,49 +137,57 @@ def rebin_tol(self, rebin_params: tuple, weight_col: np.ndarray):
num = math.floor((rebin_max + ZERO - rebin_min) / rebin_step) + 1

x_boundary = np.linspace(rebin_min - rebin_step / 2, rebin_min + rebin_step * (num - 1 / 2), num + 1)
x = np.linspace(rebin_min, rebin_min + rebin_step * (num - 1), num)
x = np.zeros_like(x_boundary[1:])
y = np.zeros_like(x)
counts = np.zeros_like(x)
weights = np.zeros_like(x)
x = np.zeros_like(y)

for i, x0 in enumerate(self.x):
# Return the indices of the maximum values in the specified axis ignoring NaNs.
idx = np.nanargmax(x_boundary - ZERO > x0)
if idx > 0: # ignore first and last bin box
y[idx - 1] += self.y[i]
x[idx - 1] += self.x[i] * weight_col[i]
weights[idx - 1] += weight_col[i]
x[idx - 1] += self.x[i] * norm_col[i]
weights[idx - 1] += norm_col[i]
counts[idx - 1] += 1

self.err = np.sqrt(y) / counts
self.y = y / counts
self.x = x / weights

def rebin_tol_renorm(self, rebin_params: tuple, norm_col: np.ndarray, norm_val: float = 1.0):
def rebin_tol_renorm(self, rebin_params: tuple, norm_col: Optional[np.ndarray] = None, norm_val: float = 1.0):
"""Rebin with tolerance and renormalize"""

if norm_col is not None:
norm_col = norm_col
elif self.norm_col is not None:
norm_col = self.norm_col
else:
raise ValueError("Normalizaion columns cannot be None.")

rebin_min, rebin_max, rebin_step = rebin_params
rebin_min = np.min(self.x) if rebin_min is None else rebin_min
rebin_max = np.max(self.x) if rebin_max is None else rebin_max

ZERO = rebin_step / 100 # helps with the rounding error
num = math.floor((rebin_max + ZERO - rebin_min) / rebin_step) + 1

x_grid = np.arange(rebin_min - rebin_step / 2, rebin_max + rebin_step * 3 / 2, rebin_step)
x = np.zeros_like(x_grid)
y = np.zeros_like(x_grid)
counts = np.zeros_like(x_grid)

# norm_col = norm_col[self._ind]
x_boundary = np.linspace(rebin_min - rebin_step / 2, rebin_min + rebin_step * (num - 1 / 2), num + 1)
x = np.zeros_like(x_boundary[1:])
y = np.zeros_like(x)
counts = np.zeros_like(x)

for i, x0 in enumerate(self.x):
idx = np.nanargmax(x_grid + rebin_step / 2 + ZERO >= x0)
y[idx] += self.y[i]
x[idx] += self.x[i] * norm_col[i]
counts[idx] += norm_col[i]
# Return the indices of the maximum values in the specified axis ignoring NaNs.
idx = np.nanargmax(x_boundary - ZERO > x0)
if idx > 0: # ignore first and last bin box
y[idx - 1] += self.y[i]
x[idx - 1] += self.x[i] * norm_col[i]
counts[idx - 1] += norm_col[i]

self.err = np.sqrt(y[1:-2]) / counts[1:-2] * norm_val
self.y = y[1:-2] / counts[1:-2] * norm_val
self.x = x[1:-2] / counts
self.err = np.sqrt(y) / counts * norm_val
self.y = y / counts * norm_val
self.x = x / counts

def rebin_grid(self, rebin_params: tuple):
"""Rebin with a regular grid"""
Expand All @@ -181,7 +204,6 @@ def rebin_grid(self, rebin_params: tuple):
counts = np.zeros_like(x)

for i, x0 in enumerate(self.x):

# Return the indices of the maximum values in the specified axis ignoring NaNs.
idx = np.nanargmax(x_boundary - ZERO > x0)
if idx > 0: # ignore first and last bin box
Expand Down
74 changes: 47 additions & 27 deletions tests/test_scan_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@

@pytest.fixture
def scans1d():
scan0001 = ScanData1D(x=np.array([0, 1, 2]), y=np.array([1, 2, 3]), norm=np.array([2, 2, 3]))
scan0001 = ScanData1D(x=np.array([0, 1, 2]), y=np.array([1, 2, 3]), norm_col=np.array([2, 2, 3]))
scan0002 = ScanData1D(
x=np.array([15, 15.1, 15.2, 15.3, 15.4, 15.1, 15.2, 15.3, 15.4, 15.5]),
y=np.array([10, 12, 15, 42, 90, 31, 34, 105, 230, 3]),
norm=np.array([2, 2, 2, 2, 2, 5, 5, 5, 5, 5]),
norm_col=np.array([2, 2, 2, 2, 2, 5, 5, 5, 5, 5]),
)
scan0003 = ScanData1D(x=np.array([0.1, 1.1, 2.1]), y=np.array([1, 1, 1]))
scan0004 = ScanData1D(x=np.array([-0.9, 0.1, 1.1, 2.1, 3.1]), y=np.array([10, 1, 1, 1, 10]))
scan0004 = ScanData1D(
x=np.array([-0.9, 0.1, 1.1, 2.1, 3.1]), y=np.array([10, 1, 1, 1, 10]), norm_col=np.array([2, 2, 3])
)

scans = (scan0001, scan0002, scan0003, scan0004)

Expand All @@ -25,7 +27,7 @@ def scans1d():

def test_scan_data_1d_renorm(scans1d):
scan0001, *_ = scans1d
scan0001.renorm(norm_val=2)
scan0001.renorm(val=2)
assert np.allclose(scan0001.y, [1, 2, 2], atol=1e-3)
assert np.allclose(scan0001.err, [1, np.sqrt(2), np.sqrt(3) / 3 * 2], atol=1e-3)

Expand Down Expand Up @@ -73,14 +75,9 @@ def test_rebin_grid(scans1d):
_, scan0002, *_ = scans1d
scan0002.rebin_grid_renorm(
rebin_params=(15.0, 15.5, 0.2),
norm_col=np.array([2, 2, 2, 2, 2, 5, 5, 5, 5, 5]),
norm_val=4.0,
)
assert np.allclose(
scan0002.x,
[15.0, 15.2, 15.4],
atol=1e-3,
)
assert np.allclose(scan0002.x, [15.0, 15.2, 15.4], atol=1e-3)
assert np.allclose(
scan0002.y,
[
Expand All @@ -104,12 +101,10 @@ def test_rebin_grid(scans1d):


def test_rebin_tol(scans1d):
"""This should be used only for data measured w/ same weight/counting time"""
_, scan0002, *_ = scans1d

scan0002.rebin_tol(
rebin_params=(15.0, 15.5, 0.2),
weight_col=np.array([2, 2, 2, 2, 2, 5, 5, 5, 5, 5]),
)
scan0002.rebin_tol(rebin_params=(15.0, 15.5, 0.2))
assert np.allclose(
scan0002.x,
[
Expand All @@ -122,19 +117,19 @@ def test_rebin_tol(scans1d):
assert np.allclose(
scan0002.y,
[
10 / 2,
(12 + 15 + 31 + 34) / (2 + 2 + 5 + 5),
(42 + 90 + 105 + 230) / (2 + 5 + 2 + 5),
10,
(12 + 15 + 31 + 34) / 4,
(42 + 90 + 105 + 230) / 4,
],
atol=1e-3,
equal_nan=True,
)
assert np.allclose(
scan0002.err,
[
np.sqrt(10) / 2,
np.sqrt(12 + 15 + 31 + 34) / (2 + 2 + 5 + 5),
np.sqrt(42 + 90 + 105 + 230) / (2 + 5 + 2 + 5),
np.sqrt(10),
np.sqrt(12 + 15 + 31 + 34) / 4,
np.sqrt(42 + 90 + 105 + 230) / 4,
],
atol=1e-3,
equal_nan=True,
Expand All @@ -146,18 +141,37 @@ def test_rebin_tol_renorm(scans1d):

scan0002.rebin_tol_renorm(
rebin_params=(15.0, 15.5, 0.2),
norm_col=np.array([2, 2, 2, 2, 2, 5, 5, 5, 5, 5]),
norm_val=4.0,
)
assert np.allclose(
scan0002.x,
[
(2 * 15 + 2 * 15.1 + 5 * 15.1 + 2 * 15.2 + 5 * 15.2) / (2 + 2 + 5 + 2 + 5),
15,
(2 * 15.1 + 5 * 15.1 + 2 * 15.2 + 5 * 15.2) / (2 + 2 + 5 + 5),
(2 * 15.3 + 5 * 15.3 + 2 * 15.4 + 5 * 15.4) / (2 + 2 + 5 + 5),
(5 * 15.5) / (5),
],
atol=1e-3,
)
assert np.allclose(
scan0002.y,
[
10 / 2 * 4,
(12 + 15 + 31 + 34) / (2 + 2 + 5 + 5) * 4,
(42 + 90 + 105 + 230) / (2 + 5 + 2 + 5) * 4,
],
atol=1e-3,
equal_nan=True,
)
assert np.allclose(
scan0002.err,
[
np.sqrt(10) / 2 * 4,
np.sqrt(12 + 15 + 31 + 34) / (2 + 2 + 5 + 5) * 4,
np.sqrt(42 + 90 + 105 + 230) / (2 + 5 + 2 + 5) * 4,
],
atol=1e-3,
equal_nan=True,
)


def test_sub(scans1d):
Expand All @@ -178,7 +192,13 @@ def test_add(scans1d):

def test_sub_mismatch_x(scans1d):
scan0001, _, _, scan0004, *_ = scans1d
scan_sub = scan0001 - scan0004
assert np.allclose(scan_sub.x, [0, 1, 2])
assert np.allclose(scan_sub.y, [0, 1, 2])
assert np.allclose(scan_sub.err, [np.sqrt(2), np.sqrt(3), 2])
scan_sub1 = scan0001 - scan0004
assert np.allclose(scan_sub1.x, [0, 1 / 2, 2 / 3])
assert np.allclose(scan_sub1.y, [0, 1, 2])
assert np.allclose(scan_sub1.err, [1 / np.sqrt(2), np.sqrt(3) / 2, 2 / 3])

# TODO reverse?
# scan_sub2 = scan0004 - scan0001
# assert np.allclose(scan_sub2.x, [0, 1, 2])
# assert np.allclose(scan_sub2.y, [0, -1, -2])
# assert np.allclose(scan_sub2.err, [np.sqrt(2), np.sqrt(3), 2])

0 comments on commit e05f56a

Please sign in to comment.