-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtwix_deidentification.py
407 lines (341 loc) · 17.1 KB
/
twix_deidentification.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @ Moritz Rempe, [email protected]
# Institute for Artificial Intelligence in Medicine,
# University Medicine Essen
import re
import struct
import random
from pathlib import Path
import logging
from typing import IO
from tqdm import tqdm
from glob import glob
import pandas as pd
import argparse
import shutil
import os
from datetime import datetime
# Configure the root logger as soon as this module is imported: everything from
# DEBUG upwards is emitted, formatted as "<LEVEL> - <message>".
# NOTE(review): `encoding` is passed without `filename`; per the logging docs it
# is only used when a file handler is created -- confirm whether it is needed.
logging.basicConfig(
encoding="utf-8", level=logging.DEBUG, format="%(levelname)s - %(message)s"
)
class TwixAnonymizer:
def __init__(
self,
filename: str,
save_path: str,
csv_path: str = None,
meta_only: bool = False,
) -> None:
"""
Anonymizes TWIX files by replacing sensitive information with placeholders.
Args:
filename (str): The path to the TWIX file to be anonymized.
save_path (str): The directory where the anonymized TWIX file will be saved.
csv_path (str, optional): The path to the CSV file where the matches will be written. Defaults to None.
meta_only (bool, optional): If True, only save the metadata, but do not write anonymized file. Defaults to False.
Attributes:
filename (str): The path to the TWIX file to be anonymized.
save_path (str): The directory where the anonymized TWIX file will be saved.
csv_path (str): The path to the CSV file where the matches will be written.
matches (dict): A dictionary of the matched values during anonymization.
meta_only (bool, optional): If True, only save the metadata, but do not write anonymized file. Defaults to False.
Methods:
read_and_anonymize: Reads a TWIX file, determines its type, and performs anonymization based on the file type.
write_csv: Writes a dictionary of matches to a CSV file.
anonymize_twix_header: Anonymizes the header string of a TWIX file by replacing sensitive information with placeholders.
anonymize_twix_vd: Anonymizes a TWIX VD file.
anonymize_twix_vb: Anonymizes a TWIX VB file.
"""
self.filename = filename
self.save_path = save_path
self.csv_path = csv_path
self.meta_only = meta_only
def read_and_anonymize(self) -> None:
"""
Reads the file, determines its type, and performs anonymization based on the file type.
This method reads the file specified by `self.filename` and determines its type by checking the first two uints in the header.
Based on the file type, it performs anonymization using the appropriate method (`anonymize_twix_vd` or `anonymize_twix_vb`).
The anonymized data is then written to a new file in the `self.save_path` directory.
If `self.meta_only` is True, only the metadata is anonymized and the anonymized data file is deleted.
Returns:
None
"""
with open(self.filename, "rb") as fin:
# we can tell the type of file from the first two uints in the header
first_uint, second_uint = struct.unpack("II", fin.read(8))
# reset the file pointer before giving to specific function
fin.seek(0)
with open(
Path(self.save_path, f"{str(random.randint(0, 10000))}.dat"), "wb"
) as fout:
if first_uint == 0 and second_uint <= 64:
self.filename, self.matches = self.anonymize_twix_vd(
fin, fout, meta_only=self.meta_only
)
else:
self.filename, self.matches = self.anonymize_twix_vb(
fin, fout, meta_only=self.meta_only
)
self.write_csv()
fout.close()
if self.meta_only:
os.remove(fout.name)
def write_csv(self) -> None:
"""
Write the matches to a CSV file.
This method takes the matches stored in the `self.matches` attribute and writes them to a CSV file.
If a `csv_path` is provided, the matches are appended to an existing CSV file or a new file is created.
If no `csv_path` is provided, the matches are written to a new CSV file with the same name as the input file.
Returns:
None
"""
anonymized_id = Path(self.filename).stem
self.matches = {"anonymized_id": anonymized_id, **self.matches}
if self.csv_path:
self.filename = self.csv_path
if Path(self.filename).is_file():
df = pd.DataFrame(self.matches, index=[0])
df_orig = pd.read_csv(self.csv_path, index_col=0)
df = pd.concat([df_orig, df], ignore_index=True)
df.to_csv(self.filename, mode="w")
else:
df = pd.DataFrame(self.matches, index=[0])
df.to_csv(self.filename, mode="w")
else:
df = pd.DataFrame(self.matches, index=[0])
df.to_csv(self.filename, mode="w")
@staticmethod
def _get_date(date_str: str) -> str:
"""
Converts a date string in the format "%d%m%y" to the format "%y%m%d".
Args:
date_str (str): The date string to be converted.
Returns:
str: The converted date string in the format "%Y-%m-%d".
"""
# Parse the string into a datetime object
date_obj = datetime.strptime(date_str, "%y%m%d")
# Format the datetime object into the desired format
formatted_date = date_obj.strftime("%Y-%m-%d")
return formatted_date
@staticmethod
def anonymize_twix_header(header_string: str) -> str | dict:
"""
Anonymizes the header string of a TWIX file by replacing sensitive information with placeholders.
Args:
header_string (str): The header string of the TWIX file.
Returns:
tuple: A tuple containing the anonymized header string and a dictionary of the matched values.
Credit:
This method was partially adapted from the original implementation by the authors of https://github.com/openmrslab/suspect/blob/master/suspect/io/twix.py
"""
number_buffer = {
"Patient_id": r"(<ParamString.\"PatientID\">\s*\{\s*\")(.+)(\"\s*\}\n)",
"Device_serial": r"(<ParamString.\"DeviceSerialNumber\">\s*\{\s*\")(.+)(\"\s*\}\n)",
"Exam_memory_uid": r"(<ParamString.\"ExamMemoryUID\">\s*\{\s*\")(.+)(\"\s*\}\n)",
"PatientLOID": r"(<ParamString.\"PatientLOID\">\s*\{\s*\")(.+)(\"\s*\}\n)",
"StudyLOID": r"(<ParamString.\"StudyLOID\">\s*\{\s*\")(.+)(\"\s*\}\n)",
"SeriesLOID": r"(<ParamString.\"SeriesLOID\">\s*\{\s*\")(.+)(\"\s*\}\n)",
"Study": r"(<ParamString.\"Study\">\s*\{\s*\")(.+)(\"\s*\}\n)",
"FrameOfReference": r"(<ParamString.\"FrameOfReference\">\s*\{\s*\")(.+)(\"\s*\}\n)",
"Patient": r"(<ParamString.\"Patient\">\s*\{\s*\")(.+)(\"\s*\}\n)",
"MeasUID": r"(<ParamString.\"MeasUID\">\s*\{\s*\")(.+)(\"\s*\}\n)",
}
x_buffer = {
"Patient_name": r"(<ParamString.\"t?Patients?Name\">\s*\{(\s*<Visible>\s*\"true\"\s*)?\s*\")(.+)(\"\s*\}\n)",
"InstitutionAddress": r"(<ParamString.\"InstitutionAddress\">\s*\{(\s*<Visible>\s*\"true\"\s*)?\s*\")(.+)(\"\s*\}\n)",
"InstitutionName": r"(<ParamString.\"InstitutionName\">\s*\{(\s*<Visible>\s*\"true\"\s*)?\s*\")(.+)(\"\s*\}\n)",
}
zero_buffer = {
"Patient_gender": r"(<ParamLong.\"l?PatientSex\">\s*\{(\s*<Visible>\s*\"true\"\s*)?\s*)(\d+)(\s*\}\n)",
"Patient_age": r"(<ParamDouble.\"flPatientAge\">\s*\{(\s*<Visible>\s*\"true\"\s*)?\s*<Precision> \d+\s*)(\d+\.\d*)(\s*\}\n)",
"Patient_weight": r"(<ParamDouble.\"flUsedPatientWeight\">\s*\{(\s*<Visible>\s*\"true\"\s*)?\s*<Precision> \d+\s*)(\d+\.\d*)(\s*\}\n)",
"Patient_height": r"(<ParamDouble.\"flPatientHeight\">\s*\{(\s*<Visible>\s*\"true\"\s*)?\s*<Unit> \"\[mm\]\"\s*<Precision> \d+\s*)(\d+\.\d*)(\s*\}\n)",
"Patient_birthday": r"(<ParamString.\"PatientBirthDay\">\s*\{(\s*<Visible>\s*\"true\"\s*)?\s*\")(\d{8})(\"\s*\}\n)",
"ulVersion": r"(<ParamLong.\"ulVersion\">\s*\{(\s*<Visible>\s*\"true\"\s*)?\s*)(\d+)(\s*\}\n)",
}
meta_buffer = {
"tBodyPartExamined": r"(<ParamString.\"tBodyPartExamined\">\s*\{\s*\")(.+)(\"\s*\}\n)",
"Sequence": r"(<ParamString.\"SequenceDescription\">\s*\{\s*\")(.+)(\"\s*\}\n)",
"TurboFactor": r"(<ParamLong.\"TurboFactor\">\s*\{\s*)(\d+)(\s*\}\n)",
"ReadoutOversamplingFactor": r"(<ParamDouble.\"ReadoutOversamplingFactor\">\s*\{\s*<Precision> \d+\s*)(\d+\.\d*)(\s*\}\n)",
"NSlc": r"(<ParamLong.\"NSlc\">\s*\{\s*)(\d+)(\s*\}\n)",
"PhaseEncodingLines": r"(<ParamLong.\"PhaseEncodingLines\">\s*\{\s*)(\d+)(\s*\}\n)",
"ReadFoV": r"(<ParamDouble.\"ReadFoV\">\s*\{\s*<Precision> \d+\s*)(\d+\.\d*)(\s*\}\n)",
"PhaseFoV": r"(<ParamDouble.\"PhaseFoV\">\s*\{\s*<Precision> \d+\s*)(\d+\.\d*)(\s*\}\n)",
"PhaseResolution": r"(<ParamDouble.\"PhaseResolution\">\s*\{\s*<Precision> \d+\s*)(\d+\.\d*)(\s*\}\n)",
"TR": r"(<ParamDouble.\"TR\">\s*\{\s*<Precision> \d+\s*)(\d+\.\d*)(\s*\}\n)",
"TI": r"(<ParamDouble.\"TI\">\s*\{\s*<Precision> \d+\s*)(\d+\.\d*)(\s*\}\n)",
"flMagneticFieldStrength": r"(<ParamDouble.\"flMagneticFieldStrength\">\s*\{\s*<Precision> \d+\s*)(\d+\.\d*)(\s*\}\n)",
"PatientPosition": r"(<ParamString.\"PatientPosition\">\s*\{\s*\")(.+)(\"\s*\}\n)",
}
matches = {}
frame_of_reference = re.search(
r"(<ParamString.\"FrameOfReference\"> { )(\".+\")( }\n)", header_string
).group(2)
exam_date_time = frame_of_reference.split(".")[10]
exam_date = exam_date_time[2:8]
matches["Exam_date"] = TwixAnonymizer._get_date(exam_date)
for key, buffer in number_buffer.items():
match = re.search(buffer, header_string)
if match:
matches[key] = match.group(2)
header_string = re.sub(
buffer,
lambda match: "".join(
(match.group(1), ("0" * (len(match.group(2)))), match.group(3))
),
header_string,
)
for key, buffer in zero_buffer.items():
match = re.search(buffer, header_string)
if match:
matches[key] = match.group(3)
header_string = re.sub(
buffer,
lambda match: "".join(
(
match.group(1),
re.sub(r"\d", "0", match.group(3)),
match.group(4),
)
),
header_string,
)
for key, buffer in x_buffer.items():
match = re.search(buffer, header_string)
matches[key] = match.group(3)
header_string = re.sub(
buffer,
lambda match: "".join(
(
match.group(1),
("x" * (len(match.group(3)))),
match.group(4),
)
),
header_string,
)
# Do not anonymize these buffers, but save them
for key, buffer in meta_buffer.items():
match = re.search(buffer, header_string)
if match:
matches[key] = match.group(2)
header_string = re.sub(
r"\"[\d\.]*{0}[\d\.]*\"".format(exam_date),
lambda match: re.sub(r"\w", "x", match.group()),
header_string,
)
return header_string, matches
@staticmethod
def anonymize_twix_vd(fin: IO, fout: IO, meta_only: bool = False) -> str | dict:
"""
Anonymizes a TWIX VD file.
Args:
fin (file): The input file object.
fout (file): The output file object.
meta_only (bool, optional): If True, only save the metadata, but do not write anonymized file. Defaults to False.
Returns:
Union[str, dict]: The name of the output file and a dictionary of matches found during anonymization.
Credit:
This method was adapted from the original implementation by the authors of https://github.com/openmrslab/suspect/blob/master/suspect/io/twix.py
"""
twix_id, num_measurements = struct.unpack("II", fin.read(8))
if not meta_only:
fout.write(struct.pack("II", twix_id, num_measurements))
for i in range(num_measurements):
fin.seek(8 + 152 * i)
meas_id, file_id, offset, length, patient_name, protocol_name = (
struct.unpack("IIQQ64s64s", fin.read(152))
)
anon_patient_name = ("x" * 64).encode("latin-1")
fin.seek(offset)
# read the header and anonymize it
header_size = struct.unpack("I", fin.read(4))[0]
header = fin.read(header_size - 4)
header_string = header[:-24].decode("latin-1")
anonymized_header, matches = TwixAnonymizer.anonymize_twix_header(
header_string=header_string
)
if not meta_only:
fout.seek(8 + 152 * i)
fout.write(
struct.pack(
"IIQQ64s64s",
meas_id,
file_id,
offset,
length,
anon_patient_name,
protocol_name,
)
)
fout.seek(offset)
fout.write(struct.pack("I", header_size))
fout.write(anonymized_header.encode("latin1"))
fout.write(header[-24:])
fout.write(fin.read(length - header_size))
return fout.name, matches
@staticmethod
def anonymize_twix_vb(fin: IO, fout: IO, meta_only: bool = False) -> str | dict:
"""
Anonymizes a TWIX VB file.
Args:
fin (file): The input file object.
fout (file): The output file object.
meta_only (bool, optional): If True, only save the metadata, but do not write anonymized file. Defaults to False.
Returns:
Union[str, dict]: The name of the output file and a dictionary of matches found during anonymization.
Credit:
This method was adapted from the original implementation by the authors of https://github.com/openmrslab/suspect/blob/master/suspect/io/twix.py
"""
# first four bytes are the size of the header
header_size = struct.unpack("I", fin.read(4))[0]
# read the rest of the header minus the four bytes we already read
header = fin.read(header_size - 4)
# last 24 bytes of the header contain non-strings
header_string = header[:-24].decode("latin-1")
anonymized_header, matches = TwixAnonymizer.anonymize_twix_header(
header_string=header_string
)
if not meta_only:
fout.write(struct.pack("I", header_size))
fout.write(anonymized_header.encode("latin-1"))
fout.write(header[-24:])
fout.write(fin.read())
return fout.name, matches
def anonymize_twix(input_path: str, save_path: str, meta_only: bool = False) -> None:
    """
    Anonymizes TWIX files located at the given input path and saves the anonymized files at the specified save path.

    If `input_path` is a directory, every ``*.dat`` file directly inside it is
    processed (in sorted order) and all matches are collected in
    ``<save_path>/<directory name>.csv``. If it is a single file, the matches
    are written to ``<save_path>/<file stem>.csv``. `save_path` is created if
    it does not exist.

    Args:
        input_path (str): The path to the TWIX file or directory containing TWIX files to be anonymized.
        save_path (str): The path to save the anonymized files.
        meta_only (bool, optional): If True, only save the metadata, but do not write anonymized file. Defaults to False.

    Raises:
        FileNotFoundError: If the input_path does not exist.
    """
    source = Path(input_path)
    # Explicit raise instead of `assert`: asserts vanish under `python -O`
    # and would raise AssertionError rather than the documented exception.
    if not source.exists():
        raise FileNotFoundError(f"{input_path} does not exist.")

    os.makedirs(save_path, exist_ok=True)

    if source.is_dir():
        # Sorted so that the processing order does not depend on the file system.
        files = sorted(glob(f"{input_path}/*.dat"))
        folder_len = len(files)

        if meta_only:
            logging.info("Only saving metadata! Not writing anonymized files.")
        else:
            logging.info("Will save anonymized files in %s.", save_path)
        logging.info("Anonymizing all files in %s.", input_path)
        logging.info("Total of %s files.", folder_len)

        # One shared CSV for the whole directory, named after the directory.
        csv_path = Path(save_path, f"{source.name}.csv")
        for filename in tqdm(
            files, desc="Anonymizing files", total=folder_len, unit="files"
        ):
            anonymizer = TwixAnonymizer(filename, save_path, csv_path, meta_only)
            anonymizer.read_and_anonymize()
    else:
        logging.info("Anonymizing %s.", input_path)
        csv_path = Path(save_path, f"{source.stem}.csv")
        if meta_only:
            logging.info("Only saving metadata! Not writing anonymized files.")
        anonymizer = TwixAnonymizer(input_path, save_path, csv_path, meta_only)
        anonymizer.read_and_anonymize()