Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functions to check struct and array missingness #738

Merged
merged 27 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions gnomad/assessment/validity_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1211,3 +1211,120 @@ def count_vep_annotated_variants_per_interval(
)

return interval_ht


def check_missingness_of_struct(
    struct_expr: hl.expr.StructExpression, prefix: str = ""
) -> Dict[str, Any]:
    """
    Recursively build missingness-fraction aggregations for every field of a struct.

    A standalone or nested struct may be supplied. Struct fields are walked
    recursively; every non-struct leaf is turned into an `hl.agg.fraction`
    aggregation expression. An array (or set) leaf counts as missing when it is NA,
    empty, or contains only missing elements.

    .. note::

        Keys are always built as ``f"{prefix}.{key}"``, so with the default empty
        prefix the top-level keys start with a leading ".".

    :param struct_expr: StructExpression for which to check for missing values.
    :param prefix: Prefix prepended (followed by ".") to the name of each field
        within `struct_expr`.
    :return: Dictionary mapping prefixed field names to their missingness fraction
        expressions, with nested dictionaries representing any nested structs.
    """
    collection_types = (hl.expr.ArrayExpression, hl.expr.SetExpression)

    # Leaf case: anything that is not a struct becomes a single aggregation.
    if not isinstance(struct_expr, hl.expr.StructExpression):
        if isinstance(struct_expr, collection_types):
            # `all` holds for an empty collection and is NA for an NA collection, so
            # falling back to True covers NA, empty, and all-missing collections.
            only_missing_elements = struct_expr.all(
                lambda element: hl.is_missing(element)
            )
            return hl.agg.fraction(hl.or_else(only_missing_elements, True))
        return hl.agg.fraction(hl.is_missing(struct_expr))

    # Struct case: recurse into each field, carrying the dotted path along.
    missingness = {}
    for key in struct_expr.keys():
        field_path = f"{prefix}.{key}"
        missingness[field_path] = check_missingness_of_struct(
            struct_expr[key], field_path
        )
    return missingness


def flatten_missingness_struct(
    missingness_struct: hl.expr.StructExpression,
) -> Dict[str, float]:
    """
    Collapse a nested mapping of missingness values into a single flat dictionary.

    Every value that is a `dict` is descended into recursively and its leaves are
    merged into the result; every other value is passed through `hl.eval` and stored
    under its own key. The keys of nested dictionaries themselves are not kept.

    :param missingness_struct: Struct (or dictionary) whose values are either
        missingness values or nested dictionaries of them.
    :return: Dictionary with field names as keys and their evaluated missingness
        fractions as values.
    """
    flattened = {}
    for field_name, entry in missingness_struct.items():
        if not isinstance(entry, dict):
            # Leaf: evaluate it to a Python value.
            flattened[field_name] = hl.eval(entry)
            continue
        # Nested dictionary: merge its flattened leaves into the result.
        flattened.update(flatten_missingness_struct(entry))
    return flattened


def unfurl_array_annotations(
    ht: hl.Table, indexed_array_annotations: Dict[str, str]
) -> Dict[str, Any]:
    """
    Unfurl specified arrays of structs into a dictionary of flattened expressions.

    Each array annotation must be paired with a global dictionary that maps a label
    to an index of that array. For every (label, index) pair and every field of the
    array's struct, an entry named ``"{field}_{label}"`` is created that points at
    ``ht[array][index][field]``.

    Example: indexed_array_annotations = {"freq": "freq_index_dict"}, where 'freq' is
    structured as array<struct{AC: int32, AF: float64, AN: int32,
    homozygote_count: int64}> and 'freq_index_dict' is defined as
    {'adj': 0, 'raw': 1}, yields the keys 'AC_adj', 'AF_adj', ..., 'AC_raw', ... .

    The keys of `indexed_array_annotations` must be present in the Table as row
    annotations, whereas the values must be present as global annotations.

    .. note::

        Entries are keyed only by field and label, so if two arrays produce the same
        ``"{field}_{label}"`` name, the later one replaces the earlier one.

    :param ht: Input Table.
    :param indexed_array_annotations: Dictionary mapping array field names to the
        names of their corresponding index dictionaries, which define the indices for
        each array field.
    :return: Flattened dictionary of unfurled array annotations.
    :raises ValueError: If an array is not a row annotation or its index dictionary
        is not a global annotation of the Table.
    """
    unfurled = {}

    for array_name, index_dict_name in indexed_array_annotations.items():
        # Fail early when either the array or its index dictionary is absent.
        if array_name not in ht.row:
            raise ValueError(f"Annotation '{array_name}' not found in the Table rows.")
        if index_dict_name not in ht.globals:
            raise ValueError(
                f"Annotation '{index_dict_name}' not found in the Table globals."
            )

        # Localize the label -> index mapping stored in the globals.
        label_to_index = hl.eval(ht[index_dict_name])

        # One expression per (label, struct field) combination of this array; the
        # struct's field names are read from the array's first element expression.
        unfurled.update(
            {
                f"{field}_{label}": ht[array_name][position][field]
                for label, position in label_to_index.items()
                for field in ht[array_name][0].keys()
            }
        )

    return unfurled


def check_array_struct_missingness(
    ht: hl.Table,
    indexed_array_annotations: Dict[str, str] = None,
) -> hl.Struct:
    """
    Check the missingness of all fields in an array of structs.

    Iterates over arrays of structs and calculates the fraction of missing values for
    each element of the array and each struct field. Array annotations must have a
    corresponding dictionary to define the indices for each array field.

    Example: indexed_array_annotations = {"freq": "freq_index_dict"}, where 'freq' is
    structured as array<struct{AC: int32, AF: float64, AN: int32,
    homozygote_count: int64}> and 'freq_index_dict' is defined as
    {'adj': 0, 'raw': 1}.

    :param ht: Input Table.
    :param indexed_array_annotations: A dictionary mapping array field names to their
        corresponding index dictionaries, which define the indices for each array
        field. If None, {'faf': 'faf_index_dict', 'freq': 'freq_index_dict'} is used.
    :return: A Struct where each field holds the fraction of rows in the Table for
        which one struct field of one element of the specified arrays is missing.
    """
    # Build the defaults on every call instead of sharing one mutable dictionary
    # object across all calls through the signature.
    if indexed_array_annotations is None:
        indexed_array_annotations = {
            "faf": "faf_index_dict",
            "freq": "freq_index_dict",
        }

    # Expressions for each element of the arrays and each of their struct fields.
    unfurled_exprs = unfurl_array_annotations(ht, indexed_array_annotations)

    # Aggregate the unfurled expressions directly rather than first annotating them
    # onto the Table: the result is the same, and names such as "AC_adj" can no
    # longer overwrite (or clash with) fields that already exist on the Table.
    missingness_dict = {
        field_name: hl.agg.fraction(hl.is_missing(field_expr))
        for field_name, field_expr in unfurled_exprs.items()
    }
    return ht.aggregate(hl.struct(**missingness_dict))
Loading
Loading