-
Notifications
You must be signed in to change notification settings - Fork 0
/
h5_to_mtx.py
99 lines (84 loc) · 3.13 KB
/
h5_to_mtx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""
Convert a 10x Genomics HDF5 matrix to the MTX (Matrix Market) format.
https://support.10xgenomics.com/single-cell-gene-expression/software/pipelines/latest/advanced/h5_matrices
http://scipy-lectures.org/advanced/scipy_sparse/csc_matrix.html
https://math.nist.gov/MatrixMarket/formats.html
"""
from itertools import chain
import logging
from _pathlib import Path
from typing import (
Any,
Iterable,
Tuple,
)
from dataclasses import (
dataclass,
fields,
)
import h5py
from h5py import (
Dataset,
Group,
)
from more_itertools import (
one,
pairwise,
)
from csv2mtx import write_gzip_file
log = logging.getLogger(__name__)
@dataclass
class Matrix:
file_path: Path
name: str
barcodes: Dataset
data: Dataset
gene_names: Dataset
genes: Dataset
indices: Dataset
indptr: Dataset
shape: Dataset
@classmethod
def from_group(cls, file_path: Path, group: Group) -> 'Matrix':
return cls(file_path, group.name, *(group[f.name] for f in fields(cls)[2:]))
@property
def rows(self) -> int:
return self.shape[0]
@property
def columns(self) -> int:
return self.shape[1]
@property
def cells(self) -> int:
return len(self.data)
def to_mtx(self, output_dir: Path):
assert self.rows == len(self.genes)
assert self.columns == len(self.barcodes)
assert len(self.data) == len(self.indices)
genes = chain(['featurekey'], map(bytes.decode, self.genes))
gene_names = chain(['featurename'], map(bytes.decode, self.gene_names))
write_gzip_file(output_dir / 'genes.tsv.gz', list('\t'.join(row) for row in zip(genes, gene_names)))
write_gzip_file(output_dir / 'barcodes.tsv.gz', chain(['barcodes'], map(bytes.decode, self.barcodes)))
log.info('Reading matrix data from %s ...', self.file_path)
write_gzip_file(output_dir / 'matrix.mtx.gz', chain([
'%%MatrixMarket matrix coordinate integer general',
f'{self.rows} {self.columns} {self.cells}'
], self._mtx_lines()))
def _mtx_lines(self) -> Iterable[str]:
# Sorting shouldn't be necessary since the MatrixMarket format does not
# dictate an ordering of lines in the .mtx but most of the .mtx files
# I've seen are sorted by row index first and column index second. If
# we didn't sort, we'd get the transposed ordering by column first and
# row second. This is because the HDF5 matrices are column-oriented.
return (' '.join(map(str, t)) for t in sorted(self._mtx_tuples()))
def _mtx_tuples(self) -> Iterable[Tuple[int, int, Any]]:
for column, (start, end) in enumerate(pairwise(self.indptr)):
data = self.data[start:end]
rows = self.indices[start:end]
for row, value in zip(rows, data):
yield row + 1, column + 1, value
def convert_h5_to_mtx(input_file: Path, output_dir: Path) -> None:
    """
    Convert the matrix stored in an HDF5 file to a set of MTX files.

    :param input_file: the HDF5 file to read; ``one`` is used to pick its
                       single top-level member, which is treated as the group
                       holding the matrix datasets
    :param output_dir: the directory to write the output files to; it is
                       created, along with any missing parents, if necessary
    """
    # The conversion runs inside the `with` block so that the datasets are
    # read while the file is still open.
    with h5py.File(str(input_file), mode='r') as h5_file:
        matrix = Matrix.from_group(input_file, one(h5_file.values()))
        # FIXME: move to convert_matrices.py
        output_dir.mkdir(parents=True, exist_ok=True)
        matrix.to_mtx(output_dir)