From 2718590740068862ed785807f91d42b27a12c43a Mon Sep 17 00:00:00 2001 From: Kai Zhang Date: Thu, 28 Nov 2024 21:21:30 +0800 Subject: [PATCH] significantly improve the performance of export_coverage --- .github/workflows/test_rust.yml | 2 +- docs/changelog.md | 1 + snapatac2-core/Cargo.toml | 2 +- snapatac2-core/src/export.rs | 205 +++++++++--------- snapatac2-python/Cargo.toml | 2 +- .../python/snapatac2/export/__init__.py | 6 +- snapatac2-python/src/utils.rs | 29 ++- 7 files changed, 139 insertions(+), 108 deletions(-) diff --git a/.github/workflows/test_rust.yml b/.github/workflows/test_rust.yml index 74781af7..5d3810d0 100644 --- a/.github/workflows/test_rust.yml +++ b/.github/workflows/test_rust.yml @@ -7,7 +7,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout code - uses: actions/checkout@v4 + uses: nschloe/action-cached-lfs-checkout@v1 - uses: ./.github/actions/setup-rust with: diff --git a/docs/changelog.md b/docs/changelog.md index e6d877e6..7cce0ddf 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -6,6 +6,7 @@ - Implement `BPM` normalization in `ex.export_coverage`. - Add `include_for_norm` and `exclude_for_norm` to `ex.export_coverage`. + - `ex.export_coverage` is much faster. ## Release 2.7.1 (released October 29, 2024) diff --git a/snapatac2-core/Cargo.toml b/snapatac2-core/Cargo.toml index baebde28..ae959b3a 100644 --- a/snapatac2-core/Cargo.toml +++ b/snapatac2-core/Cargo.toml @@ -15,7 +15,7 @@ anyhow = "1.0" bstr = "1.0" byteorder = "1.0" bigtools = { version = "0.5", features = ["read", "write"] } -bed-utils = "0.6.1" +bed-utils = "0.6.2" flate2 = "1.0" tokio = "1.34" hora = "0.1" diff --git a/snapatac2-core/src/export.rs b/snapatac2-core/src/export.rs index 5eecc22f..6d173a1a 100644 --- a/snapatac2-core/src/export.rs +++ b/snapatac2-core/src/export.rs @@ -7,12 +7,10 @@ use crate::{ }; use anyhow::{bail, ensure, Context, Result}; -use bed_utils::bed::{ - io, - map::{GIntervalIndexSet, GIntervalMap}, - merge_sorted_bed_with, BEDLike, BedGraph, GenomicRange, +use bed_utils::{ + bed::{io, map::GIntervalMap, merge_sorted_bedgraph, BEDLike, BedGraph, GenomicRange}, + extsort::ExternalSorterBuilder, }; -use bed_utils::coverage::SparseBinnedCoverage; use bigtools::BigWigWrite; use indexmap::IndexMap; use indicatif::{style::ProgressStyle, ParallelProgressIterator, ProgressIterator}; @@ -197,11 +195,18 @@ pub trait Exporter: SnapData { .as_ref() .join(prefix.to_string() + grp.replace("/", "+").as_str() + suffix); + let fragments = io::Reader::new(utils::open_file_for_read(filename), None) + .into_records::() + .map(Result::unwrap); + let fragments = ExternalSorterBuilder::new() + .with_tmp_dir(temp_dir.path()) + .build()? + .sort_by(fragments, |a, b| a.compare(b))? + .map(Result::unwrap); + // Make BedGraph - let bedgraph = create_bedgraph_from_fragments( - io::Reader::new(utils::open_file_for_read(filename), None) - .into_records() - .map(Result::unwrap), + let bedgraph = create_bedgraph_from_sorted_fragments( + fragments, &chrom_sizes, resolution as u64, smooth_length, @@ -282,7 +287,7 @@ impl std::str::FromStr for Normalization { /// * `exclude_for_norm` - If specified, the regions that overlap with these intervals will be /// excluded from normalization. If a region is in both "include_for_norm" and /// "exclude_for_norm", it will be excluded. 
-fn create_bedgraph_from_fragments( +fn create_bedgraph_from_sorted_fragments( fragments: I, chrom_sizes: &ChromSizes, bin_size: u64, @@ -295,67 +300,47 @@ fn create_bedgraph_from_fragments( where I: Iterator, { - let genome: GIntervalIndexSet = chrom_sizes - .into_iter() - .map(|(k, v)| GenomicRange::new(k, 0, *v)) - .collect(); - let mut counter = SparseBinnedCoverage::new(&genome, bin_size); let mut norm_factor = 0.0; - fragments.for_each(|frag| { - let not_in_blacklist = blacklist_regions.map_or(true, |bl| !bl.is_overlapped(&frag)); - if not_in_blacklist { + let bedgraph = fragments.flat_map(|frag| { + if blacklist_regions.map_or(false, |bl| bl.is_overlapped(&frag)) { + None + } else { if include_for_norm.map_or(true, |x| x.is_overlapped(&frag)) && !exclude_for_norm.map_or(false, |x| x.is_overlapped(&frag)) { norm_factor += 1.0; } - counter.insert(&frag, 1.0); + let mut frag = BedGraph::from_bed(&frag, 1.0); + fit_to_bin(&mut frag, bin_size); + clip_bed(frag, chrom_sizes) } }); + let mut bedgraph: Vec<_> = merge_sorted_bedgraph(bedgraph).collect(); norm_factor = match normalization { None => 1.0, Some(Normalization::RPKM) => norm_factor * bin_size as f32 / 1e9, Some(Normalization::CPM) => norm_factor / 1e6, - Some(Normalization::BPM) => counter.get_coverage().values().sum::() / 1e6, + Some(Normalization::BPM) => bedgraph.iter().map(|x| x.value).sum::() / 1e6, Some(Normalization::RPGC) => todo!(), }; - let counts = counter - .get_coverage() - .iter() - .map(|(i, val)| (counter.get_region(*i).unwrap(), *val / norm_factor)); + bedgraph.iter_mut().for_each(|x| x.value /= norm_factor); - let counts: Box> = if let Some(smooth_length) = smooth_length { + if let Some(smooth_length) = smooth_length { let smooth_left = (smooth_length - 1) / 2; let smooth_right = smooth_left + (smooth_left - 1) % 2; - Box::new(smooth_bedgraph( - counts, + bedgraph = smooth_bedgraph( + bedgraph.into_iter(), bin_size, smooth_left, smooth_right, chrom_sizes, - )) - } else { - Box::new(counts) - }; + ) + .collect(); + } - let chunks = counts - .map(|(region, count)| BedGraph::from_bed(®ion, count)) - .chunk_by(|x| x.value); - chunks - .into_iter() - .flat_map(|(_, groups)| { - merge_sorted_bed_with(groups, |beds| { - let mut iter = beds.into_iter(); - let mut first = iter.next().unwrap(); - if let Some(last) = iter.last() { - first.set_end(last.end()); - } - first - }) - }) - .collect() + bedgraph } fn smooth_bedgraph<'a, I>( @@ -364,14 +349,14 @@ fn smooth_bedgraph<'a, I>( left_window_len: u16, right_window_len: u16, chr_sizes: &'a ChromSizes, -) -> impl Iterator + 'a +) -> impl Iterator> + 'a where - I: Iterator + 'a, + I: Iterator> + 'a, { let left_bases = left_window_len as u64 * bin_size; let right_bases = right_window_len as u64 * bin_size; get_overlapped_chunks(input, left_bases, right_bases).flat_map(move |chunk| { - let size = chr_sizes.get(chunk[0].0.chrom()).unwrap(); + let size = chr_sizes.get(chunk[0].chrom()).unwrap(); moving_average(&chunk, left_bases, right_bases, size) }) } @@ -380,21 +365,21 @@ fn get_overlapped_chunks( mut input: I, left_bases: u64, right_bases: u64, -) -> impl Iterator> +) -> impl Iterator>> where - I: Iterator, + I: Iterator>, { let mut buffer = vec![input.next().unwrap()]; std::iter::from_fn(move || { - while let Some((cur_region, cur_val)) = input.next() { - let (prev_region, _) = buffer.last().unwrap(); - if cur_region.chrom() == prev_region.chrom() - && prev_region.end() + right_bases > cur_region.start().saturating_sub(left_bases) + while let Some(cur) = input.next() { + let 
prev = buffer.last().unwrap(); + if cur.chrom() == prev.chrom() + && prev.end() + right_bases > cur.start().saturating_sub(left_bases) { - buffer.push((cur_region, cur_val)); + buffer.push(cur); } else { let result = Some(buffer.clone()); - buffer = vec![(cur_region, cur_val)]; + buffer = vec![cur]; return result; } } @@ -409,17 +394,17 @@ where } fn moving_average( - chunk: &Vec<(GenomicRange, f32)>, + chunk: &Vec>, left_bases: u64, right_bases: u64, chrom_size: u64, -) -> impl Iterator { - let bin_size = chunk[0].0.len() as u64; +) -> impl Iterator> { + let bin_size = chunk[0].len() as u64; let n_left = (left_bases / bin_size) as usize; let n_right = (right_bases / bin_size) as usize; - let chrom = chunk.first().unwrap().0.chrom(); - let chunk_start = chunk.first().unwrap().0.start(); - let chunk_end = (chunk.last().unwrap().0.end() + right_bases).min(chrom_size); + let chrom = chunk.first().unwrap().chrom(); + let chunk_start = chunk.first().unwrap().start(); + let chunk_end = (chunk.last().unwrap().end() + right_bases).min(chrom_size); let mut regions = GenomicRange::new(chrom, chunk_start.saturating_sub(left_bases), chunk_start) .rsplit_by_len(bin_size) .collect::>() @@ -431,7 +416,7 @@ fn moving_average( // Add values chunk .iter() - .for_each(|(gr, val)| *regions.get_mut(gr).unwrap() = *val); + .for_each(|gr| *regions.get_mut(&gr.to_genomic_range()).unwrap() = gr.value); let len = regions.len(); // Compute the average (0..len).map(move |i| { @@ -439,7 +424,7 @@ fn moving_average( .map(|x| regions[x]) .sum(); let val = s / (n_right + n_left + 1) as f32; - (regions.get_index(i).unwrap().0.clone(), val) + BedGraph::from_bed(regions.get_index(i).unwrap().0, val) }) } @@ -491,6 +476,28 @@ fn create_bigwig_from_bedgraph>( Ok(()) } +fn clip_bed(mut bed: B, chr_size: &ChromSizes) -> Option { + let size = chr_size.get(bed.chrom())?; + if bed.start() >= size { + return None; + } + if bed.end() > size { + bed.set_end(size); + } + Some(bed) +} + +fn fit_to_bin(x: &mut B, bin_size: u64) { + if bin_size > 1 { + if x.start() % bin_size != 0 { + x.set_start(x.start() - x.start() % bin_size); + } + if x.end() % bin_size != 0 { + x.set_end(x.end() + bin_size - x.end() % bin_size); + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -509,7 +516,7 @@ mod tests { ]; let genome: ChromSizes = [("chr1", 50)].into_iter().collect(); - let output: Vec<_> = create_bedgraph_from_fragments( + let output: Vec<_> = create_bedgraph_from_sorted_fragments( fragments.clone().into_iter(), &genome, 3, @@ -525,7 +532,7 @@ mod tests { let expected = vec![1.0, 3.0, 4.0, 3.0, 2.0, 4.0, 3.0, 2.0]; assert_eq!(output, expected); - let output: Vec<_> = create_bedgraph_from_fragments( + let output: Vec<_> = create_bedgraph_from_sorted_fragments( fragments.clone().into_iter(), &genome, 5, @@ -550,9 +557,9 @@ mod tests { let reader = crate::utils::open_file_for_read("test/coverage.bdg.gz"); let mut reader = bed_utils::bed::io::Reader::new(reader, None); - let expected: Vec> = reader.records().map(|x| x.unwrap()).collect(); + let mut expected: Vec> = reader.records().map(|x| x.unwrap()).collect(); - let output = create_bedgraph_from_fragments( + let output = create_bedgraph_from_sorted_fragments( fragments.clone().into_iter(), &[("chr1", 248956422), ("chr2", 242193529)] .into_iter() @@ -564,9 +571,9 @@ mod tests { None, None, ); - assert_eq!(output, expected); + assert_eq!(output, expected, "Left: {:?}\n\n{:?}", &output[..10], &expected[..10]); - let output = create_bedgraph_from_fragments( + let output = 
create_bedgraph_from_sorted_fragments( fragments.into_iter(), &[("chr1", 248956422), ("chr2", 242193529)] .into_iter() @@ -580,28 +587,26 @@ mod tests { ); let scale_factor: f32 = expected .iter() - .map(|x| x.value * x.len() as f32) + .map(|x| x.value) .sum::() / 1e6; - assert_eq!( - output, - expected - .into_iter() - .map(|mut x| { - x.value = x.value / scale_factor; - x - }) - .collect::>() - ); + expected = expected + .into_iter() + .map(|mut x| { + x.value = x.value / scale_factor; + x + }) + .collect::>(); + assert_eq!(output, expected, "Left: {:?}\n\n{:?}", &output[..10], &expected[..10]); } #[test] fn test_smoothing() { let input = vec![ - (GenomicRange::new("chr1", 0, 100), 1.0), - (GenomicRange::new("chr1", 200, 300), 2.0), - (GenomicRange::new("chr1", 500, 600), 3.0), - (GenomicRange::new("chr1", 1000, 1100), 5.0), + BedGraph::new("chr1", 0, 100, 1.0), + BedGraph::new("chr1", 200, 300, 2.0), + BedGraph::new("chr1", 500, 600, 3.0), + BedGraph::new("chr1", 1000, 1100, 5.0), ]; assert_eq!( @@ -614,19 +619,19 @@ mod tests { ) .collect::>(), vec![ - (GenomicRange::new("chr1", 0, 100), 0.6), - (GenomicRange::new("chr1", 100, 200), 0.6), - (GenomicRange::new("chr1", 200, 300), 0.6), - (GenomicRange::new("chr1", 300, 400), 1.0), - (GenomicRange::new("chr1", 400, 500), 1.0), - (GenomicRange::new("chr1", 500, 600), 0.6), - (GenomicRange::new("chr1", 600, 700), 0.6), - (GenomicRange::new("chr1", 700, 800), 0.6), - (GenomicRange::new("chr1", 800, 900), 1.0), - (GenomicRange::new("chr1", 900, 1000), 1.0), - (GenomicRange::new("chr1", 1000, 1100), 1.0), - (GenomicRange::new("chr1", 1100, 1200), 1.0), - (GenomicRange::new("chr1", 1200, 1300), 1.0), + BedGraph::new("chr1", 0, 100, 0.6), + BedGraph::new("chr1", 100, 200, 0.6), + BedGraph::new("chr1", 200, 300, 0.6), + BedGraph::new("chr1", 300, 400, 1.0), + BedGraph::new("chr1", 400, 500, 1.0), + BedGraph::new("chr1", 500, 600, 0.6), + BedGraph::new("chr1", 600, 700, 0.6), + BedGraph::new("chr1", 700, 800, 0.6), + BedGraph::new("chr1", 800, 900, 1.0), + BedGraph::new("chr1", 900, 1000, 1.0), + BedGraph::new("chr1", 1000, 1100, 1.0), + BedGraph::new("chr1", 1100, 1200, 1.0), + BedGraph::new("chr1", 1200, 1300, 1.0), ], ); } diff --git a/snapatac2-python/Cargo.toml b/snapatac2-python/Cargo.toml index e383b96a..a0978978 100644 --- a/snapatac2-python/Cargo.toml +++ b/snapatac2-python/Cargo.toml @@ -16,7 +16,7 @@ anndata = "0.4.2" anndata-hdf5 = "0.3" pyanndata = "0.4.1" anyhow = "1.0" -bed-utils = "0.6.1" +bed-utils = "0.6.2" flate2 = "1.0" itertools = "0.13" indicatif = "0.17" diff --git a/snapatac2-python/python/snapatac2/export/__init__.py b/snapatac2-python/python/snapatac2/export/__init__.py index 0edbf599..d78a1f07 100644 --- a/snapatac2-python/python/snapatac2/export/__init__.py +++ b/snapatac2-python/python/snapatac2/export/__init__.py @@ -133,11 +133,13 @@ def export_coverage( - CPM (per bin) = #reads per bin / #mapped_reads (in millions). - BPM (per bin) = #reads per bin / sum of all reads per bin (in millions). include_for_norm - A list of string or a BED file containing the genomic loci to include for normalization. + A list of string (e.g., ["chr1:1-100", "chr2:2-200"]) or a BED file containing + the genomic loci to include for normalization. If specified, only the reads that overlap with these loci will be used for normalization. A typical use case is to include only the promoter regions for the normalization. exclude_for_norm - A list of string or a BED file containing the genomic loci to exclude for normalization. 
+ A list of string (e.g., ["chr1:1-100", "chr2:2-200"]) or a BED file containing + the genomic loci to exclude for normalization. If specified, the reads that overlap with these loci will be excluded from normalization. If a read overlaps with both `include_for_norm` and `exclude_for_norm`, it will be excluded. min_frag_length diff --git a/snapatac2-python/src/utils.rs b/snapatac2-python/src/utils.rs index 6c4ca02d..2869df0d 100644 --- a/snapatac2-python/src/utils.rs +++ b/snapatac2-python/src/utils.rs @@ -4,12 +4,14 @@ pub use self::anndata::AnnDataLike; use bed_utils::extsort::ExternalSorterBuilder; use bed_utils::bed::{merge_sorted_bed, BEDLike}; +use polars::frame::DataFrame; use pyo3::{ prelude::*, types::PyIterator, PyResult, Python, }; use numpy::{Element, PyReadonlyArrayDyn, PyReadonlyArray, Ix1, Ix2, PyArray, IntoPyArray, PyArrayMethods}; +use pyo3_polars::PyDataFrame; use snapatac2_core::preprocessing::count_data::TranscriptParserOptions; use snapatac2_core::preprocessing::{Transcript, read_transcripts_from_gff, read_transcripts_from_gtf}; use snapatac2_core::utils; @@ -277,18 +279,39 @@ pub(crate) fn kmeans<'py>( Ok(model.predict(observations).targets.into_pyarray_bound(py)) } +/* +#[pyfunction] +pub(crate) fn read_promoters( + annotation: PathBuf, +) -> Result { + let transcripts = read_transcripts(annotation, &TranscriptParserOptions::default()); + let (chroms, starts, ends, strands, names) = transcripts.iter().map(|x| { + let strand = x.strand.as_str(); + + }).unzip(); + Ok(DataFrame::new(vec![ + ("chrom", chroms), + ("start", starts), + ("end", ends), + ("strand", strand), + ("name", name), + ])?) +} + */ + pub fn read_transcripts>(file_path: P, options: &TranscriptParserOptions) -> Vec { let path = if file_path.as_ref().extension().unwrap() == "gz" { file_path.as_ref().file_stem().unwrap().as_ref() } else { file_path.as_ref() }; + let file = BufReader::new(utils::open_file_for_read(&file_path)); if path.extension().unwrap() == "gff" { - read_transcripts_from_gff(BufReader::new(utils::open_file_for_read(file_path)), options).unwrap() + read_transcripts_from_gff(file, options).unwrap() } else if path.extension().unwrap() == "gtf" { - read_transcripts_from_gtf(BufReader::new(utils::open_file_for_read(file_path)), options).unwrap() + read_transcripts_from_gtf(file, options).unwrap() } else { - read_transcripts_from_gff(BufReader::new(utils::open_file_for_read(file_path.as_ref())), options) + read_transcripts_from_gff(file, options) .unwrap_or_else(|_| read_transcripts_from_gtf(BufReader::new(utils::open_file_for_read(file_path)), options).unwrap()) } }
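
Usage sketch (illustrative, not part of the patch): the normalization options documented in the docstring above can be combined as below. Only `normalization="BPM"`, `include_for_norm`, and `exclude_for_norm` come from this change; the AnnData object, the `groupby` column, the file names, and the remaining argument names are assumptions based on the existing `ex.export_coverage` API and may differ.

import snapatac2 as snap

# Hypothetical input: an AnnData created by SnapATAC2 with fragment data stored.
data = snap.read("pbmc.h5ad")

# Export one coverage track per cell type. The normalization factor is computed
# only from reads overlapping include_for_norm, minus any that also overlap
# exclude_for_norm (a read in both is excluded, as documented above).
snap.ex.export_coverage(
    data,
    groupby="cell_type",               # hypothetical .obs column defining groups
    normalization="BPM",               # BPM normalization added in this release
    include_for_norm="promoters.bed",  # or a list like ["chr1:1-100", "chr2:2-200"]
    exclude_for_norm="blacklist.bed",  # reads overlapping these loci are not counted
    out_dir="coverage",
)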