From 156c6b7d97bf709a66d082638a9b5359263e89b6 Mon Sep 17 00:00:00 2001 From: Kai Zhang Date: Sat, 4 Nov 2023 20:48:37 -0700 Subject: [PATCH] add `ex.export_coverage` --- docs/api/export.rst | 2 +- docs/changelog.md | 2 + snapatac2-core/src/export.rs | 189 ++++++------------ snapatac2-core/src/preprocessing/bam.rs | 9 + snapatac2-python/snapatac2/export/__init__.py | 96 ++++----- snapatac2-python/src/export.rs | 34 +--- snapatac2-python/src/lib.rs | 3 +- 7 files changed, 133 insertions(+), 202 deletions(-) diff --git a/docs/api/export.rst b/docs/api/export.rst index 07eaf86ee..26f2bf5e8 100644 --- a/docs/api/export.rst +++ b/docs/api/export.rst @@ -7,4 +7,4 @@ Exporting: `ex` :toctree: _autosummary ex.export_fragments - ex.export_bigwig \ No newline at end of file + ex.export_coverage \ No newline at end of file diff --git a/docs/changelog.md b/docs/changelog.md index d4cdd7922..1da50eff0 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -6,6 +6,8 @@ - Support anndata v0.10. - Make it easier to build custom genome objects. +- Add `ex.export_coverage` to export coverage tracks as bedGraph or bigWig files. + This deprecates `ex.export_bigwig`. ### Bugs fixed: diff --git a/snapatac2-core/src/export.rs b/snapatac2-core/src/export.rs index bea894bb7..83a6ccffd 100644 --- a/snapatac2-core/src/export.rs +++ b/snapatac2-core/src/export.rs @@ -1,11 +1,11 @@ -use crate::{preprocessing::{count_data::{SnapData, GenomeCoverage, CoverageType, ChromSizes}, Fragment}, utils::{self, Compression}}; +use crate::{preprocessing::{count_data::{SnapData, ChromSizes}, Fragment}, utils::{self, Compression}}; use anyhow::{Context, Result, ensure}; use itertools::Itertools; use log::info; use std::{ sync::{Arc, Mutex}, io::Write, - path::{Path, PathBuf}, collections::{BTreeMap, HashMap, HashSet}, + path::{Path, PathBuf}, collections::{HashMap, HashSet}, }; use rayon::iter::{ParallelBridge, ParallelIterator, IntoParallelIterator}; use bed_utils::bed::{ @@ -17,6 +17,23 @@ use futures::executor::ThreadPool; use indicatif::{ProgressIterator, style::ProgressStyle, ParallelProgressIterator}; use tempfile::Builder; +#[derive(Debug, Clone, Copy)] +pub enum CoverageOutputFormat { + BedGraph, + BigWig, +} + +impl std::str::FromStr for CoverageOutputFormat { + type Err = String; + fn from_str(s: &str) -> Result { + match s.to_uppercase().as_str() { + "BEDGRAPH" => Ok(CoverageOutputFormat::BedGraph), + "BIGWIG" => Ok(CoverageOutputFormat::BigWig), + _ => Err(format!("unknown output format: {}", s)), + } + } +} + impl Exporter for T where T: SnapData {} pub trait Exporter: SnapData { @@ -76,12 +93,11 @@ pub trait Exporter: SnapData { Ok(files.into_iter().map(|(k, (v, _))| (k.to_string(), v)).collect()) } - fn export_bedgraph + std::marker::Sync>( + fn export_coverage + std::marker::Sync>( &self, group_by: &Vec<&str>, selections: Option>, resolution: usize, - smooth_length: Option, blacklist_regions: Option<&BedTree<()>>, normalization: Option, ignore_for_norm: Option<&HashSet<&str>>, @@ -90,9 +106,11 @@ pub trait Exporter: SnapData { dir: P, prefix: &str, suffix:&str, + format: CoverageOutputFormat, compression: Option, compression_level: Option, temp_dir: Option

, + num_threads: Option, ) -> Result> { // Create directory std::fs::create_dir_all(&dir) @@ -115,132 +133,48 @@ pub trait Exporter: SnapData { let chrom_sizes = self.read_chrom_sizes()?; - info!("Export fragments as bedgraph files..."); + info!("Creating coverage files..."); let style = ProgressStyle::with_template( "[{elapsed}] {bar:40.cyan/blue} {pos:>7}/{len:7} (eta: {eta})" ).unwrap(); - fragment_files.into_iter().collect::>().into_par_iter().map(|(grp, filename)| { - let output = dir.as_ref().join( - prefix.to_string() + grp.replace("/", "+").as_str() + suffix - ); - let mut writer = utils::open_file_for_write(&output, compression, compression_level)?; - - // Make BedGraph - create_bedgraph_from_fragments( - io::Reader::new(utils::open_file_for_read(filename), None).into_records().map(Result::unwrap), - &chrom_sizes, - resolution as u64, - smooth_length, - blacklist_regions, - normalization, - ignore_for_norm, - min_frag_length, - max_frag_length, - ).into_iter().for_each(|x| writeln!(writer, "{}", x).unwrap()); - - Ok((grp.to_string(), output)) - }).progress_with_style(style).collect() - } - - fn export_bigwig>( - &self, - group_by: &Vec<&str>, - selections: Option>, - resolution: usize, - dir: P, - prefix: &str, - suffix:&str, - ) -> Result> { - export_insertions_as_bigwig( - self.get_count_iter(500)?, group_by, selections, resolution, dir, prefix, suffix, - ) - } -} - -/// Export TN5 insertions as bigwig files -/// -/// # Arguments -/// -/// * `insertions` - TN5 insertion matrix -/// * `genome_index` - -/// * `chrom_sizes` - -fn export_insertions_as_bigwig( - mut coverage: GenomeCoverage, - group_by: &Vec<&str>, - selections: Option>, - resolution: usize, - dir: P, - prefix: &str, - suffix:&str, -) -> Result> -where - I: ExactSizeIterator, - P: AsRef, -{ - // Create directory - std::fs::create_dir_all(&dir) - .with_context(|| format!("cannot create directory: {}", dir.as_ref().display()))?; - - let style = ProgressStyle::with_template( - "[{elapsed}] {bar:40.cyan/blue} {pos:>7}/{len:7} (eta: {eta})" - ).unwrap(); - - coverage = coverage.with_resolution(resolution); - let index = coverage.get_gindex(); - let chrom_sizes: HashMap = index.chrom_sizes().map(|(k, v)| (k.to_string(), v as u32)).collect(); - - // Collect groups - let mut groups: HashSet<&str> = group_by.iter().map(|x| *x).unique().collect(); - if let Some(select) = selections { groups.retain(|x| select.contains(x)); } - // Collect counts - let mut counts: HashMap<&str, BTreeMap> = - groups.into_iter().map(|grp| (grp, BTreeMap::new())).collect(); - info!("Compute coverage for {} groups...", counts.len()); - coverage.into_values::().progress_with_style(style.clone()).for_each(|(csr, start, end)| - (start..end).zip(csr.row_iter()).for_each(|(row_idx, row)| - if let Some(count) = counts.get_mut(group_by[row_idx]) { - row.col_indices().iter().zip(row.values()).for_each(|(i, v)| - *count.entry(*i).or_insert(0) += *v + let pool = if let Some(n) = num_threads { + rayon::ThreadPoolBuilder::new().num_threads(n) + } else { + rayon::ThreadPoolBuilder::new() + }; + pool.build().unwrap().install(|| + fragment_files.into_iter().collect::>().into_par_iter().map(|(grp, filename)| { + let output = dir.as_ref().join( + prefix.to_string() + grp.replace("/", "+").as_str() + suffix ); - } - ) - ); - - // Exporting - info!("Exporting bigwig files..."); - counts.into_iter().progress_with_style(style).map(|(grp, count)| { - let filename = dir.as_ref().join( - prefix.to_string() + grp.replace("/", "+").as_str() + suffix - ); - - // compute normalization factor - let total_count: f64 = count.values().map(|x| *x as f64).sum(); - let norm_factor = total_count * resolution as f64 / 1e9; - // Make BedGraph - let bedgraph: Vec> = count.into_iter().map(|(k, v)| { - let mut region = index.get_region(k); - region.set_end(region.start() + resolution as u64); - BedGraph::from_bed(®ion, (v as f64 / norm_factor) as f32) - }).group_by(|x| x.value).into_iter().flat_map(|(_, groups)| - merge_sorted_bed_with( - groups, - |beds| { - let mut iter = beds.into_iter(); - let mut first = iter.next().unwrap(); - if let Some(last) = iter.last() { - first.set_end(last.end()); - } - first - }, - ) - ).collect(); + // Make BedGraph + let bedgraph = create_bedgraph_from_fragments( + io::Reader::new(utils::open_file_for_read(filename), None).into_records().map(Result::unwrap), + &chrom_sizes, + resolution as u64, + blacklist_regions, + normalization, + ignore_for_norm, + min_frag_length, + max_frag_length, + ); - create_bigwig_from_bedgraph(bedgraph, &chrom_sizes, filename.as_path())?; + match format { + CoverageOutputFormat::BedGraph => { + let mut writer = utils::open_file_for_write(&output, compression, compression_level)?; + bedgraph.into_iter().for_each(|x| writeln!(writer, "{}", x).unwrap()); + }, + CoverageOutputFormat::BigWig => { + create_bigwig_from_bedgraph(bedgraph, &chrom_sizes, &output)?; + }, + } - Ok((grp.to_string(), filename)) - }).collect() + Ok((grp.to_string(), output)) + }).progress_with_style(style).collect() + ) + } } #[derive(Debug, Clone, Copy)] @@ -294,13 +228,12 @@ fn create_bedgraph_from_fragments( fragments: I, chrom_sizes: &ChromSizes, bin_size: u64, - smooth_length: Option, blacklist_regions: Option<&BedTree<()>>, normalization: Option, ignore_for_norm: Option<&HashSet<&str>>, min_frag_length: Option, max_frag_length: Option, -) -> Vec> +) -> Vec> where I: Iterator, { @@ -323,7 +256,7 @@ where let norm_factor = match normalization { None => 1.0, - Some(Normalization::RPKM) => total_count * bin_size as f64 / 1e9, + Some(Normalization::RPKM) => total_count * bin_size as f32 / 1e9, Some(Normalization::CPM) => total_count / 1e6, Some(Normalization::BPM) => todo!(), Some(Normalization::RPGC) => todo!(), @@ -348,12 +281,12 @@ where /// Create a bigwig file from BedGraph records. fn create_bigwig_from_bedgraph>( mut bedgraph: Vec>, - chrom_sizes: &HashMap, + chrom_sizes: &ChromSizes, filename: P, ) -> Result<()> { // perform clipping to make sure the end of each region is within the range. bedgraph.iter_mut().group_by(|x| x.chrom().to_string()).into_iter().for_each(|(chr, groups)| { - let size = *chrom_sizes.get(&chr).expect(&format!("chromosome not found: {}", chr)) as u64; + let size = chrom_sizes.get(&chr).expect(&format!("chromosome not found: {}", chr)); let bed = groups.last().unwrap(); if bed.end() > size { bed.set_end(size); @@ -362,7 +295,7 @@ fn create_bigwig_from_bedgraph>( // write to bigwig file BigWigWrite::create_file(filename.as_ref().to_str().unwrap().to_string()).write( - chrom_sizes.clone(), + chrom_sizes.into_iter().map(|(k, v)| (k.to_string(), *v as u32)).collect(), bigtools::bbi::bedchromdata::BedParserStreamingIterator::new( BedParser::wrap_iter(bedgraph.into_iter().map(|x| { let val = bigtools::bbi::Value { diff --git a/snapatac2-core/src/preprocessing/bam.rs b/snapatac2-core/src/preprocessing/bam.rs index 9418b992b..b1478d7b2 100644 --- a/snapatac2-core/src/preprocessing/bam.rs +++ b/snapatac2-core/src/preprocessing/bam.rs @@ -3,6 +3,7 @@ pub use mark_duplicates::{filter_bam, group_bam_by_barcode, BarcodeLocation, Fla use bed_utils::bed::BEDLike; use noodles::{bam, sam::record::data::field::Tag}; +use indicatif::{style::ProgressStyle, ProgressBar, ProgressDrawTarget, ProgressIterator}; use regex::Regex; use anyhow::{Result, bail}; use std::{io::Write, path::Path}; @@ -94,6 +95,13 @@ pub fn make_fragment_file, P2: AsRef, P3: AsRef>( let mut output = open_file_for_write(output_file, compression, compression_level)?; + let spinner = ProgressBar::with_draw_target(None, ProgressDrawTarget::stderr_with_hz(1)) + .with_style( + ProgressStyle::with_template( + "{spinner} Wrote {human_pos} fragments in {elapsed} ({per_sec}) ...", + ) + .unwrap(), + ); let mut flagstat = FlagStat::default(); let filtered_records = filter_bam( reader.lazy_records().map(|x| x.unwrap()), @@ -110,6 +118,7 @@ pub fn make_fragment_file, P2: AsRef, P3: AsRef>( chunk_size, ) .into_fragments(&header) + .progress_with(spinner) .for_each(|mut rec| { if rec.strand().is_none() { let new_start = rec.start().saturating_add_signed(shift_left); diff --git a/snapatac2-python/snapatac2/export/__init__.py b/snapatac2-python/snapatac2/export/__init__.py index 71199d56e..8e0535bed 100644 --- a/snapatac2-python/snapatac2/export/__init__.py +++ b/snapatac2-python/snapatac2/export/__init__.py @@ -69,12 +69,11 @@ def export_fragments( adata, list(ids), list(groupby), out_dir, prefix, suffix, selections, compression, compression_level, ) -def export_bedgraph( +def export_coverage( adata: internal.AnnData | internal.AnnDataSet, groupby: str | list[str], selections: list[str] | None = None, - resolution: int = 1, - smooth_length: int | None = None, + bin_size: int = 10, blacklist: Path | None = None, normalization: Literal["RPKM", "CPM"] | None = "RPKM", ignore_for_norm: list[str] | None = None, @@ -83,11 +82,22 @@ def export_bedgraph( out_dir: Path = "./", prefix: str = "", suffix: str = ".bedgraph.zst", + output_format: Literal["bedgraph", "bigwig"] | None = None, compression: Literal["gzip", "zstandard"] | None = None, compression_level: int | None = None, tempdir: Path | None = None, + n_jobs: int = 8, ) -> dict[str, str]: - """Export and save fragments in a BED format file. + """Export and save coverage in a bedgraph or bigwig format file. + + This function generates coverage tracks (bigWig or bedGraph) for each group + of cells defined in `groupby`. The coverage is calculated as the number of reads + per bin, where bins are short consecutive counting windows of a defined size. + For paired-end data, the reads are extended to the fragment length and the + coverage is calculated as the number of fragments per bin. + There are several options for normalization. The default is RPKM, which + normalizes by the total number of reads and the length of the region. + The normalization can be disabled by setting `normalization=None`. Parameters ---------- @@ -99,19 +109,33 @@ def export_bedgraph( `.obs[groupby]`. selections Export only the selected groups. - ids - Cell ids add to the bed records. If `None`, `.obs_names` is used. + bin_size + Size of the bins, in bases, for the output of the bigwig/bedgraph file. + blacklist + A BED file containing the blacklisted regions. + normalization + Normalization method. If `None`, no normalization is performed. + ignore_for_norm + A list of chromosomes to ignore for normalization. + min_frag_length + Minimum fragment length to be included in the computation. + max_frag_length + Maximum fragment length to be included in the computation. out_dir Directory for saving the outputs. prefix Text added to the output file name. suffix Text added to the output file name. + output_format + Output format. If `None`, it is inferred from the suffix. compression Compression type. If `None`, it is inferred from the suffix. compression_level Compression level. 1-9 for gzip, 1-22 for zstandard. If `None`, it is set to 6 for gzip and 3 for zstandard. + n_jobs + Number of threads to use. If `<= 0`, use all available threads. Returns ------- @@ -124,16 +148,28 @@ def export_bedgraph( if selections is not None: selections = set(selections) + _suffix = suffix if compression is None: if suffix.endswith(".gz"): compression = "gzip" + _suffix = suffix[:-3] elif suffix.endswith(".zst"): compression = "zstandard" + _suffix = suffix[:-4] - return internal.export_bedgraph( - adata, list(groupby), resolution, out_dir, prefix, suffix, selections, - smooth_length, blacklist, normalization, ignore_for_norm, min_frag_length, - max_frag_length, compression, compression_level, tempdir + if output_format is None: + if suffix.endswith(".bw") or suffix.endswith(".bigwig"): + output_format = "bigwig" + elif _suffix.endswith(".bedgraph") or _suffix.endswith(".bg") or _suffix.endswith(".bdg"): + output_format = "bedgraph" + else: + raise ValueError("Cannot infer the output format from the suffix.") + + n_jobs = None if n_jobs <= 0 else n_jobs + return internal.export_coverage( + adata, list(groupby), bin_size, out_dir, prefix, suffix, output_format, selections, + blacklist, normalization, ignore_for_norm, min_frag_length, + max_frag_length, compression, compression_level, tempdir, n_jobs, ) def export_bigwig( @@ -145,39 +181,7 @@ def export_bigwig( prefix: str = "", suffix: str = ".bw", ) -> dict[str, str]: - """ - Export and create BigWig format files. - - Parameters - ---------- - adata - The (annotated) data matrix of shape `n_obs` x `n_vars`. - Rows correspond to cells and columns to regions. - groupby - Group the cells. If a `str`, groups are obtained from - `.obs[groupby]`. - selections - Export only the selected groups. - resolution - resolution. - out_dir - Directory for saving the outputs. - prefix - Text added to the output file name. - suffix - Text added to the output file name. - - Returns - ------- - dict[str, str] - A dictionary contains `(groupname, filename)` pairs. The file names are - formatted as `{prefix}{groupname}{suffix}`. - """ - if isinstance(groupby, str): - groupby = adata.obs[groupby] - if selections is not None: - selections = set(selections) - - return internal.export_bigwig( - adata, list(groupby), selections, resolution, out_dir, prefix, suffix, - ) \ No newline at end of file + import warnings + warnings.warn("Use `ex.export_coverage` instead.", DeprecationWarning) + return export_coverage(adata, groupby=groupby, selections=selections, resolution=resolution, + out_dir=out_dir, prefix=prefix, suffix=suffix, output_format="bigwig") \ No newline at end of file diff --git a/snapatac2-python/src/export.rs b/snapatac2-python/src/export.rs index d787a22e0..b0c831dcb 100644 --- a/snapatac2-python/src/export.rs +++ b/snapatac2-python/src/export.rs @@ -1,5 +1,5 @@ use crate::utils::AnnDataLike; -use snapatac2_core::{export::{Exporter, Normalization}, utils}; +use snapatac2_core::{export::{Exporter, Normalization, CoverageOutputFormat}, utils}; use std::ops::Deref; use anndata::Backend; @@ -34,15 +34,15 @@ pub fn export_fragments( } #[pyfunction] -pub fn export_bedgraph( +pub fn export_coverage( anndata: AnnDataLike, group_by: Vec<&str>, resolution: usize, dir: PathBuf, prefix: &str, suffix:&str, + output_format: &str, selections: Option>, - smooth_length: Option, blacklist: Option, normalization: Option<&str>, ignore_for_norm: Option>, @@ -51,6 +51,7 @@ pub fn export_bedgraph( compression: Option<&str>, compression_level: Option, temp_dir: Option, + num_threads: Option, ) -> Result> { let black: Option> = blacklist.map(|black| { Reader::new(utils::open_file_for_read(black), None) @@ -60,34 +61,17 @@ pub fn export_bedgraph( }); let normalization = normalization.map(|x| Normalization::from_str(x).unwrap()); + let output_format = CoverageOutputFormat::from_str(output_format).unwrap(); macro_rules! run { ($data:expr) => { - $data.export_bedgraph( - &group_by, selections, resolution, smooth_length, black.as_ref(), normalization, + $data.export_coverage( + &group_by, selections, resolution, black.as_ref(), normalization, ignore_for_norm.as_ref(), min_frag_length, max_frag_length, dir, prefix, - suffix, compression.map(|x| utils::Compression::from_str(x).unwrap()), compression_level, temp_dir + suffix, output_format, compression.map(|x| utils::Compression::from_str(x).unwrap()), + compression_level, temp_dir, num_threads, ) } } crate::with_anndata!(&anndata, run) -} - - -#[pyfunction] -pub fn export_bigwig( - anndata: AnnDataLike, - group_by: Vec<&str>, - selections: Option>, - resolution: usize, - dir: PathBuf, - prefix: &str, - suffix: &str, -) -> Result> { - macro_rules! run { - ($data:expr) => { - $data.export_bigwig(&group_by, selections, resolution, dir, prefix, suffix) - } - } - crate::with_anndata!(&anndata, run) } \ No newline at end of file diff --git a/snapatac2-python/src/lib.rs b/snapatac2-python/src/lib.rs index d3aed6024..c1455704a 100644 --- a/snapatac2-python/src/lib.rs +++ b/snapatac2-python/src/lib.rs @@ -50,8 +50,7 @@ fn _snapatac2(_py: Python, m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(preprocessing::fragment_size_distribution, m)?)?; m.add_function(wrap_pyfunction!(export::export_fragments, m)?)?; - m.add_function(wrap_pyfunction!(export::export_bedgraph, m)?)?; - m.add_function(wrap_pyfunction!(export::export_bigwig, m)?)?; + m.add_function(wrap_pyfunction!(export::export_coverage, m)?)?; m.add_function(wrap_pyfunction!(call_peaks::export_tags, m)?)?; m.add_function(wrap_pyfunction!(call_peaks::create_fwtrack_obj, m)?)?;