From b3531d2a449e035f5a21e2b2186dc93fb88f2d59 Mon Sep 17 00:00:00 2001 From: Kai Zhang Date: Fri, 6 Dec 2024 19:39:57 +0800 Subject: [PATCH] change underlying representation of _values matrix --- snapatac2-core/src/feature_count/data_iter.rs | 114 +++++-- snapatac2-core/src/feature_count/matrix.rs | 11 +- snapatac2-core/src/feature_count/mod.rs | 2 +- snapatac2-core/src/preprocessing/import.rs | 74 +++-- snapatac2-core/src/preprocessing/qc.rs | 1 + .../python/snapatac2/preprocessing/_basic.py | 39 ++- snapatac2-python/src/preprocessing.rs | 308 ++++++++++++------ 7 files changed, 390 insertions(+), 159 deletions(-) diff --git a/snapatac2-core/src/feature_count/data_iter.rs b/snapatac2-core/src/feature_count/data_iter.rs index c3aa0ed4..981e4679 100644 --- a/snapatac2-core/src/feature_count/data_iter.rs +++ b/snapatac2-core/src/feature_count/data_iter.rs @@ -1,6 +1,6 @@ use crate::feature_count::{CountingStrategy, FeatureCounter}; use crate::genome::{ChromSizes, GenomeBaseIndex}; -use crate::preprocessing::Fragment; +use crate::preprocessing::{Fragment, SummaryType}; use anndata::backend::{DataType, ScalarType}; use anndata::data::DynCsrMatrix; @@ -499,6 +499,29 @@ pub struct BaseValue { float: f32, } +#[derive(Debug, Copy, Clone)] +pub enum ValueType { + Numerator, + Denominator, + Ratio, +} + +impl TryFrom for f32 { + type Error = anyhow::Error; + + fn try_from(x: BaseValue) -> Result { + Ok(x.value()) + } +} + +impl TryFrom for i32 { + type Error = anyhow::Error; + + fn try_from(x: BaseValue) -> Result { + x.to_i32().ok_or_else(|| anyhow::anyhow!("Cannot convert to i32")) + } +} + impl From<(&str, u64, f32)> for BaseValue { fn from(x: (&str, u64, f32)) -> Self { Self::from_float(x.0, x.1, x.2) @@ -554,6 +577,10 @@ impl BaseValue { pub fn value(&self) -> f32 { self.float } + + pub fn to_i32(&self) -> Option { + self.ratio.map(|x| ratio_to_i32(x)) + } } pub struct BaseData { @@ -652,15 +679,17 @@ where /// Output the raw coverage matrix. Note the values belong to the same interval /// will be aggregated by the mean value. - pub fn into_array_iter(self) -> impl ExactSizeIterator { + pub fn into_array_iter(self, val_ty: ValueType, summary_ty: SummaryType) -> impl ExactSizeIterator { fn helper<'a, T>( mat: CsrMatrix, + val_ty: ValueType, + summary_ty: SummaryType, exclude_chroms: &'a HashSet, ori_index: &'a GenomeBaseIndex, index: &'a GenomeBaseIndex, ) -> CsrMatrix where - T: Copy + Send + Sync + ToFloat, + T: Copy + Send + Sync + ValueGetter, { let row_offsets = mat.row_offsets(); let col_indices = mat.col_indices(); @@ -677,15 +706,18 @@ where if exclude_chroms.is_empty() || !exclude_chroms.contains(chrom) { let i = index.get_position_rev(chrom, pos); let entry = count.entry(i).or_insert(Vec::new()); - entry.push(values[k].to_float()); + entry.push(values[k].get_value(val_ty).unwrap()); } } count .into_iter() .map(|(k, v)| { - let len = v.len(); - let sum: f32 = v.into_iter().sum(); - (k, sum / len as f32) + let r = match summary_ty { + SummaryType::Mean => v.iter().sum::() / v.len() as f32, + SummaryType::Sum => v.iter().sum(), + SummaryType::Count => v.len() as f32, + }; + (k, r) }) .collect::>() }) @@ -702,6 +734,8 @@ where DataType::CsrMatrix(ScalarType::I32) => { helper( CsrMatrix::::try_from(mat).unwrap(), + val_ty, + summary_ty, &self.exclude_chroms, &ori_index, &index, @@ -710,6 +744,8 @@ where DataType::CsrMatrix(ScalarType::F32) => { helper( CsrMatrix::::try_from(mat).unwrap(), + val_ty, + summary_ty, &self.exclude_chroms, &ori_index, &index, @@ -726,6 +762,8 @@ where pub fn into_aggregated_array_iter( self, counter: C, + val_ty: ValueType, + summary_ty: SummaryType, ) -> impl ExactSizeIterator where C: FeatureCounter + Clone + Sync, @@ -733,11 +771,13 @@ where fn helper<'a, T, C>( data: CsrMatrix, counter: C, + val_ty: ValueType, + summary_ty: SummaryType, exclude_chroms: &'a HashSet, index: &'a GenomeBaseIndex, ) -> Vec> where - T: Copy + Send + Sync + ToFloat, + T: Copy + Send + Sync + ValueGetter, C: FeatureCounter + Clone + Sync, { (0..data.nrows()) @@ -753,13 +793,18 @@ where if exclude_chroms.is_empty() || !exclude_chroms.contains(chrom) { - coverage.insert(&GenomicRange::new(chrom, pos, pos + 1), val.to_float()); + coverage.insert(&GenomicRange::new(chrom, pos, pos + 1), val.get_value(val_ty).unwrap()); } }); - coverage - .get_values_and_counts() - .map(|(idx, (val, count))| (idx, val / count as f32)) - .collect::>() + match summary_ty { + SummaryType::Mean => coverage + .get_values_and_counts() + .map(|(idx, (val, count))| (idx, val / count as f32)) + .collect::>(), + SummaryType::Sum => coverage + .get_values(), + _ => unimplemented!("Unsupported summary type"), + } }) .collect::>() } @@ -771,6 +816,8 @@ where helper( CsrMatrix::::try_from(data).unwrap(), counter.clone(), + val_ty, + summary_ty, &self.exclude_chroms, &self.index, ) @@ -779,6 +826,8 @@ where helper( CsrMatrix::::try_from(data).unwrap(), counter.clone(), + val_ty, + summary_ty, &self.exclude_chroms, &self.index, ) @@ -903,27 +952,38 @@ fn i32_to_ratio(x: i32) -> Ratio { Ratio::new_raw(numer, denom) } -trait ToFloat { - fn to_float(self) -> f32; +trait ValueGetter { + fn get_value(self, ty: ValueType) -> Option; } -impl ToFloat for f32 { - fn to_float(self) -> f32 { - self +impl ValueGetter for f32 { + fn get_value(self, ty: ValueType) -> Option { + match ty { + ValueType::Denominator => None, + ValueType::Numerator => None, + ValueType::Ratio => Some(self), + } } } -impl ToFloat for i32 { - fn to_float(self) -> f32 { +impl ValueGetter for i32 { + fn get_value(self, ty: ValueType) -> Option { let numer = (self >> 16) as u16; let denom = (self & 0xffff) as u16; - if numer == 0 { - 0.0 - } else if denom == 0 { - 1.0 - } else { - numer as f32 / denom as f32 - } + let r = match ty { + ValueType::Denominator => denom as f32, + ValueType::Numerator => numer as f32, + ValueType::Ratio => { + if numer == 0 { + 0.0 + } else if denom == 0 { + 1.0 + } else { + numer as f32 / denom as f32 + } + }, + }; + Some(r) } } diff --git a/snapatac2-core/src/feature_count/matrix.rs b/snapatac2-core/src/feature_count/matrix.rs index 41680d87..5671af70 100644 --- a/snapatac2-core/src/feature_count/matrix.rs +++ b/snapatac2-core/src/feature_count/matrix.rs @@ -1,6 +1,8 @@ use super::counter::{CountingStrategy, FeatureCounter, GeneCount, RegionCounter, TranscriptCount}; +use super::ValueType; use crate::genome::{Promoters, Transcript}; use crate::feature_count::SnapData; +use crate::preprocessing::SummaryType; use anndata::{data::DataFrameIndex, AnnDataOp, ArrayData}; use anyhow::{bail, Result}; @@ -19,6 +21,7 @@ use polars::prelude::{DataFrame, NamedFrom, Series}; /// * `min_fragment_size` - The minimum fragment size. /// * `max_fragment_size` - The maximum fragment size. /// * `count_frag_as_reads` - Whether to treat fragments as reads during counting. +/// * `val_type` - Which kind of value to use: numerator, denominator or ratio. Only used for base data. /// * `out` - The output anndata object. pub fn create_tile_matrix( adata: &A, @@ -28,6 +31,8 @@ pub fn create_tile_matrix( min_fragment_size: Option, max_fragment_size: Option, counting_strategy: CountingStrategy, + val_type: ValueType, + summary_type: SummaryType, out: Option<&B>, ) -> Result<()> where @@ -67,7 +72,7 @@ where } feature_names = values.get_gindex().to_index().into(); - data_iter = Box::new(values.into_array_iter().map(|x| ArrayData::from(x.0))); + data_iter = Box::new(values.into_array_iter(val_type, summary_type).map(|x| ArrayData::from(x.0))); } else { bail!("No fragment or base data found in the anndata object"); }; @@ -92,6 +97,8 @@ pub fn create_peak_matrix( peaks: I, chunk_size: usize, counting_strategy: CountingStrategy, + val_type: ValueType, + summary_type: SummaryType, min_fragment_size: Option, max_fragment_size: Option, out: Option<&B>, @@ -140,7 +147,7 @@ where feature_names = counter.get_feature_ids(); data_iter = Box::new( values - .into_aggregated_array_iter(counter) + .into_aggregated_array_iter(counter, val_type, summary_type) .map(|x| x.0.into()), ); } else { diff --git a/snapatac2-core/src/feature_count/mod.rs b/snapatac2-core/src/feature_count/mod.rs index 5eaaeec0..2b32b6bf 100644 --- a/snapatac2-core/src/feature_count/mod.rs +++ b/snapatac2-core/src/feature_count/mod.rs @@ -7,7 +7,7 @@ use std::str::FromStr; use anyhow::{bail, Context, Result}; use anndata::{data::DynCsrMatrix, AnnData, AnnDataOp, AnnDataSet, ArrayElemOp, AxisArraysOp, Backend, ElemCollectionOp}; use bed_utils::bed::GenomicRange; -pub use data_iter::{BaseValue, ChromValueIter, BaseData, FragmentData, ContactData, FragmentDataIter}; +pub use data_iter::{ValueType, BaseValue, ChromValueIter, BaseData, FragmentData, ContactData, FragmentDataIter}; pub use counter::{FeatureCounter, CountingStrategy}; pub use matrix::{create_gene_matrix, create_tile_matrix, create_peak_matrix}; use nalgebra_sparse::CsrMatrix; diff --git a/snapatac2-core/src/preprocessing/import.rs b/snapatac2-core/src/preprocessing/import.rs index 51a48354..186fabea 100644 --- a/snapatac2-core/src/preprocessing/import.rs +++ b/snapatac2-core/src/preprocessing/import.rs @@ -3,6 +3,7 @@ use crate::genome::{ChromSizes, GenomeBaseIndex}; use crate::preprocessing::qc::{Contact, Fragment, FragmentQC, FragmentQCBuilder}; use super::qc::BaseValueQC; +use anndata::data::DynCsrMatrix; use anndata::{ data::array::utils::{from_csr_data, to_csr_data}, AnnDataOp, ArrayData, AxisArraysOp, ElemCollectionOp, @@ -323,6 +324,40 @@ where A: AnnDataOp, I: Iterator, { + fn helper + Send>( + chunk: Vec>, + genome_index: &GenomeBaseIndex, + genome_size: usize, + ) -> (Vec, CsrMatrix) { + let (qc, counts): (Vec<_>, Vec<_>) = chunk + .into_par_iter() + .map(|cell_data| { + let mut qc = BaseValueQC::new(); + let mut count = cell_data + .into_iter() + .flat_map(|value| { + let chrom = &value.chrom; + if genome_index.contain_chrom(chrom) { + qc.add(); + let pos = genome_index.get_position_rev(chrom, value.pos); + Some((pos, T::try_from(value).unwrap())) + } else { + None + } + }) + .collect::>(); + count.sort_by(|x, y| x.0.cmp(&y.0)); + (qc, count) + }) + .unzip(); + let (r, c, offset, ind, csr_data) = to_csr_data(counts, genome_size); + ( + qc, + CsrMatrix::try_from_csr_data(r, c, offset, ind, csr_data) + .unwrap() + ) + } + let spinner = ProgressBar::with_draw_target(None, ProgressDrawTarget::stderr_with_hz(1)) .with_style( ProgressStyle::with_template( @@ -344,38 +379,23 @@ where let arrays = chunked_values.into_iter().map(|chunk| { // Collect into vector for parallel processing let chunk: Vec> = chunk - .map(|(barcode, x)| { + .map(|(barcode, data)| { if !scanned_barcodes.insert(barcode.clone()) { panic!("Please sort fragment file by barcodes"); } - x.collect() + data.map(|x| x.1).collect() }) .collect(); - - let (qc, counts): (Vec<_>, Vec<_>) = chunk - .into_par_iter() - .map(|cell_data| { - let mut qc = BaseValueQC::new(); - let mut count = cell_data - .into_iter() - .flat_map(|(_, value)| { - let chrom = &value.chrom; - if genome_index.contain_chrom(chrom) { - qc.add(); - let pos = genome_index.get_position_rev(chrom, value.pos); - Some((pos, value.value())) - } else { - None - } - }) - .collect::>(); - count.sort_by(|x, y| x.0.cmp(&y.0)); - (qc, count) - }) - .unzip(); - qc_metrics.extend(qc); - let (r, c, offset, ind, csr_data) = to_csr_data(counts, genome_size); - CsrMatrix::try_from_csr_data(r, c, offset, ind, csr_data).unwrap() + let mat: DynCsrMatrix = if chunk[0][0].to_i32().is_some() { + let (qc, mat) = helper::(chunk, &genome_index, genome_size); + qc_metrics.extend(qc); + mat.into() + } else { + let (qc, mat) = helper::(chunk, &genome_index, genome_size); + qc_metrics.extend(qc); + mat.into() + }; + mat }); anndata.obsm().add_iter(BASE_VALUE, arrays)?; diff --git a/snapatac2-core/src/preprocessing/qc.rs b/snapatac2-core/src/preprocessing/qc.rs index 3b883a55..c63282fe 100644 --- a/snapatac2-core/src/preprocessing/qc.rs +++ b/snapatac2-core/src/preprocessing/qc.rs @@ -14,6 +14,7 @@ use crate::feature_count::{FragmentDataIter, SnapData}; pub type CellBarcode = String; +#[derive(Debug, Copy, Clone)] pub enum SummaryType { Sum, Count, diff --git a/snapatac2-python/python/snapatac2/preprocessing/_basic.py b/snapatac2-python/python/snapatac2/preprocessing/_basic.py index cf446a30..57bf2724 100644 --- a/snapatac2-python/python/snapatac2/preprocessing/_basic.py +++ b/snapatac2-python/python/snapatac2/preprocessing/_basic.py @@ -448,6 +448,8 @@ def add_tile_matrix( min_frag_size: int | None = None, max_frag_size: int | None = None, counting_strategy: Literal['fragment', 'insertion', 'paired-insertion'] = 'paired-insertion', + value_type: Literal['target', 'total', 'fraction'] = 'target', + summary_type: Literal['sum', 'mean'] = 'sum', file: Path | None = None, backend: Literal['hdf5'] = 'hdf5', n_jobs: int = 8, @@ -488,6 +490,18 @@ def add_tile_matrix( once if the pair of insertions of a fragment are both within the same region of interest [Miao24]_. Note that this parameter has no effect if input are single-end reads. + value_type + The type of value to use from `.obsm['_values']`, only available when + data is imported using :func:`~snapatac2.pp.import_values`. It must be one of the following: + "target", "total", or "fraction". "target" means the value is the number + of recrods that are with postive measurements, e.g., number of methylated bases. + "total" means the value is the total number of measurements, e.g., methylated bases plus + unmethylated bases. "fraction" means the value is the fraction of the + records that are positive, e.g., the fraction of methylated bases. + summary_type + The type of summary to use when multiple values are found in a bin. This parameter + is only used when `.obsm['_values']` exists, which is created by :func:`~snapatac2.pp.import_values`. + It must be one of the following: "sum" or "mean". file File name of the output file used to store the result. If provided, result will be saved to a backed AnnData, otherwise an in-memory AnnData is used. @@ -521,6 +535,9 @@ def add_tile_matrix( uns: 'reference_sequences' obsm: 'fragment_paired' """ + def fun(data, out): + internal.mk_tile_matrix(data, bin_size, chunk_size, counting_strategy, value_type, summary_type, exclude_chroms, min_frag_size, max_frag_size, out) + if isinstance(exclude_chroms, str): exclude_chroms = [exclude_chroms] @@ -528,11 +545,11 @@ def add_tile_matrix( if isinstance(adata, list): snapatac2._utils.anndata_par( adata, - lambda x: internal.mk_tile_matrix(x, bin_size, chunk_size, counting_strategy, exclude_chroms, None), + lambda x: fun(x, None), n_jobs=n_jobs, ) else: - internal.mk_tile_matrix(adata, bin_size, chunk_size, counting_strategy, exclude_chroms, min_frag_size, max_frag_size, None) + fun(adata, None) else: if file is None: if adata.isbacked: @@ -541,7 +558,7 @@ def add_tile_matrix( out = AnnData(obs=adata.obs[:]) else: out = internal.AnnData(filename=file, backend=backend, obs=adata.obs[:]) - internal.mk_tile_matrix(adata, bin_size, chunk_size, counting_strategy, exclude_chroms, min_frag_size, max_frag_size, out) + fun(adata, out) return out def make_peak_matrix( @@ -557,6 +574,8 @@ def make_peak_matrix( min_frag_size: int | None = None, max_frag_size: int | None = None, counting_strategy: Literal['fragment', 'insertion', 'paired-insertion'] = 'paired-insertion', + value_type: Literal['target', 'total', 'fraction'] = 'target', + summary_type: Literal['sum', 'mean'] = 'sum', ) -> internal.AnnData: """Generate cell by peak count matrix. @@ -604,6 +623,18 @@ def make_peak_matrix( once if the pair of insertions of a fragment are both within the same region of interest [Miao24]_. Note that this parameter has no effect if input are single-end reads. + value_type + The type of value to use from `.obsm['_values']`, only available when + data is imported using :func:`~snapatac2.pp.import_values`. It must be one of the following: + "target", "total", or "fraction". "target" means the value is the number + of recrods that are with postive measurements, e.g., number of methylated bases. + "total" means the value is the total number of measurements, e.g., methylated bases plus + unmethylated bases. "fraction" means the value is the fraction of the + records that are positive, e.g., the fraction of methylated bases. + summary_type + The type of summary to use when multiple values are found in a bin. This parameter + is only used when `.obsm['_values']` exists, which is created by :func:`~snapatac2.pp.import_values`. + It must be one of the following: "sum" or "mean". Returns ------- @@ -657,7 +688,7 @@ def make_peak_matrix( out = AnnData(obs=adata.obs[:]) else: out = internal.AnnData(filename=file, backend=backend, obs=adata.obs[:]) - internal.mk_peak_matrix(adata, peaks, chunk_size, use_x, counting_strategy, min_frag_size, max_frag_size, out) + internal.mk_peak_matrix(adata, peaks, chunk_size, use_x, counting_strategy, value_type, summary_type, min_frag_size, max_frag_size, out) return out def make_gene_matrix( diff --git a/snapatac2-python/src/preprocessing.rs b/snapatac2-python/src/preprocessing.rs index 5be65597..e9f00d91 100644 --- a/snapatac2-python/src/preprocessing.rs +++ b/snapatac2-python/src/preprocessing.rs @@ -1,26 +1,29 @@ use crate::utils::*; -use itertools::Itertools; -use pyo3::{prelude::*, pybacked::PyBackedStr}; use anndata::Backend; use anndata_hdf5::H5; +use anyhow::Result; +use bed_utils::extsort::ExternalSorterBuilder; +use bed_utils::{bed, bed::GenomicRange}; +use itertools::Itertools; use num::rational::Ratio; +use pyanndata::PyAnnData; +use pyo3::{prelude::*, pybacked::PyBackedStr}; +use snapatac2_core::feature_count::ValueType; +use snapatac2_core::preprocessing::SummaryType; use std::collections::HashMap; use std::io::{BufRead, BufReader}; use std::path::PathBuf; -use std::{str::FromStr, collections::BTreeMap, ops::Deref, collections::HashSet}; -use bed_utils::{bed, bed::GenomicRange}; -use bed_utils::extsort::ExternalSorterBuilder; -use pyanndata::PyAnnData; -use anyhow::Result; +use std::{collections::BTreeMap, collections::HashSet, ops::Deref, str::FromStr}; use snapatac2_core::{ - QualityControl, + feature_count::{ + create_gene_matrix, create_peak_matrix, create_tile_matrix, BaseValue, CountingStrategy, + }, genome::TranscriptParserOptions, - feature_count::{BaseValue, create_gene_matrix, create_tile_matrix, create_peak_matrix, CountingStrategy}, - preprocessing::{Fragment, Contact}, preprocessing, - utils, + preprocessing::{Contact, Fragment}, + utils, QualityControl, }; #[pyfunction] @@ -41,8 +44,7 @@ pub(crate) fn make_fragment_file( compression: Option<&str>, compression_level: Option, temp_dir: Option, -) -> Result> -{ +) -> Result> { fn parse_tag(tag: &str) -> [u8; 2] { let tag_b = tag.as_bytes(); if tag_b.len() == 2 { @@ -52,13 +54,28 @@ pub(crate) fn make_fragment_file( } } let (bam_qc, frag_qc) = preprocessing::make_fragment_file( - bam_file, output_file, is_paired, - barcode_tag.map(|x| parse_tag(x)), barcode_regex, - umi_tag.map(|x| parse_tag(x)), umi_regex, - shift_left, shift_right, mapq, chunk_size, source, mitochondrial_dna.map(|x| x.into_iter().collect()), - compression.map(|x| utils::Compression::from_str(x).unwrap()), compression_level, temp_dir, + bam_file, + output_file, + is_paired, + barcode_tag.map(|x| parse_tag(x)), + barcode_regex, + umi_tag.map(|x| parse_tag(x)), + umi_regex, + shift_left, + shift_right, + mapq, + chunk_size, + source, + mitochondrial_dna.map(|x| x.into_iter().collect()), + compression.map(|x| utils::Compression::from_str(x).unwrap()), + compression_level, + temp_dir, )?; - Ok(bam_qc.report().into_iter().chain(frag_qc.report()).collect()) + Ok(bam_qc + .report() + .into_iter() + .chain(frag_qc.report()) + .collect()) } #[pyfunction] @@ -74,8 +91,7 @@ pub(crate) fn import_fragments( chunk_size: usize, white_list: Option>, tempdir: Option, -) -> Result<()> -{ +) -> Result<()> { let mitochondrial_dna: HashSet = mitochondrial_dna.into_iter().collect(); let final_white_list = if fragment_is_sorted_by_name || min_num_fragment <= 0 { white_list @@ -84,21 +100,29 @@ pub(crate) fn import_fragments( bed::io::Reader::new( utils::open_file_for_read(&fragment_file), Some("#".to_string()), - ).into_records().map(Result::unwrap) + ) + .into_records() + .map(Result::unwrap), ); - let list: HashSet = barcode_count.drain().filter_map(|(k, v)| - if v >= min_num_fragment { Some(k) } else { None }).collect(); + let list: HashSet = barcode_count + .drain() + .filter_map(|(k, v)| if v >= min_num_fragment { Some(k) } else { None }) + .collect(); match white_list { None => Some(list), Some(x) => Some(list.intersection(&x).map(Clone::clone).collect()), } }; let chrom_sizes = chrom_size.into_iter().collect(); - let fragments = bed::io::Reader::new(utils::open_file_for_read(&fragment_file), Some("#".to_string())) - .into_records().map(|f| { - let mut f = f.unwrap(); - shift_fragment(&mut f, shift_left, shift_right); - f + let fragments = bed::io::Reader::new( + utils::open_file_for_read(&fragment_file), + Some("#".to_string()), + ) + .into_records() + .map(|f| { + let mut f = f.unwrap(); + shift_fragment(&mut f, shift_left, shift_right); + f }); let sorted_fragments: Box> = if !fragment_is_sorted_by_name { let mut sorter = ExternalSorterBuilder::new() @@ -107,11 +131,13 @@ pub(crate) fn import_fragments( if let Some(tmp) = tempdir { sorter = sorter.with_tmp_dir(tmp); } - Box::new(sorter - .build().unwrap() - .sort_by(fragments, |a, b| a.barcode.cmp(&b.barcode)) - .unwrap() - .map(Result::unwrap) + Box::new( + sorter + .build() + .unwrap() + .sort_by(fragments, |a, b| a.barcode.cmp(&b.barcode)) + .unwrap() + .map(Result::unwrap), ) } else { Box::new(fragments) @@ -120,15 +146,20 @@ pub(crate) fn import_fragments( macro_rules! run { ($data:expr) => { preprocessing::import_fragments( - $data, sorted_fragments, &mitochondrial_dna, &chrom_sizes, - final_white_list.as_ref(), min_num_fragment, chunk_size, + $data, + sorted_fragments, + &mitochondrial_dna, + &chrom_sizes, + final_white_list.as_ref(), + min_num_fragment, + chunk_size, )? }; } crate::with_anndata!(&anndata, run); Ok(()) -} +} fn shift_fragment(fragment: &mut Fragment, shift_left: i64, shift_right: i64) { if shift_left != 0 { @@ -151,11 +182,14 @@ pub(crate) fn import_contacts( bin_size: usize, chunk_size: usize, tempdir: Option, -) -> Result<()> -{ - let chrom_sizes = chrom_size.into_iter().map(|(chr, s)| GenomicRange::new(chr, 0, s)).collect(); +) -> Result<()> { + let chrom_sizes = chrom_size + .into_iter() + .map(|(chr, s)| GenomicRange::new(chr, 0, s)) + .collect(); - let contacts = BufReader::new(utils::open_file_for_read(&contact_file)).lines() + let contacts = BufReader::new(utils::open_file_for_read(&contact_file)) + .lines() .map(|r| Contact::from_str(&r.unwrap()).unwrap()); let sorted_contacts: Box> = if !fragment_is_sorted_by_name { let mut sorter = ExternalSorterBuilder::new() @@ -164,11 +198,13 @@ pub(crate) fn import_contacts( if let Some(tmp) = tempdir { sorter = sorter.with_tmp_dir(tmp); } - Box::new(sorter - .build().unwrap() - .sort_by(contacts, |a, b| a.barcode.cmp(&b.barcode)) - .unwrap() - .map(Result::unwrap) + Box::new( + sorter + .build() + .unwrap() + .sort_by(contacts, |a, b| a.barcode.cmp(&b.barcode)) + .unwrap() + .map(Result::unwrap), ) } else { Box::new(contacts) @@ -176,13 +212,19 @@ pub(crate) fn import_contacts( macro_rules! run { ($data:expr) => { - preprocessing::import_contacts($data, sorted_contacts, &chrom_sizes, bin_size, chunk_size)? + preprocessing::import_contacts( + $data, + sorted_contacts, + &chrom_sizes, + bin_size, + chunk_size, + )? }; } crate::with_anndata!(&anndata, run); Ok(()) -} +} #[pyfunction] pub(crate) fn import_values( @@ -190,8 +232,7 @@ pub(crate) fn import_values( input_dir: PathBuf, chrom_size: BTreeMap, chunk_size: usize, -) -> Result<()> -{ +) -> Result<()> { fn read_chrom_values(path: PathBuf) -> impl Iterator { let barcode = path.file_stem().unwrap().to_str().unwrap().to_string(); let reader = BufReader::new(utils::open_file_for_read(&path)); @@ -202,7 +243,8 @@ pub(crate) fn import_values( let pos = parts.next().unwrap().parse().unwrap(); let methyl = parts.next().unwrap().parse().unwrap(); let unmethyl: u16 = parts.next().unwrap().parse().unwrap(); - let value = BaseValue::from_ratio(chrom, pos, Ratio::new_raw(methyl, unmethyl + methyl)); + let value = + BaseValue::from_ratio(chrom, pos, Ratio::new_raw(methyl, unmethyl + methyl)); (barcode.clone(), value) }) } @@ -220,14 +262,16 @@ pub(crate) fn import_values( crate::with_anndata!(&anndata, run); Ok(()) -} +} fn parse_strategy(strategy: &str) -> CountingStrategy { match strategy { "insertion" => CountingStrategy::Insertion, "fragment" => CountingStrategy::Fragment, "paired-insertion" => CountingStrategy::PIC, - _ => panic!("Counting strategy must be one of 'insertion', 'fragment', or 'paired-insertion'"), + _ => panic!( + "Counting strategy must be one of 'insertion', 'fragment', or 'paired-insertion'" + ), } } @@ -237,13 +281,15 @@ pub(crate) fn mk_tile_matrix( bin_size: usize, chunk_size: usize, strategy: &str, + val_type: &str, + summuary_type: &str, exclude_chroms: Option>, min_fragment_size: Option, max_fragment_size: Option, - out: Option -) -> Result<()> -{ - let exclude_chroms = exclude_chroms.as_ref() + out: Option, +) -> Result<()> { + let exclude_chroms = exclude_chroms + .as_ref() .map(|s| s.iter().map(|x| x.as_ref()).collect::>()); macro_rules! run { ($data:expr) => { @@ -258,7 +304,9 @@ pub(crate) fn mk_tile_matrix( min_fragment_size, max_fragment_size, parse_strategy(strategy), - Some($out_data) + str_to_value_type(val_type), + str_to_summary_type(summuary_type), + Some($out_data), )? }; } @@ -272,7 +320,9 @@ pub(crate) fn mk_tile_matrix( min_fragment_size, max_fragment_size, parse_strategy(strategy), - None::<&PyAnnData> + str_to_value_type(val_type), + str_to_summary_type(summuary_type), + None::<&PyAnnData>, )?; } }; @@ -282,6 +332,24 @@ pub(crate) fn mk_tile_matrix( Ok(()) } +fn str_to_value_type(ty: &str) -> ValueType { + match ty { + "target" => ValueType::Numerator, + "total" => ValueType::Denominator, + "fraction" => ValueType::Ratio, + _ => panic!("Value type must be one of 'target', 'total', or 'fraction'"), + } +} + +fn str_to_summary_type(ty: &str) -> SummaryType { + match ty { + "sum" => SummaryType::Sum, + "mean" => SummaryType::Mean, + "count" => SummaryType::Count, + _ => panic!("Summary type must be one of 'sum', 'mean', or 'count'"), + } +} + #[pyfunction] pub(crate) fn mk_peak_matrix( anndata: AnnDataLike, @@ -289,12 +357,14 @@ pub(crate) fn mk_peak_matrix( chunk_size: usize, use_x: bool, strategy: &str, + val_type: &str, + summuary_type: &str, min_fragment_size: Option, max_fragment_size: Option, out: Option, -) -> Result<()> -{ - let peaks = peaks.iter()? +) -> Result<()> { + let peaks = peaks + .iter()? .map(|x| GenomicRange::from_str(x.unwrap().extract().unwrap()).unwrap()); let strategy = parse_strategy(strategy); @@ -303,16 +373,36 @@ pub(crate) fn mk_peak_matrix( if let Some(out) = out { macro_rules! run2 { ($out_data:expr) => { - create_peak_matrix($data, peaks, chunk_size, strategy, - min_fragment_size, max_fragment_size, Some($out_data), use_x)? + create_peak_matrix( + $data, + peaks, + chunk_size, + strategy, + str_to_value_type(val_type), + str_to_summary_type(summuary_type), + min_fragment_size, + max_fragment_size, + Some($out_data), + use_x, + )? }; } crate::with_anndata!(&out, run2); } else { - create_peak_matrix($data, peaks, chunk_size, strategy, - min_fragment_size, max_fragment_size, None::<&PyAnnData>, use_x)?; + create_peak_matrix( + $data, + peaks, + chunk_size, + strategy, + str_to_value_type(val_type), + str_to_summary_type(summuary_type), + min_fragment_size, + max_fragment_size, + None::<&PyAnnData>, + use_x, + )?; } - } + }; } crate::with_anndata!(&anndata, run); Ok(()) @@ -336,8 +426,7 @@ pub(crate) fn mk_gene_matrix( min_fragment_size: Option, max_fragment_size: Option, out: Option, -) -> Result<()> -{ +) -> Result<()> { let options = TranscriptParserOptions { transcript_name_key, transcript_id_key, @@ -351,18 +440,40 @@ pub(crate) fn mk_gene_matrix( if let Some(out) = out { macro_rules! run2 { ($out_data:expr) => { - create_gene_matrix($data, transcripts, id_type, - upstream, downstream, include_gene_body, chunk_size, strategy, - min_fragment_size, max_fragment_size, Some($out_data), use_x)? + create_gene_matrix( + $data, + transcripts, + id_type, + upstream, + downstream, + include_gene_body, + chunk_size, + strategy, + min_fragment_size, + max_fragment_size, + Some($out_data), + use_x, + )? }; } crate::with_anndata!(&out, run2); } else { - create_gene_matrix($data, transcripts, id_type, - upstream, downstream, include_gene_body, chunk_size, strategy, - min_fragment_size, max_fragment_size, None::<&PyAnnData>, use_x)?; + create_gene_matrix( + $data, + transcripts, + id_type, + upstream, + downstream, + include_gene_body, + chunk_size, + strategy, + min_fragment_size, + max_fragment_size, + None::<&PyAnnData>, + use_x, + )?; } - } + }; } crate::with_anndata!(&anndata, run); Ok(()) @@ -376,11 +487,10 @@ pub(crate) fn tss_enrichment<'py>( anndata: AnnDataLike, gtf_file: PathBuf, exclude_chroms: Option>, -) -> Result> -{ +) -> Result> { let exclude_chroms = match exclude_chroms { Some(chrs) => chrs.into_iter().collect(), - None => HashSet::new(), + None => HashSet::new(), }; let tss = preprocessing::read_tss(utils::open_file_for_read(gtf_file)) .unique() @@ -390,7 +500,7 @@ pub(crate) fn tss_enrichment<'py>( macro_rules! run { ($data:expr) => { $data.tss_enrichment(&promoters) - } + }; } let (scores, tsse) = crate::with_anndata!(&anndata, run)?; let library_tsse = tsse.result(); @@ -408,36 +518,39 @@ pub(crate) fn add_frip( regions: BTreeMap>, normalized: bool, count_as_insertion: bool, -) -> Result>> -{ - let trees: Vec<_> = regions.values().map(|x| - x.into_iter().map(|y| (GenomicRange::from_str(y).unwrap(), ())).collect() - ).collect(); +) -> Result>> { + let trees: Vec<_> = regions + .values() + .map(|x| { + x.into_iter() + .map(|y| (GenomicRange::from_str(y).unwrap(), ())) + .collect() + }) + .collect(); macro_rules! run { ($data:expr) => { $data.frac_read_in_region(&trees, normalized, count_as_insertion) - } + }; } let frip = crate::with_anndata!(&anndata, run)?; - Ok( - regions.keys().zip(frip.columns()) - .map(|(k, v)| (k.clone(), v.to_vec())) - .collect() - ) + Ok(regions + .keys() + .zip(frip.columns()) + .map(|(k, v)| (k.clone(), v.to_vec())) + .collect()) } #[pyfunction] pub(crate) fn fragment_size_distribution( anndata: AnnDataLike, max_recorded_size: usize, -) -> Result> -{ +) -> Result> { macro_rules! run { ($data:expr) => { $data.fragment_size_distribution(max_recorded_size) - } + }; } crate::with_anndata!(&anndata, run) @@ -447,8 +560,7 @@ pub(crate) fn fragment_size_distribution( pub(crate) fn summary_by_chrom( anndata: AnnDataLike, mode: &str, -) -> Result>> -{ +) -> Result>> { let mode = match mode { "sum" => preprocessing::SummaryType::Sum, "mean" => preprocessing::SummaryType::Mean, @@ -458,8 +570,8 @@ pub(crate) fn summary_by_chrom( macro_rules! run { ($data:expr) => { $data.summary_by_chrom(mode) - } + }; } crate::with_anndata!(&anndata, run) -} \ No newline at end of file +}