Skip to content

Commit

Permalink
significantly improve the performance of export_coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
kaizhang committed Nov 28, 2024
1 parent 8aa8516 commit 2718590
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 108 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test_rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: nschloe/action-cached-lfs-checkout@v1

- uses: ./.github/actions/setup-rust
with:
Expand Down
1 change: 1 addition & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Implement `BPM` normalization in `ex.export_coverage`.
- Add `include_for_norm` and `exclude_for_norm` to `ex.export_coverage`.
- `ex.export_coverage` is much faster.

## Release 2.7.1 (released October 29, 2024)

Expand Down
2 changes: 1 addition & 1 deletion snapatac2-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ anyhow = "1.0"
bstr = "1.0"
byteorder = "1.0"
bigtools = { version = "0.5", features = ["read", "write"] }
bed-utils = "0.6.1"
bed-utils = "0.6.2"
flate2 = "1.0"
tokio = "1.34"
hora = "0.1"
Expand Down
205 changes: 105 additions & 100 deletions snapatac2-core/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ use crate::{
};

use anyhow::{bail, ensure, Context, Result};
use bed_utils::bed::{
io,
map::{GIntervalIndexSet, GIntervalMap},
merge_sorted_bed_with, BEDLike, BedGraph, GenomicRange,
use bed_utils::{
bed::{io, map::GIntervalMap, merge_sorted_bedgraph, BEDLike, BedGraph, GenomicRange},
extsort::ExternalSorterBuilder,
};
use bed_utils::coverage::SparseBinnedCoverage;
use bigtools::BigWigWrite;
use indexmap::IndexMap;
use indicatif::{style::ProgressStyle, ParallelProgressIterator, ProgressIterator};
Expand Down Expand Up @@ -197,11 +195,18 @@ pub trait Exporter: SnapData {
.as_ref()
.join(prefix.to_string() + grp.replace("/", "+").as_str() + suffix);

let fragments = io::Reader::new(utils::open_file_for_read(filename), None)
.into_records::<Fragment>()
.map(Result::unwrap);
let fragments = ExternalSorterBuilder::new()
.with_tmp_dir(temp_dir.path())
.build()?
.sort_by(fragments, |a, b| a.compare(b))?
.map(Result::unwrap);

// Make BedGraph
let bedgraph = create_bedgraph_from_fragments(
io::Reader::new(utils::open_file_for_read(filename), None)
.into_records()
.map(Result::unwrap),
let bedgraph = create_bedgraph_from_sorted_fragments(
fragments,
&chrom_sizes,
resolution as u64,
smooth_length,
Expand Down Expand Up @@ -282,7 +287,7 @@ impl std::str::FromStr for Normalization {
/// * `exclude_for_norm` - If specified, the regions that overlap with these intervals will be
/// excluded from normalization. If a region is in both "include_for_norm" and
/// "exclude_for_norm", it will be excluded.
fn create_bedgraph_from_fragments<I>(
fn create_bedgraph_from_sorted_fragments<I>(
fragments: I,
chrom_sizes: &ChromSizes,
bin_size: u64,
Expand All @@ -295,67 +300,47 @@ fn create_bedgraph_from_fragments<I>(
where
I: Iterator<Item = Fragment>,
{
let genome: GIntervalIndexSet = chrom_sizes
.into_iter()
.map(|(k, v)| GenomicRange::new(k, 0, *v))
.collect();
let mut counter = SparseBinnedCoverage::new(&genome, bin_size);
let mut norm_factor = 0.0;
fragments.for_each(|frag| {
let not_in_blacklist = blacklist_regions.map_or(true, |bl| !bl.is_overlapped(&frag));
if not_in_blacklist {
let bedgraph = fragments.flat_map(|frag| {
if blacklist_regions.map_or(false, |bl| bl.is_overlapped(&frag)) {
None
} else {
if include_for_norm.map_or(true, |x| x.is_overlapped(&frag))
&& !exclude_for_norm.map_or(false, |x| x.is_overlapped(&frag))
{
norm_factor += 1.0;
}
counter.insert(&frag, 1.0);
let mut frag = BedGraph::from_bed(&frag, 1.0);
fit_to_bin(&mut frag, bin_size);
clip_bed(frag, chrom_sizes)
}
});
let mut bedgraph: Vec<_> = merge_sorted_bedgraph(bedgraph).collect();

norm_factor = match normalization {
None => 1.0,
Some(Normalization::RPKM) => norm_factor * bin_size as f32 / 1e9,
Some(Normalization::CPM) => norm_factor / 1e6,
Some(Normalization::BPM) => counter.get_coverage().values().sum::<f32>() / 1e6,
Some(Normalization::BPM) => bedgraph.iter().map(|x| x.value).sum::<f32>() / 1e6,
Some(Normalization::RPGC) => todo!(),
};

let counts = counter
.get_coverage()
.iter()
.map(|(i, val)| (counter.get_region(*i).unwrap(), *val / norm_factor));
bedgraph.iter_mut().for_each(|x| x.value /= norm_factor);

let counts: Box<dyn Iterator<Item = _>> = if let Some(smooth_length) = smooth_length {
if let Some(smooth_length) = smooth_length {
let smooth_left = (smooth_length - 1) / 2;
let smooth_right = smooth_left + (smooth_left - 1) % 2;
Box::new(smooth_bedgraph(
counts,
bedgraph = smooth_bedgraph(
bedgraph.into_iter(),
bin_size,
smooth_left,
smooth_right,
chrom_sizes,
))
} else {
Box::new(counts)
};
)
.collect();
}

let chunks = counts
.map(|(region, count)| BedGraph::from_bed(&region, count))
.chunk_by(|x| x.value);
chunks
.into_iter()
.flat_map(|(_, groups)| {
merge_sorted_bed_with(groups, |beds| {
let mut iter = beds.into_iter();
let mut first = iter.next().unwrap();
if let Some(last) = iter.last() {
first.set_end(last.end());
}
first
})
})
.collect()
bedgraph
}

fn smooth_bedgraph<'a, I>(
Expand All @@ -364,14 +349,14 @@ fn smooth_bedgraph<'a, I>(
left_window_len: u16,
right_window_len: u16,
chr_sizes: &'a ChromSizes,
) -> impl Iterator<Item = (GenomicRange, f32)> + 'a
) -> impl Iterator<Item = BedGraph<f32>> + 'a
where
I: Iterator<Item = (GenomicRange, f32)> + 'a,
I: Iterator<Item = BedGraph<f32>> + 'a,
{
let left_bases = left_window_len as u64 * bin_size;
let right_bases = right_window_len as u64 * bin_size;
get_overlapped_chunks(input, left_bases, right_bases).flat_map(move |chunk| {
let size = chr_sizes.get(chunk[0].0.chrom()).unwrap();
let size = chr_sizes.get(chunk[0].chrom()).unwrap();
moving_average(&chunk, left_bases, right_bases, size)
})
}
Expand All @@ -380,21 +365,21 @@ fn get_overlapped_chunks<I>(
mut input: I,
left_bases: u64,
right_bases: u64,
) -> impl Iterator<Item = Vec<(GenomicRange, f32)>>
) -> impl Iterator<Item = Vec<BedGraph<f32>>>
where
I: Iterator<Item = (GenomicRange, f32)>,
I: Iterator<Item = BedGraph<f32>>,
{
let mut buffer = vec![input.next().unwrap()];
std::iter::from_fn(move || {
while let Some((cur_region, cur_val)) = input.next() {
let (prev_region, _) = buffer.last().unwrap();
if cur_region.chrom() == prev_region.chrom()
&& prev_region.end() + right_bases > cur_region.start().saturating_sub(left_bases)
while let Some(cur) = input.next() {
let prev = buffer.last().unwrap();
if cur.chrom() == prev.chrom()
&& prev.end() + right_bases > cur.start().saturating_sub(left_bases)
{
buffer.push((cur_region, cur_val));
buffer.push(cur);
} else {
let result = Some(buffer.clone());
buffer = vec![(cur_region, cur_val)];
buffer = vec![cur];
return result;
}
}
Expand All @@ -409,17 +394,17 @@ where
}

fn moving_average(
chunk: &Vec<(GenomicRange, f32)>,
chunk: &Vec<BedGraph<f32>>,
left_bases: u64,
right_bases: u64,
chrom_size: u64,
) -> impl Iterator<Item = (GenomicRange, f32)> {
let bin_size = chunk[0].0.len() as u64;
) -> impl Iterator<Item = BedGraph<f32>> {
let bin_size = chunk[0].len() as u64;
let n_left = (left_bases / bin_size) as usize;
let n_right = (right_bases / bin_size) as usize;
let chrom = chunk.first().unwrap().0.chrom();
let chunk_start = chunk.first().unwrap().0.start();
let chunk_end = (chunk.last().unwrap().0.end() + right_bases).min(chrom_size);
let chrom = chunk.first().unwrap().chrom();
let chunk_start = chunk.first().unwrap().start();
let chunk_end = (chunk.last().unwrap().end() + right_bases).min(chrom_size);
let mut regions = GenomicRange::new(chrom, chunk_start.saturating_sub(left_bases), chunk_start)
.rsplit_by_len(bin_size)
.collect::<Vec<_>>()
Expand All @@ -431,15 +416,15 @@ fn moving_average(
// Add values
chunk
.iter()
.for_each(|(gr, val)| *regions.get_mut(gr).unwrap() = *val);
.for_each(|gr| *regions.get_mut(&gr.to_genomic_range()).unwrap() = gr.value);
let len = regions.len();
// Compute the average
(0..len).map(move |i| {
let s: f32 = (i.saturating_sub(n_left)..(i + n_right + 1).min(len))
.map(|x| regions[x])
.sum();
let val = s / (n_right + n_left + 1) as f32;
(regions.get_index(i).unwrap().0.clone(), val)
BedGraph::from_bed(regions.get_index(i).unwrap().0, val)
})
}

Expand Down Expand Up @@ -491,6 +476,28 @@ fn create_bigwig_from_bedgraph<P: AsRef<Path>>(
Ok(())
}

fn clip_bed<B: BEDLike>(mut bed: B, chr_size: &ChromSizes) -> Option<B> {
let size = chr_size.get(bed.chrom())?;
if bed.start() >= size {
return None;
}
if bed.end() > size {
bed.set_end(size);
}
Some(bed)
}

fn fit_to_bin<B: BEDLike>(x: &mut B, bin_size: u64) {
if bin_size > 1 {
if x.start() % bin_size != 0 {
x.set_start(x.start() - x.start() % bin_size);
}
if x.end() % bin_size != 0 {
x.set_end(x.end() + bin_size - x.end() % bin_size);
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -509,7 +516,7 @@ mod tests {
];
let genome: ChromSizes = [("chr1", 50)].into_iter().collect();

let output: Vec<_> = create_bedgraph_from_fragments(
let output: Vec<_> = create_bedgraph_from_sorted_fragments(
fragments.clone().into_iter(),
&genome,
3,
Expand All @@ -525,7 +532,7 @@ mod tests {
let expected = vec![1.0, 3.0, 4.0, 3.0, 2.0, 4.0, 3.0, 2.0];
assert_eq!(output, expected);

let output: Vec<_> = create_bedgraph_from_fragments(
let output: Vec<_> = create_bedgraph_from_sorted_fragments(
fragments.clone().into_iter(),
&genome,
5,
Expand All @@ -550,9 +557,9 @@ mod tests {

let reader = crate::utils::open_file_for_read("test/coverage.bdg.gz");
let mut reader = bed_utils::bed::io::Reader::new(reader, None);
let expected: Vec<BedGraph<f32>> = reader.records().map(|x| x.unwrap()).collect();
let mut expected: Vec<BedGraph<f32>> = reader.records().map(|x| x.unwrap()).collect();

let output = create_bedgraph_from_fragments(
let output = create_bedgraph_from_sorted_fragments(
fragments.clone().into_iter(),
&[("chr1", 248956422), ("chr2", 242193529)]
.into_iter()
Expand All @@ -564,9 +571,9 @@ mod tests {
None,
None,
);
assert_eq!(output, expected);
assert_eq!(output, expected, "Left: {:?}\n\n{:?}", &output[..10], &expected[..10]);

let output = create_bedgraph_from_fragments(
let output = create_bedgraph_from_sorted_fragments(
fragments.into_iter(),
&[("chr1", 248956422), ("chr2", 242193529)]
.into_iter()
Expand All @@ -580,28 +587,26 @@ mod tests {
);
let scale_factor: f32 = expected
.iter()
.map(|x| x.value * x.len() as f32)
.map(|x| x.value)
.sum::<f32>()
/ 1e6;
assert_eq!(
output,
expected
.into_iter()
.map(|mut x| {
x.value = x.value / scale_factor;
x
})
.collect::<Vec<_>>()
);
expected = expected
.into_iter()
.map(|mut x| {
x.value = x.value / scale_factor;
x
})
.collect::<Vec<_>>();
assert_eq!(output, expected, "Left: {:?}\n\n{:?}", &output[..10], &expected[..10]);
}

#[test]
fn test_smoothing() {
let input = vec![
(GenomicRange::new("chr1", 0, 100), 1.0),
(GenomicRange::new("chr1", 200, 300), 2.0),
(GenomicRange::new("chr1", 500, 600), 3.0),
(GenomicRange::new("chr1", 1000, 1100), 5.0),
BedGraph::new("chr1", 0, 100, 1.0),
BedGraph::new("chr1", 200, 300, 2.0),
BedGraph::new("chr1", 500, 600, 3.0),
BedGraph::new("chr1", 1000, 1100, 5.0),
];

assert_eq!(
Expand All @@ -614,19 +619,19 @@ mod tests {
)
.collect::<Vec<_>>(),
vec![
(GenomicRange::new("chr1", 0, 100), 0.6),
(GenomicRange::new("chr1", 100, 200), 0.6),
(GenomicRange::new("chr1", 200, 300), 0.6),
(GenomicRange::new("chr1", 300, 400), 1.0),
(GenomicRange::new("chr1", 400, 500), 1.0),
(GenomicRange::new("chr1", 500, 600), 0.6),
(GenomicRange::new("chr1", 600, 700), 0.6),
(GenomicRange::new("chr1", 700, 800), 0.6),
(GenomicRange::new("chr1", 800, 900), 1.0),
(GenomicRange::new("chr1", 900, 1000), 1.0),
(GenomicRange::new("chr1", 1000, 1100), 1.0),
(GenomicRange::new("chr1", 1100, 1200), 1.0),
(GenomicRange::new("chr1", 1200, 1300), 1.0),
BedGraph::new("chr1", 0, 100, 0.6),
BedGraph::new("chr1", 100, 200, 0.6),
BedGraph::new("chr1", 200, 300, 0.6),
BedGraph::new("chr1", 300, 400, 1.0),
BedGraph::new("chr1", 400, 500, 1.0),
BedGraph::new("chr1", 500, 600, 0.6),
BedGraph::new("chr1", 600, 700, 0.6),
BedGraph::new("chr1", 700, 800, 0.6),
BedGraph::new("chr1", 800, 900, 1.0),
BedGraph::new("chr1", 900, 1000, 1.0),
BedGraph::new("chr1", 1000, 1100, 1.0),
BedGraph::new("chr1", 1100, 1200, 1.0),
BedGraph::new("chr1", 1200, 1300, 1.0),
],
);
}
Expand Down
Loading

0 comments on commit 2718590

Please sign in to comment.