forked from zsylvester/curvaturepy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcline_analysis.py
197 lines (188 loc) · 7.25 KB
/
cline_analysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import numpy as np
import matplotlib.pyplot as plt
# from dp_python_master.dpcore import dp
from dtw import *
import os
import datetime
import pandas as pd
from scipy.signal import correlate
from scipy.spatial import distance
def load_data(dirname):
    """Load channel centerline, bank, curvature, and width data from CSV files.

    inputs:
    dirname - directory containing one CSV file per timestep; each filename
        must end in an 8-digit YYYYMMDD date before the '.csv' extension
    outputs:
    fnames - sorted list of CSV filenames
    clxs, clys - lists of centerline x/y coordinate arrays
    rbxs, lbxs, rbys, lbys - lists of right/left bank x/y coordinate arrays
    curvatures - list of centerline curvature arrays
    ages - list of integer YYYYMMDD dates parsed from the filenames
    widths - list of channel width series (CSV column 'width (m)')
    dates - list of datetime.datetime objects corresponding to 'ages'

    Side effect: draws a quick-look plot of all centerlines and banks."""
    fnames = os.listdir(dirname)
    # remove macOS Finder metadata files before processing
    for fname in fnames:
        if fname == '.DS_Store':
            # os.path.join works whether or not 'dirname' has a trailing slash
            # (plain string concatenation did not)
            os.remove(os.path.join(dirname, fname))
    fnames = os.listdir(dirname)
    fnames.sort()
    clxs = []
    clys = []
    ages = []
    widths = []
    rbxs = []
    lbxs = []
    rbys = []
    lbys = []
    curvatures = []
    for fname in fnames:
        df = pd.read_csv(os.path.join(dirname, fname))
        x = np.array(df['centerline_x'])
        y = np.array(df['centerline_y'])
        # left and right banks are switched in the CSV files, so swap on load
        lbx = np.array(df['right_bank_x'])
        rbx = np.array(df['left_bank_x'])
        lby = np.array(df['right_bank_y'])
        rby = np.array(df['left_bank_y'])
        clxs.append(x)
        clys.append(y)
        rbxs.append(rbx)
        lbxs.append(lbx)
        rbys.append(rby)
        lbys.append(lby)
        curvatures.append(np.array(df['curvature']))
        # filename is assumed to end in an 8-digit date: ...YYYYMMDD.csv
        ages.append(int(fname[-12:-4]))
        widths.append(df['width (m)'])
    # quick-look plot: centerlines black, right banks blue, left banks red
    plt.figure()
    for i in range(len(clxs)):
        plt.plot(clxs[i], clys[i], 'k.-')
        plt.plot(rbxs[i], rbys[i], 'b')
        plt.plot(lbxs[i], lbys[i], 'r')
    plt.axis('equal')
    dates = []
    for age in ages:
        # split the YYYYMMDD integer into a datetime object
        ds = str(age)
        dates.append(datetime.datetime(int(ds[:4]), int(ds[4:6]), int(ds[6:])))
    return fnames, clxs, clys, rbxs, lbxs, rbys, lbys, curvatures, ages, widths, dates
def correlate_clines(x1, x2, y1, y2):
    """use dynamic time warping to correlate centerlines
    inputs:
    x1, y1 - coordinates of first centerline
    x2, y2 - coordinates of second centerline
    outputs:
    p - indices of correlation in second centerline
    q - indices of correlation in first centerline
    sm - distance matrix"""
    first = np.vstack((x1, y1)).T
    second = np.vstack((x2, y2)).T
    # pairwise Euclidean distances between all points of the two curves
    sm = distance.cdist(first, second)
    # dynamic time warping over the distance matrix
    alignment = dtw(sm, step_pattern=symmetric1)
    # reverse the warping paths so they run from the ends of the curves
    p = alignment.index1[::-1]
    q = alignment.index2[::-1]
    return p, q, sm
def get_migr_rate(x1, x2, y1, y2, years):
    """use dynamic time warping to correlate centerlines
    inputs:
    x1, y1 - coordinates of first centerline
    x2, y2 - coordinates of second centerline
    years - time between the two centerlines, in years
    outputs:
    migr_rate - migration rate (in m/years)
    migr_sign - migration sign
    p - indices of correlation in second centerline
    q - indices of correlation in first centerline"""
    p, q, sm = correlate_clines(x1, x2, y1, y2)
    # p and q need to be flipped!
    p = p[::-1]
    q = q[::-1]
    # keep only one match per point of the first centerline
    dup = np.where(np.diff(p) == 0)[0] + 1
    pn = np.delete(np.array(p), dup)
    qn = np.delete(np.array(q), dup)
    # segment endpoints along the first centerline
    xa, ya = x1[:-1], y1[:-1]
    xb, yb = x1[1:], y1[1:]
    # matched points on the second centerline
    x = x2[qn][1:]
    y = y2[qn][1:]
    # sign of the cross product tells on which side of each segment the
    # matched point lies; duplicate the first value to restore full length
    migr_sign = np.sign((x - xa) * (yb - ya) - (y - ya) * (xb - xa))
    migr_sign = np.hstack((migr_sign[0], migr_sign))
    # signed matched-point distance converted to a rate (m/year)
    migr_dist = migr_sign * sm[pn, qn] / years
    return migr_dist, migr_sign, p, q
def compute_curvature(x, y):
    """compute curvature of curve defined by coordinates x and y
    curvature is returned in units of 1/(unit of x and y)
    s - distance along curve"""
    # first derivatives of the parametric curve
    x_d = np.gradient(x)
    y_d = np.gradient(y)
    # local step length along the curve
    step = np.sqrt(x_d**2 + y_d**2)
    # second derivatives
    x_dd = np.gradient(x_d)
    y_dd = np.gradient(y_d)
    # standard signed-curvature formula for a parametric plane curve
    curvature = (x_d * y_dd - y_d * x_dd) / ((x_d**2 + y_d**2)**1.5)
    # along-channel distance
    s = np.cumsum(step)
    return curvature, s
def compute_derivatives(x, y):
    """Return first derivatives (dx, dy), local step lengths (ds), and
    cumulative along-curve distance (s) for a curve given by x and y."""
    dx, dy = np.gradient(x), np.gradient(y)  # first derivatives
    ds = np.sqrt(dx**2 + dy**2)              # local step length
    s = np.cumsum(ds)                        # along-curve distance
    return dx, dy, ds, s
def find_zero_crossings(curve):
    """find zero crossings of a curve
    input:
    a one-dimensional array that describes the curve
    outputs:
    loc_zero_curv - indices of zero crossings (padded with the first and
        last index of the curve)
    loc_max_curv - index of the largest-magnitude value in each segment
        between consecutive zero crossings (0 for empty segments)"""
    # sign changes between consecutive samples: a direct +/- flip gives
    # |diff(sign)| == 2, a touch of zero gives 1 -- normalize both to 1
    n_curv = abs(np.diff(np.sign(curve)))
    n_curv[n_curv == 2] = 1
    # np.where replaces matplotlib.mlab.find, which was removed in
    # matplotlib 3.1 and made the original code crash on modern installs
    loc_zero_curv = np.where(n_curv)[0]
    loc_zero_curv = loc_zero_curv + 1
    loc_zero_curv = np.hstack((0, loc_zero_curv, len(curve) - 1))
    n_infl = len(loc_zero_curv)
    max_curv = np.zeros(n_infl - 1)
    loc_max_curv = np.zeros(n_infl - 1, dtype=int)
    for i in range(1, n_infl):
        segment = curve[loc_zero_curv[i - 1]:loc_zero_curv[i]]
        # pick the maximum for positive segments, the minimum for negative ones
        if np.mean(segment) > 0:
            max_curv[i - 1] = np.max(segment)
        if np.mean(segment) < 0:
            max_curv[i - 1] = np.min(segment)
        max_local_ind = np.where(segment == max_curv[i - 1])[0]
        if len(max_local_ind) >= 1:
            # first occurrence if the extremum appears more than once
            loc_max_curv[i - 1] = loc_zero_curv[i - 1] + max_local_ind[0]
        else:
            loc_max_curv[i - 1] = 0
    return loc_zero_curv, loc_max_curv
def get_predicted_migr_rate(curvature,W,k,Cf,D,kl,s):
    """function for calculating predicted migration rate
    using the simplified Howard-Knutson model
    inputs:
    curvature - channel curvature along the centerline
    W - channel width (m)
    k - constant (=1)
    Cf - friction factor
    D - channel depth (m)
    kl - migration constant (m/year)
    s - along-channel distance (m)
    output:
    R1 - predicted migration rate (m/year)"""
    ds = np.diff(s)
    alpha = k*2*Cf/D  # decay constant of the upstream weighting function
    ns = len(s)
    R0 = kl*W*curvature # nominal (local-curvature) channel migration rate
    R1 = np.zeros(ns) # preallocate adjusted channel migration rate
    for i in range(0,len(R1)):
        if i == 0:
            # no upstream points: only the local point contributes.
            # (the original used ds[-1::-1], which reverses the whole array,
            # and yielded the right value only through numpy broadcasting
            # against the length-1 R0 slice; this makes the intent explicit)
            si2 = np.zeros(1)
        else:
            # distance along centerline, backwards from current point
            si2 = np.hstack((0,np.cumsum(ds[i-1::-1])))
        G = np.exp(-alpha*si2) # exponential upstream weighting function
        # actual migration rate (m/year); 2.5 is the model's gain constant
        R1[i] = -1*R0[i] + 2.5*np.sum(R0[i::-1]*G)/np.sum(G)
    return R1
def get_time_shifts(migr_rate,curv,window_length):
    """Estimate the lag between curvature and migration rate in successive
    non-overlapping windows using cross correlation.

    inputs:
    migr_rate - migration rate series (may contain NaNs)
    curv - curvature series (same sampling as migr_rate)
    window_length - number of samples per window
    output:
    time_shifts - array with the lag (in samples) of the correlation peak
        for each window; windows containing NaNs are dropped"""
    w = len(migr_rate[:window_length])
    # lag axis for a 'full' cross correlation of two length-w signals:
    # the output has 2*w - 1 elements, for lags -(w-1) .. (w-1).
    # (the original arange(-w, w) had 2*w entries, shifting every reported
    # lag by one and disagreeing with get_friction_factor)
    delta_t = np.arange(1 - w, w)
    time_shifts = []
    i = 0
    while i+window_length < len(curv):
        if np.sum(np.isnan(migr_rate[i:i+window_length]))>0: # get rid of windows with NaNs
            # np.nan: np.NaN was removed in NumPy 2.0
            time_shifts.append(np.nan)
        else:
            corr = correlate(curv[i:i+window_length], migr_rate[i:i+window_length])
            time_shift = delta_t[corr.argmax()]
            time_shifts.append(time_shift)
        i = i+window_length
    time_shifts = np.array(time_shifts)
    time_shifts = time_shifts[np.isnan(time_shifts)==0] # get rid of NaNs
    return time_shifts
# function for optimizing for Cf:
def get_friction_factor(Cf,curvature,migr_rate,kl,W,k,D,s):
    """Objective function for optimizing Cf: returns the time shift (in
    samples) between the predicted and the observed migration rate; the
    goal of the optimization is to minimize this shift."""
    predicted = get_predicted_migr_rate(curvature, W, k, Cf, D, kl, s)
    xcorr = correlate(predicted, migr_rate)
    # delta time array matching a 'full' cross correlation
    lags = np.arange(1 - len(predicted), len(predicted))
    return lags[xcorr.argmax()]  # goal is to minimize the time shift