forked from ychen306/FHIR-Genomics
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fhir_parser.py
232 lines (190 loc) · 8.25 KB
/
fhir_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# -*- coding: utf-8 -*-
import re
import os
import json
from fhir_spec import SPECS
# TODO: support parsing wildcard paths,
# e.g. Extension.value[x]
# Patterns for the FHIR primitive types that are validated by regular expression.
# They are applied through `validate_by_regex`, which uses `regex.match` and
# therefore anchors the pattern at the start of the value only.

# optional leading '-', a four-digit year, then optionally -MM and -DD
DATE_RE = re.compile(r'-?([1-9][0-9]{3}|0[0-9]{3})(-(0[1-9]|1[0-2])(-(0[1-9]|[12][0-9]|3[01]))?)?')
# same date form, optionally followed by 'T' and a time (hh:mm:ss with optional
# fraction, or 24:00:00) and an optional zone ('Z' or a +/-hh:mm offset)
DATETIME_RE = re.compile(r'-?([1-9][0-9]{3}|0[0-9]{3})(-(0[1-9]|1[0-2])(-(0[1-9]|[12][0-9]|3[01])(T(([01][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9](\.[0-9]+)?|(24:00:00(\.0+)?))(Z|(\+|-)((0[0-9]|1[0-3]):[0-5][0-9]|14:00))?)?)?)?')
# 1 to 36 characters drawn from lowercase letters, digits, '-' and '.'
ID_RE = re.compile(r'[a-z0-9\-\.]{1,36}')
# loose check: four-digit year not starting with 0, '-', any text up to a 'T',
# a run of characters other than '.', then 'Z' or a '+'/'-' offset
INSTANT_RE = re.compile(r'[1-9][0-9]{3}-.+T[^.]+(Z|[+-].+)')
# 'urn:oid:' followed by four dot-separated groups of digits
OID_RE = re.compile(r'urn:oid:\d+\.\d+\.\d+\.\d+')
# 'urn:uuid:' followed by hex digits grouped 8-4-4-4-12
UUID_RE = re.compile(r'urn:uuid:[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}')
# case-insensitive URL pattern: a scheme, 'www.' or domain prefix followed by
# runs of non-space characters, allowing up to two levels of balanced parentheses
URI_RE = re.compile(r'''(?i)\b((?:[a-z][\w-]+:(?:/{1,3}|[a-z0-9%])|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'".,<>?«»“”‘’]))''')
def validate_by_regex(regex):
    '''
    build a validator from a compiled regular expression

    the returned function converts its argument with `str()` and reports
    whether `regex.match` succeeds; the match is anchored at the start of the
    text only, so trailing characters after a valid prefix are not rejected
    '''
    def matches(data):
        return regex.match(str(data)) is not None

    return matches
def validate_by_instance(datatype):
    '''
    build a validator that accepts a value only when it is an instance of
    `datatype` (anything `isinstance` accepts: a class or a tuple of classes)
    '''
    def is_instance(data):
        return isinstance(data, datatype)

    return is_instance
# Maps the name of each supported FHIR primitive type to a predicate that takes
# a decoded JSON value and returns True when the value is acceptable.
# Type names missing from this table are treated as complex types by
# `FHIRElement.validate_elem` and handed to `parse`.
# NOTE(review): `bool` is a subclass of `int`, so the 'integer' check also
# accepts True/False.
# NOTE(review): the 'decimal' check accepts `float` only, so a JSON number that
# was decoded as an `int` (e.g. 1) fails validation - confirm this is intended.
FHIR_PRIMITIVE_VALIDATORS = {
    'base64Binary': validate_by_instance(basestring),
    'boolean': validate_by_instance(bool),
    'date': validate_by_regex(DATE_RE),
    'dateTime': validate_by_regex(DATETIME_RE),
    'decimal': validate_by_instance(float),
    'id': validate_by_regex(ID_RE),
    'instant': validate_by_regex(INSTANT_RE),
    'integer': validate_by_instance(int),
    'oid': validate_by_regex(OID_RE),
    'string': validate_by_instance(basestring),
    'uri': validate_by_regex(URI_RE),
    'uuid': validate_by_regex(UUID_RE),
}
def _init_boolean(value):
    '''
    convert the string form of a boolean ('true' / 'false') into a `bool`

    raises ValueError for any other value, so that a value which is not a
    boolean at all is reported as uncorrectable instead of silently becoming
    False
    '''
    if value == 'true':
        return True
    if value == 'false':
        return False
    raise ValueError('not a FHIR boolean: %r' % (value,))


# Converters used by `correct_element` to coerce an invalid value into the
# primitive type its profile asks for (e.g. "1" -> 1). Each converter either
# returns the coerced value or raises ValueError / TypeError.
FHIR_PRIMITIVE_INIT = {
    'boolean': _init_boolean,
    'decimal': float,
    'integer': int,
}
# URL that `get_assessed_condition` looks for in an Observation's `extension`
# list to find the assessed-condition reference.
# NOTE(review): the constant name is misspelled ("ASSESED"); it is kept as-is
# because it is referenced by this name elsewhere in the file.
ASSESED_TRAIT_EXTENSION_URL = 'http://genomics.smartplatforms.org/dictionary/GeneticObservation#assessedCondition'
# Search-param spec for the custom `assessed-condition` reference parameter;
# `parse` attaches it to the search elements of every Observation.
ASSESSED_TRAIT_SPEC = {
    'type': 'reference',
    'name': 'assessed-condition'
}
def get_assessed_condition(observation, correctable):
    '''
    extract the assessed condition from an Observation extended with "GeneticObservation"

    scans the observation's `extension` list for entries whose `url` is the
    assessed-condition extension URL and returns the first `valueReference`
    that is a dict and parses as a valid `ResourceReference`;
    returns None when there is no such entry
    '''
    for ext in observation.get('extension', []):
        if ext.get('url') != ASSESED_TRAIT_EXTENSION_URL:
            continue
        reference = ext.get('valueReference')
        if not isinstance(reference, dict):
            continue
        is_valid, _ = parse('ResourceReference', reference, correctable)
        if is_valid:
            return reference
    return None
def parse(datatype, data, correctible):
    '''
    walk through a complex datatype or a resource and collect elements bound to search params

    datatype: name of the resource / complex type; types that have no entry in
        `SPECS` are not checked and are reported as valid
    data: the decoded JSON value to validate (may be modified in place when
        `correctible` is true)
    correctible: passed on to every `FHIRElement`

    returns `(False, None)` when any element of the profile fails validation,
    otherwise `(True, search_elements)` where `search_elements` is a list of
    `{'spec': ..., 'elements': [...]}` dicts, one per element that is bound to
    a search param
    '''
    search_elements = []
    if datatype in SPECS:
        elements = [FHIRElement(element_spec, correctible)
                    for element_spec in SPECS[datatype]['elements']]
        # every element is validated (no short-circuit), so in-place
        # corrections are applied consistently across the whole profile
        collected = [element.get_search_elements()
                     for element in elements if element.validate(data)]
        if len(collected) != len(elements):
            return False, None
        # a real list is required here because it is appended to below;
        # `filter` only returns a list on Python 2
        search_elements = [found for found in collected
                           if found.get('spec') is not None]
    # extract element for SMART Genomics' custom search param - assessed-condition
    if datatype == 'Observation':
        condition = get_assessed_condition(data, correctible)
        customed_search_param = {
            'spec': ASSESSED_TRAIT_SPEC,
            'elements': []}
        if condition is not None:
            customed_search_param['elements'].append(condition)
        search_elements.append(customed_search_param)
    return True, search_elements
def parse_resource(resource_type, resource, correctible=False):
    '''
    validate `resource` as a resource of type `resource_type`

    returns `(False, None)` when the resource's `resourceType` is not
    `resource_type`, otherwise whatever `parse` returns for it.
    with `correctible` being `True` the validation will try to make an invalid
    resource valid if possible,
    i.e. making changes such as "1" -> 1, {'a': 1} -> [{'a': 1}] to fit the profile description
    '''
    if resource.get('resourceType') != resource_type:
        return False, None
    return parse(resource_type, resource, correctible)
def correct_element(element, element_types):
    '''
    try to coerce `element` into one of `element_types`

    the types are tried in order; only those with a converter in
    `FHIR_PRIMITIVE_INIT` are considered. returns the first successful
    conversion, or None when no type can represent the value
    '''
    for et in element_types:
        if et not in FHIR_PRIMITIVE_INIT:
            continue
        try:
            return FHIR_PRIMITIVE_INIT[et](element)
        except (TypeError, ValueError, OverflowError):
            # this type cannot represent the value; try the next candidate.
            # only conversion failures are swallowed, anything else propagates
            continue
    return None
class FHIRElement(object):
    '''
    validator for one element of a FHIR profile

    an instance is built from one element spec, checks every occurrence of
    that element inside a decoded JSON document and remembers the values that
    are bound to a search param
    '''

    def __init__(self, spec, correctible):
        '''
        spec: element spec; reads `spec['path']`, `spec['definition']`
            (`'type'` is optional, `'min'` and `'max'` are required) and the
            optional `spec['searchParam']`
        correctible: when true, `validate` may rewrite invalid values in place
        '''
        self.correctible = correctible
        self.path = spec['path']
        self.elem_types = []
        if 'type' in spec['definition']:
            self.elem_types = [_type['code']
                               for _type in spec['definition']['type']]
        self.min_occurs = spec['definition']['min']
        self.max_occurs = spec['definition']['max']
        self.search_spec = spec.get('searchParam')
        self.search_elements = []

    def _push_ancestors(self, jsondict, path_elems, elem_ancestors):
        '''
        step one key down the path: look up `path_elems[0]` in `jsondict` and
        push every value found there onto `elem_ancestors`, paired with the
        part of the path that is still to be walked

        a missing key, a null value or a non-dict `jsondict` pushes nothing
        '''
        cur_key = path_elems[0]
        remaining = path_elems[1:]
        if not isinstance(jsondict, dict) or cur_key not in jsondict:
            return
        val = jsondict[cur_key]
        if isinstance(val, list):
            elem_ancestors.extend(
                [(ancestor, remaining) for ancestor in val])
        elif val is not None:
            # a single nested object - or a malformed scalar, which is pushed
            # as well so that `validate` can reject it
            elem_ancestors.append((val, remaining))

    def _try_correct(self, elem):
        '''
        return a corrected replacement for an invalid value, or None when
        correction is disabled or impossible
        '''
        if not self.correctible:
            return None
        return correct_element(elem, self.elem_types)

    def get_search_elements(self):
        '''
        return the search param spec of this element together with the values
        collected by `validate_elem` so far
        '''
        return {'spec': self.search_spec, 'elements': self.search_elements}

    def validate(self, data):
        '''
        check every occurrence of this element inside `data`

        the first path component (the type name) is ignored; the middle
        components locate the parent objects and the last one names the
        element. a path with a single component is always valid.
        when `correctible` is set, invalid values and single values stored
        where a list is expected are fixed in place.

        returns True when all occurrences are valid (or were corrected)
        '''
        path_elems = self.path.split('.')
        if len(path_elems) == 1:
            return True
        elem_name = path_elems[-1]
        ancestor_keys = path_elems[1:-1]
        if not ancestor_keys:
            elem_parents = [data]
        else:
            if not isinstance(data, dict):
                return False
            elem_parents = []
            pending = []
            self._push_ancestors(data, ancestor_keys, pending)
            while pending:
                ancestor, ancestor_path = pending.pop()
                if not ancestor_path:
                    elem_parents.append(ancestor)
                elif not isinstance(ancestor, dict):
                    # an intermediate node must be an object to be walked into
                    return False
                else:
                    # continue with the part of the path that is left for
                    # this ancestor, not with the full path
                    self._push_ancestors(ancestor, ancestor_path, pending)

        for parent in elem_parents:
            if not isinstance(parent, dict):
                return False
            elem = parent.get(elem_name)
            if elem is None:
                if self.min_occurs > 0:
                    return False
                continue
            if isinstance(elem, list):
                if self.max_occurs != '*':
                    return False
                for i, item in enumerate(elem):
                    if self.validate_elem(item):
                        continue
                    corrected = self._try_correct(item)
                    if corrected is None:
                        return False
                    # corrected in place; keep checking the remaining items
                    elem[i] = corrected
            elif self.max_occurs == '*' and not self.correctible:
                return False
            elif not self.validate_elem(elem):
                corrected = self._try_correct(elem)
                if corrected is None:
                    return False
                if self.max_occurs == '*':
                    parent[elem_name] = [corrected]
                else:
                    parent[elem_name] = corrected
            elif self.max_occurs == '*':
                # in this case, the elem itself is correct, with a cardinality
                # of '*' but stored as a single item
                parent[elem_name] = [elem]
        return True

    def validate_elem(self, elem):
        '''
        check a single value against every type listed for this element

        primitive types are checked with `FHIR_PRIMITIVE_VALIDATORS`, all other
        types are parsed recursively; a `Resource` typed value is parsed as the
        type named by its own `resourceType`.
        a valid value is recorded as a search element when this element is
        bound to a search param
        '''
        for elem_type in self.elem_types:
            if elem_type in FHIR_PRIMITIVE_VALIDATORS:
                if not FHIR_PRIMITIVE_VALIDATORS[elem_type](elem):
                    return False
                continue
            if (elem_type == 'Resource' and isinstance(elem, dict)
                    and 'resourceType' in elem):
                elem_type = elem['resourceType']
            # type of the element is a complex type
            valid, _ = parse(elem_type, elem, self.correctible)
            if not valid:
                return False
        if self.search_spec is not None:
            self.search_elements.append(elem)
        return True