-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathpreprocessing.py
233 lines (203 loc) · 8.21 KB
/
preprocessing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import pandas as pd
import numpy as np
import statsmodels.api as sm
from functools import partial
import sklearn
__all__ = ['transformCytokines',
'enforceSensitivity',
'normalizeLevels',
'meanSubNormalize',
'partialCorrNormalize',
'convertLevel',
'standardizedMean',
'imputeNA']
def meanSubNormalize(cyDf, cyVars=None, compCommVars=None, meanVar=None):
"""Normalize cytokine columns by the log-mean for each patient, within each compartment.
The point is that if cytokine concentrations are generally high for one sample or another,
this might dominate the covariation of cytokines across patients (both within/across compartments).
We subtract off the mean since the "overall inflamation" level
that we are adjusting for would probably be on the fold-change concentration scale.
(additive on the log-concentration scale)"""
def _normFuncSub(vec):
out = vec - muVec
return out
if cyVars is None:
cyVars = cyDf.columns
if meanVar is None:
meanVar = 'Mean'
if compCommVars is None:
cyDf.columns
"""No standardizing cytokines before taking the mean (need units to stay in log-concentration)"""
muVec = cyDf[compCommVars].mean(axis=1)
ndf = cyDf.copy()
ndf.loc[:, cyVars] = ndf[cyVars].apply(_normFuncSub, axis = 1)
ndf.loc[:, meanVar] = muVec
return ndf
def partialCorrNormalize(cyDf, cyVars=None, compCommVars=None, meanVar=None, returnModels=True):
"""Computes residuals in-place after regressing each cytokine on the mean cytokine level
Correlations among residuals are partial correlations, adjusting for meanVar
Parameters
----------
cyDf : pd.DataFrame
Log-transformed cytokine data with cytokine columns and rows per patient/timepoint
cyVars : list
Cytokine columns in cyDf that will be normalized and included in the returned df
(default: all columns in cyDf)
compCommVars : list
Cytokine columns used for computing the mean level for each row.
(default: all columns in cyDf)
meanVar : str
Name of the cytokine mean column added to the df
(default: "Mean")
returnModels : bool
If True, return the fitted statsmodels.GLM objects for transforming additional timepoints.
Returns
-------
nCyDf : pd.DataFrame
Residuals after regressing each cyVar on the mean cytokine level for each row.
models : pd.Series
If returnModels is True,
result object from the regression for each cytokine (index), that can be used
to normalize additional timepoints."""
def _meanCorrModel(colVec):
if colVec.isnull().all():
result = None
else:
model = sm.GLM(endog=colVec, exog=sm.add_constant(muVec), missing='drop')
try:
result = model.fit()
except sm.tools.sm_exceptions.PerfectSeparationError:
prnt = (colVec.name, colVec.isnull().sum(), colVec.shape[0])
print('PerfectSeparationError with column "%s": %d of %d values are Nan' % prnt)
result = None
return result
def _meanCorrResiduals(colVec):
result = _meanCorrModel(colVec)
if result is None:
return colVec
else:
return colVec - result.predict(sm.add_constant(muVec))
if cyVars is None:
cyVars = cyDf.columns
if meanVar is None:
meanVar = 'Mean'
if compCommVars is None:
compCommVars = cyVars
"""Standardize each cytokine before taking the mean.
Ensures equal "weighting" between cytokines when computing the mean level."""
muVec = standardizedMean(cyDf, vars=compCommVars)
models = cyDf.loc[:, cyVars].apply(_meanCorrModel, axis=0)
ndf = cyDf.copy()
ndf.loc[:, cyVars] = ndf.loc[:, cyVars].apply(_meanCorrResiduals, axis=0)
ndf.loc[:, meanVar] = muVec
if returnModels:
return ndf, models
else:
return ndf
def standardizedMean(df, vars=None):
if vars is None:
vars = df.columns
muS = df[vars].apply(lambda cy: (cy - cy.mean()) / cy.std(), axis=0).mean(axis=1)
muS.name = 'Mean'
return muS
def imputeNA(df, method='mean', dropThresh=0.75):
"""Drop rows (PTIDs) that have fewer than 90% of their cytokines"""
outDf = df.dropna(axis=0, thresh=np.round(df.shape[1] * dropThresh)).copy()
if method == 'resample':
for col in outDf.columns:
naInd = outDf[col].isnull()
outDf.loc[naInd, col] = outDf.loc[~naInd, col].sample(naInd.sum(), replace=True).values
elif method == 'mean':
for col in outDf.columns:
naInd = outDf[col].isnull()
outDf.loc[naInd, col] = outDf.loc[~naInd, col].mean()
elif method == 'predict':
naInds = []
for col in outDf.columns:
naInd = outDf[col].isnull()
outDf.loc[naInd, col] = outDf.loc[~naInd, col].mean()
naInds.append(naInd)
for naInd,col in zip(naInds, outDf.columns):
if naInd.sum() > 0:
otherCols = [c for c in outDf.columns if not c == col]
mod = sklearn.linear_model.LinearRegression().fit(outDf[otherCols], outDf[col])
outDf.loc[naInd, col] = mod.predict(outDf.loc[naInd, otherCols])
return outDf
def convertLevel(mn, mx, val, mask = False, verbose = False):
"""Map function for cleaning and censoring cytokine values
Remove ">" and "," characters, while setting minimum/maximum detection level
and coverting to floats
If mask is True, return a sentinel value for the mask (instead of a converted value)
0 : minimum sensitivity/NS (no signal)
0.5 : OK value
1 : maximum/saturated
-1 : NA/ND (not done)"""
if isinstance(val, str):
val = val.replace(',', '').replace('>', '')
try:
out = float(val)
except ValueError:
if val == 'NS':
out = mn
elif val == 'ND':
out = np.nan
elif val.find('N/A') >= 0:
out = np.nan
else:
raise BaseException("Unexpected value: %s" % val)
if not mx is None:
if out >= mx:
if verbose:
print('Max truncation: %1.2f to %1.2f' % (out, mx))
out = min(out, mx)
if mask:
return 1
if not mn is None:
if out <= mn:
if verbose:
print('Min truncation %1.2f to %1.2f' % (out, mn))
out = max(out, mn)
if mask:
return 0
if mask:
if np.isnan(out):
return -1
else:
return 0.5
return out
def enforceSensitivity(cyDf, sensitivityS, truncateLevels=True, inplace=True):
"""Perform truncation based on reported Sensitivity and ">XXXX" values?"""
if not inplace:
cyDf = cyDf.copy()
maskDf = cyDf.copy()
for col in cyDf.columns:
mn = sensitivityS[col]
if not cyDf[col].dtype == dtype('float64'):
gtPresent = (cyDf[col].str.contains('>') == True)
if any(gtPresent):
mx = cyDf.loc[gtPresent, col].map(partial(_convertLevel, 0, None)).max()
else:
mx = None
maskDf.loc[:, col] = maskDf[col].map(partial(convertLevel, mn, mx, mask = True))
if truncateLevels:
cyDf.loc[:, col] = cyDf[col].map(partial(convertLevel, mn, mx))
else:
cyDf.loc[:, col] = cyDf[col].map(partial(convertLevel, mn, None))
return cyDf, maskDf
def tranformCytokines(cyDf, maskDf=None, performLog=True, halfLOD=True, discardCensored=False, inplace=True):
if maskDf is None:
maskDf = pd.DataFrame(np.zeros(cyDf.shape), index=cyDf.index, columns=cyDf.columns)
if not inplace:
cyDf = cyDf.copy()
if halfLOD:
"""Replace left-censored values with the LOD/2"""
tmpView = cyDf.values
tmpView[maskDf.values == 0] = tmpView[maskDf.values == 0]/2
if performLog:
"""Take the log of all cytokine concentrations"""
cyDf = np.log10(cyDf)
if discardCensored:
"""Force censored values to be Nan"""
tmpView = cyDf.values
tmpView[(maskDf.values == 1) | (maskDf.values == 0)] = np.nan
return cyDf