Skip to content

Commit

Permalink
change underlying representation of _values matrix
Browse files Browse the repository at this point in the history
  • Loading branch information
kaizhang committed Dec 6, 2024
1 parent 99bfcc5 commit b3531d2
Show file tree
Hide file tree
Showing 7 changed files with 390 additions and 159 deletions.
114 changes: 87 additions & 27 deletions snapatac2-core/src/feature_count/data_iter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::feature_count::{CountingStrategy, FeatureCounter};
use crate::genome::{ChromSizes, GenomeBaseIndex};
use crate::preprocessing::Fragment;
use crate::preprocessing::{Fragment, SummaryType};

use anndata::backend::{DataType, ScalarType};
use anndata::data::DynCsrMatrix;
Expand Down Expand Up @@ -499,6 +499,29 @@ pub struct BaseValue {
float: f32,
}

#[derive(Debug, Copy, Clone)]
pub enum ValueType {
Numerator,
Denominator,
Ratio,
}

impl TryFrom<BaseValue> for f32 {
type Error = anyhow::Error;

fn try_from(x: BaseValue) -> Result<Self, Self::Error> {
Ok(x.value())
}
}

impl TryFrom<BaseValue> for i32 {
type Error = anyhow::Error;

fn try_from(x: BaseValue) -> Result<Self, Self::Error> {
x.to_i32().ok_or_else(|| anyhow::anyhow!("Cannot convert to i32"))
}
}

impl From<(&str, u64, f32)> for BaseValue {
fn from(x: (&str, u64, f32)) -> Self {
Self::from_float(x.0, x.1, x.2)
Expand Down Expand Up @@ -554,6 +577,10 @@ impl BaseValue {
pub fn value(&self) -> f32 {
self.float
}

pub fn to_i32(&self) -> Option<i32> {
self.ratio.map(|x| ratio_to_i32(x))
}
}

pub struct BaseData<I> {
Expand Down Expand Up @@ -652,15 +679,17 @@ where

/// Output the raw coverage matrix. Note the values belong to the same interval
/// will be aggregated by the mean value.
pub fn into_array_iter(self) -> impl ExactSizeIterator<Item = (ArrayData, usize, usize)> {
pub fn into_array_iter(self, val_ty: ValueType, summary_ty: SummaryType) -> impl ExactSizeIterator<Item = (ArrayData, usize, usize)> {
fn helper<'a, T>(
mat: CsrMatrix<T>,
val_ty: ValueType,
summary_ty: SummaryType,
exclude_chroms: &'a HashSet<String>,
ori_index: &'a GenomeBaseIndex,
index: &'a GenomeBaseIndex,
) -> CsrMatrix<f32>
where
T: Copy + Send + Sync + ToFloat,
T: Copy + Send + Sync + ValueGetter,
{
let row_offsets = mat.row_offsets();
let col_indices = mat.col_indices();
Expand All @@ -677,15 +706,18 @@ where
if exclude_chroms.is_empty() || !exclude_chroms.contains(chrom) {
let i = index.get_position_rev(chrom, pos);
let entry = count.entry(i).or_insert(Vec::new());
entry.push(values[k].to_float());
entry.push(values[k].get_value(val_ty).unwrap());
}
}
count
.into_iter()
.map(|(k, v)| {
let len = v.len();
let sum: f32 = v.into_iter().sum();
(k, sum / len as f32)
let r = match summary_ty {
SummaryType::Mean => v.iter().sum::<f32>() / v.len() as f32,
SummaryType::Sum => v.iter().sum(),
SummaryType::Count => v.len() as f32,
};
(k, r)
})
.collect::<Vec<_>>()
})
Expand All @@ -702,6 +734,8 @@ where
DataType::CsrMatrix(ScalarType::I32) => {
helper(
CsrMatrix::<i32>::try_from(mat).unwrap(),
val_ty,
summary_ty,
&self.exclude_chroms,
&ori_index,
&index,
Expand All @@ -710,6 +744,8 @@ where
DataType::CsrMatrix(ScalarType::F32) => {
helper(
CsrMatrix::<f32>::try_from(mat).unwrap(),
val_ty,
summary_ty,
&self.exclude_chroms,
&ori_index,
&index,
Expand All @@ -726,18 +762,22 @@ where
pub fn into_aggregated_array_iter<C>(
self,
counter: C,
val_ty: ValueType,
summary_ty: SummaryType,
) -> impl ExactSizeIterator<Item = (ArrayData, usize, usize)>
where
C: FeatureCounter<Value = f32> + Clone + Sync,
{
fn helper<'a, T, C>(
data: CsrMatrix<T>,
counter: C,
val_ty: ValueType,
summary_ty: SummaryType,
exclude_chroms: &'a HashSet<String>,
index: &'a GenomeBaseIndex,
) -> Vec<Vec<(usize, f32)>>
where
T: Copy + Send + Sync + ToFloat,
T: Copy + Send + Sync + ValueGetter,
C: FeatureCounter<Value = f32> + Clone + Sync,
{
(0..data.nrows())
Expand All @@ -753,13 +793,18 @@ where
if exclude_chroms.is_empty()
|| !exclude_chroms.contains(chrom)
{
coverage.insert(&GenomicRange::new(chrom, pos, pos + 1), val.to_float());
coverage.insert(&GenomicRange::new(chrom, pos, pos + 1), val.get_value(val_ty).unwrap());
}
});
coverage
.get_values_and_counts()
.map(|(idx, (val, count))| (idx, val / count as f32))
.collect::<Vec<_>>()
match summary_ty {
SummaryType::Mean => coverage
.get_values_and_counts()
.map(|(idx, (val, count))| (idx, val / count as f32))
.collect::<Vec<_>>(),
SummaryType::Sum => coverage
.get_values(),
_ => unimplemented!("Unsupported summary type"),
}
})
.collect::<Vec<_>>()
}
Expand All @@ -771,6 +816,8 @@ where
helper(
CsrMatrix::<i32>::try_from(data).unwrap(),
counter.clone(),
val_ty,
summary_ty,
&self.exclude_chroms,
&self.index,
)
Expand All @@ -779,6 +826,8 @@ where
helper(
CsrMatrix::<f32>::try_from(data).unwrap(),
counter.clone(),
val_ty,
summary_ty,
&self.exclude_chroms,
&self.index,
)
Expand Down Expand Up @@ -903,27 +952,38 @@ fn i32_to_ratio(x: i32) -> Ratio<u16> {
Ratio::new_raw(numer, denom)
}

trait ToFloat {
fn to_float(self) -> f32;
trait ValueGetter {
fn get_value(self, ty: ValueType) -> Option<f32>;
}

impl ToFloat for f32 {
fn to_float(self) -> f32 {
self
impl ValueGetter for f32 {
fn get_value(self, ty: ValueType) -> Option<f32> {
match ty {
ValueType::Denominator => None,
ValueType::Numerator => None,
ValueType::Ratio => Some(self),
}
}
}

impl ToFloat for i32 {
fn to_float(self) -> f32 {
impl ValueGetter for i32 {
fn get_value(self, ty: ValueType) -> Option<f32> {
let numer = (self >> 16) as u16;
let denom = (self & 0xffff) as u16;
if numer == 0 {
0.0
} else if denom == 0 {
1.0
} else {
numer as f32 / denom as f32
}
let r = match ty {
ValueType::Denominator => denom as f32,
ValueType::Numerator => numer as f32,
ValueType::Ratio => {
if numer == 0 {
0.0
} else if denom == 0 {
1.0
} else {
numer as f32 / denom as f32
}
},
};
Some(r)
}
}

Expand Down
11 changes: 9 additions & 2 deletions snapatac2-core/src/feature_count/matrix.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use super::counter::{CountingStrategy, FeatureCounter, GeneCount, RegionCounter, TranscriptCount};
use super::ValueType;
use crate::genome::{Promoters, Transcript};
use crate::feature_count::SnapData;
use crate::preprocessing::SummaryType;

use anndata::{data::DataFrameIndex, AnnDataOp, ArrayData};
use anyhow::{bail, Result};
Expand All @@ -19,6 +21,7 @@ use polars::prelude::{DataFrame, NamedFrom, Series};
/// * `min_fragment_size` - The minimum fragment size.
/// * `max_fragment_size` - The maximum fragment size.
/// * `count_frag_as_reads` - Whether to treat fragments as reads during counting.
/// * `val_type` - Which kind of value to use: numerator, denominator or ratio. Only used for base data.
/// * `out` - The output anndata object.
pub fn create_tile_matrix<A, B>(
adata: &A,
Expand All @@ -28,6 +31,8 @@ pub fn create_tile_matrix<A, B>(
min_fragment_size: Option<u64>,
max_fragment_size: Option<u64>,
counting_strategy: CountingStrategy,
val_type: ValueType,
summary_type: SummaryType,
out: Option<&B>,
) -> Result<()>
where
Expand Down Expand Up @@ -67,7 +72,7 @@ where
}

feature_names = values.get_gindex().to_index().into();
data_iter = Box::new(values.into_array_iter().map(|x| ArrayData::from(x.0)));
data_iter = Box::new(values.into_array_iter(val_type, summary_type).map(|x| ArrayData::from(x.0)));
} else {
bail!("No fragment or base data found in the anndata object");
};
Expand All @@ -92,6 +97,8 @@ pub fn create_peak_matrix<A, I, D, B>(
peaks: I,
chunk_size: usize,
counting_strategy: CountingStrategy,
val_type: ValueType,
summary_type: SummaryType,
min_fragment_size: Option<u64>,
max_fragment_size: Option<u64>,
out: Option<&B>,
Expand Down Expand Up @@ -140,7 +147,7 @@ where
feature_names = counter.get_feature_ids();
data_iter = Box::new(
values
.into_aggregated_array_iter(counter)
.into_aggregated_array_iter(counter, val_type, summary_type)
.map(|x| x.0.into()),
);
} else {
Expand Down
2 changes: 1 addition & 1 deletion snapatac2-core/src/feature_count/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::str::FromStr;
use anyhow::{bail, Context, Result};
use anndata::{data::DynCsrMatrix, AnnData, AnnDataOp, AnnDataSet, ArrayElemOp, AxisArraysOp, Backend, ElemCollectionOp};
use bed_utils::bed::GenomicRange;
pub use data_iter::{BaseValue, ChromValueIter, BaseData, FragmentData, ContactData, FragmentDataIter};
pub use data_iter::{ValueType, BaseValue, ChromValueIter, BaseData, FragmentData, ContactData, FragmentDataIter};
pub use counter::{FeatureCounter, CountingStrategy};
pub use matrix::{create_gene_matrix, create_tile_matrix, create_peak_matrix};
use nalgebra_sparse::CsrMatrix;
Expand Down
74 changes: 47 additions & 27 deletions snapatac2-core/src/preprocessing/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::genome::{ChromSizes, GenomeBaseIndex};
use crate::preprocessing::qc::{Contact, Fragment, FragmentQC, FragmentQCBuilder};

use super::qc::BaseValueQC;
use anndata::data::DynCsrMatrix;
use anndata::{
data::array::utils::{from_csr_data, to_csr_data},
AnnDataOp, ArrayData, AxisArraysOp, ElemCollectionOp,
Expand Down Expand Up @@ -323,6 +324,40 @@ where
A: AnnDataOp,
I: Iterator<Item = (String, BaseValue)>,
{
fn helper<T: TryFrom<BaseValue, Error = anyhow::Error> + Send>(
chunk: Vec<Vec<BaseValue>>,
genome_index: &GenomeBaseIndex,
genome_size: usize,
) -> (Vec<BaseValueQC>, CsrMatrix<T>) {
let (qc, counts): (Vec<_>, Vec<_>) = chunk
.into_par_iter()
.map(|cell_data| {
let mut qc = BaseValueQC::new();
let mut count = cell_data
.into_iter()
.flat_map(|value| {
let chrom = &value.chrom;
if genome_index.contain_chrom(chrom) {
qc.add();
let pos = genome_index.get_position_rev(chrom, value.pos);
Some((pos, T::try_from(value).unwrap()))
} else {
None
}
})
.collect::<Vec<_>>();
count.sort_by(|x, y| x.0.cmp(&y.0));
(qc, count)
})
.unzip();
let (r, c, offset, ind, csr_data) = to_csr_data(counts, genome_size);
(
qc,
CsrMatrix::try_from_csr_data(r, c, offset, ind, csr_data)
.unwrap()
)
}

let spinner = ProgressBar::with_draw_target(None, ProgressDrawTarget::stderr_with_hz(1))
.with_style(
ProgressStyle::with_template(
Expand All @@ -344,38 +379,23 @@ where
let arrays = chunked_values.into_iter().map(|chunk| {
// Collect into vector for parallel processing
let chunk: Vec<Vec<_>> = chunk
.map(|(barcode, x)| {
.map(|(barcode, data)| {
if !scanned_barcodes.insert(barcode.clone()) {
panic!("Please sort fragment file by barcodes");
}
x.collect()
data.map(|x| x.1).collect()
})
.collect();

let (qc, counts): (Vec<_>, Vec<_>) = chunk
.into_par_iter()
.map(|cell_data| {
let mut qc = BaseValueQC::new();
let mut count = cell_data
.into_iter()
.flat_map(|(_, value)| {
let chrom = &value.chrom;
if genome_index.contain_chrom(chrom) {
qc.add();
let pos = genome_index.get_position_rev(chrom, value.pos);
Some((pos, value.value()))
} else {
None
}
})
.collect::<Vec<_>>();
count.sort_by(|x, y| x.0.cmp(&y.0));
(qc, count)
})
.unzip();
qc_metrics.extend(qc);
let (r, c, offset, ind, csr_data) = to_csr_data(counts, genome_size);
CsrMatrix::try_from_csr_data(r, c, offset, ind, csr_data).unwrap()
let mat: DynCsrMatrix = if chunk[0][0].to_i32().is_some() {
let (qc, mat) = helper::<i32>(chunk, &genome_index, genome_size);
qc_metrics.extend(qc);
mat.into()
} else {
let (qc, mat) = helper::<f32>(chunk, &genome_index, genome_size);
qc_metrics.extend(qc);
mat.into()
};
mat
});

anndata.obsm().add_iter(BASE_VALUE, arrays)?;
Expand Down
1 change: 1 addition & 0 deletions snapatac2-core/src/preprocessing/qc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::feature_count::{FragmentDataIter, SnapData};

pub type CellBarcode = String;

#[derive(Debug, Copy, Clone)]
pub enum SummaryType {
Sum,
Count,
Expand Down
Loading

0 comments on commit b3531d2

Please sign in to comment.