Commit 156c6b7: add ex.export_coverage
kaizhang committed Nov 5, 2023
1 parent 6455ab3
Showing 7 changed files with 133 additions and 202 deletions.
2 changes: 1 addition & 1 deletion docs/api/export.rst
@@ -7,4 +7,4 @@ Exporting: `ex`
     :toctree: _autosummary
 
     ex.export_fragments
-    ex.export_bigwig
+    ex.export_coverage
2 changes: 2 additions & 0 deletions docs/changelog.md
@@ -6,6 +6,8 @@
 
 - Support anndata v0.10.
 - Make it easier to build custom genome objects.
+- Add `ex.export_coverage` to export coverage tracks as bedGraph or bigWig files.
+  This deprecates `ex.export_bigwig`.
 
 ### Bugs fixed:
189 changes: 61 additions & 128 deletions snapatac2-core/src/export.rs
@@ -1,11 +1,11 @@
-use crate::{preprocessing::{count_data::{SnapData, GenomeCoverage, CoverageType, ChromSizes}, Fragment}, utils::{self, Compression}};
+use crate::{preprocessing::{count_data::{SnapData, ChromSizes}, Fragment}, utils::{self, Compression}};
 
 use anyhow::{Context, Result, ensure};
 use itertools::Itertools;
 use log::info;
 use std::{
     sync::{Arc, Mutex}, io::Write,
-    path::{Path, PathBuf}, collections::{BTreeMap, HashMap, HashSet},
+    path::{Path, PathBuf}, collections::{HashMap, HashSet},
 };
 use rayon::iter::{ParallelBridge, ParallelIterator, IntoParallelIterator};
 use bed_utils::bed::{
@@ -17,6 +17,23 @@ use futures::executor::ThreadPool;
 use indicatif::{ProgressIterator, style::ProgressStyle, ParallelProgressIterator};
 use tempfile::Builder;
 
+#[derive(Debug, Clone, Copy)]
+pub enum CoverageOutputFormat {
+    BedGraph,
+    BigWig,
+}
+
+impl std::str::FromStr for CoverageOutputFormat {
+    type Err = String;
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_uppercase().as_str() {
+            "BEDGRAPH" => Ok(CoverageOutputFormat::BedGraph),
+            "BIGWIG" => Ok(CoverageOutputFormat::BigWig),
+            _ => Err(format!("unknown output format: {}", s)),
+        }
+    }
+}
+
 impl<T> Exporter for T where T: SnapData {}
 
 pub trait Exporter: SnapData {
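
The new enum makes the output format selectable from a plain string, which is convenient when the choice is forwarded from a Python-facing argument. A minimal standalone sketch of the parsing behavior (the PartialEq derive and the main harness are added here for illustration and are not part of this commit):

use std::str::FromStr;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CoverageOutputFormat {
    BedGraph,
    BigWig,
}

impl FromStr for CoverageOutputFormat {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Normalize case first so "bigwig", "BigWig", and "BIGWIG" all match.
        match s.to_uppercase().as_str() {
            "BEDGRAPH" => Ok(CoverageOutputFormat::BedGraph),
            "BIGWIG" => Ok(CoverageOutputFormat::BigWig),
            _ => Err(format!("unknown output format: {}", s)),
        }
    }
}

fn main() {
    assert_eq!("bigwig".parse::<CoverageOutputFormat>(), Ok(CoverageOutputFormat::BigWig));
    assert_eq!("BedGraph".parse::<CoverageOutputFormat>(), Ok(CoverageOutputFormat::BedGraph));
    assert!("bed".parse::<CoverageOutputFormat>().is_err());
}
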
@@ -76,12 +93,11 @@ pub trait Exporter: SnapData {
         Ok(files.into_iter().map(|(k, (v, _))| (k.to_string(), v)).collect())
     }
 
-    fn export_bedgraph<P: AsRef<Path> + std::marker::Sync>(
+    fn export_coverage<P: AsRef<Path> + std::marker::Sync>(
         &self,
         group_by: &Vec<&str>,
         selections: Option<HashSet<&str>>,
         resolution: usize,
-        smooth_length: Option<u64>,
         blacklist_regions: Option<&BedTree<()>>,
         normalization: Option<Normalization>,
         ignore_for_norm: Option<&HashSet<&str>>,
@@ -90,9 +106,11 @@ pub trait Exporter: SnapData {
         dir: P,
         prefix: &str,
         suffix:&str,
+        format: CoverageOutputFormat,
         compression: Option<Compression>,
         compression_level: Option<u32>,
         temp_dir: Option<P>,
+        num_threads: Option<usize>,
     ) -> Result<HashMap<String, PathBuf>> {
         // Create directory
         std::fs::create_dir_all(&dir)
@@ -115,132 +133,48 @@ pub trait Exporter: SnapData {
 
 
         let chrom_sizes = self.read_chrom_sizes()?;
-        info!("Export fragments as bedgraph files...");
+        info!("Creating coverage files...");
         let style = ProgressStyle::with_template(
             "[{elapsed}] {bar:40.cyan/blue} {pos:>7}/{len:7} (eta: {eta})"
         ).unwrap();
-        fragment_files.into_iter().collect::<Vec<_>>().into_par_iter().map(|(grp, filename)| {
-            let output = dir.as_ref().join(
-                prefix.to_string() + grp.replace("/", "+").as_str() + suffix
-            );
-            let mut writer = utils::open_file_for_write(&output, compression, compression_level)?;
-
-            // Make BedGraph
-            create_bedgraph_from_fragments(
-                io::Reader::new(utils::open_file_for_read(filename), None).into_records().map(Result::unwrap),
-                &chrom_sizes,
-                resolution as u64,
-                smooth_length,
-                blacklist_regions,
-                normalization,
-                ignore_for_norm,
-                min_frag_length,
-                max_frag_length,
-            ).into_iter().for_each(|x| writeln!(writer, "{}", x).unwrap());
-
-            Ok((grp.to_string(), output))
-        }).progress_with_style(style).collect()
-    }
-
-    fn export_bigwig<P: AsRef<Path>>(
-        &self,
-        group_by: &Vec<&str>,
-        selections: Option<HashSet<&str>>,
-        resolution: usize,
-        dir: P,
-        prefix: &str,
-        suffix:&str,
-    ) -> Result<HashMap<String, PathBuf>> {
-        export_insertions_as_bigwig(
-            self.get_count_iter(500)?, group_by, selections, resolution, dir, prefix, suffix,
-        )
-    }
-}
-
-/// Export TN5 insertions as bigwig files
-///
-/// # Arguments
-///
-/// * `insertions` - TN5 insertion matrix
-/// * `genome_index` -
-/// * `chrom_sizes` -
-fn export_insertions_as_bigwig<P, I>(
-    mut coverage: GenomeCoverage<I>,
-    group_by: &Vec<&str>,
-    selections: Option<HashSet<&str>>,
-    resolution: usize,
-    dir: P,
-    prefix: &str,
-    suffix:&str,
-) -> Result<HashMap<String, PathBuf>>
-where
-    I: ExactSizeIterator<Item = (CoverageType, usize, usize)>,
-    P: AsRef<Path>,
-{
-    // Create directory
-    std::fs::create_dir_all(&dir)
-        .with_context(|| format!("cannot create directory: {}", dir.as_ref().display()))?;
-
-    let style = ProgressStyle::with_template(
-        "[{elapsed}] {bar:40.cyan/blue} {pos:>7}/{len:7} (eta: {eta})"
-    ).unwrap();
-
-    coverage = coverage.with_resolution(resolution);
-    let index = coverage.get_gindex();
-    let chrom_sizes: HashMap<String, u32> = index.chrom_sizes().map(|(k, v)| (k.to_string(), v as u32)).collect();
-
-    // Collect groups
-    let mut groups: HashSet<&str> = group_by.iter().map(|x| *x).unique().collect();
-    if let Some(select) = selections { groups.retain(|x| select.contains(x)); }
-
-    // Collect counts
-    let mut counts: HashMap<&str, BTreeMap<usize, u32>> =
-        groups.into_iter().map(|grp| (grp, BTreeMap::new())).collect();
-    info!("Compute coverage for {} groups...", counts.len());
-    coverage.into_values::<u32>().progress_with_style(style.clone()).for_each(|(csr, start, end)|
-        (start..end).zip(csr.row_iter()).for_each(|(row_idx, row)|
-            if let Some(count) = counts.get_mut(group_by[row_idx]) {
-                row.col_indices().iter().zip(row.values()).for_each(|(i, v)|
-                    *count.entry(*i).or_insert(0) += *v
-                );
-            }
-        )
-    );
-
-    // Exporting
-    info!("Exporting bigwig files...");
-    counts.into_iter().progress_with_style(style).map(|(grp, count)| {
-        let filename = dir.as_ref().join(
-            prefix.to_string() + grp.replace("/", "+").as_str() + suffix
-        );
-
-        // compute normalization factor
-        let total_count: f64 = count.values().map(|x| *x as f64).sum();
-        let norm_factor = total_count * resolution as f64 / 1e9;
-
-        // Make BedGraph
-        let bedgraph: Vec<BedGraph<f32>> = count.into_iter().map(|(k, v)| {
-            let mut region = index.get_region(k);
-            region.set_end(region.start() + resolution as u64);
-            BedGraph::from_bed(&region, (v as f64 / norm_factor) as f32)
-        }).group_by(|x| x.value).into_iter().flat_map(|(_, groups)|
-            merge_sorted_bed_with(
-                groups,
-                |beds| {
-                    let mut iter = beds.into_iter();
-                    let mut first = iter.next().unwrap();
-                    if let Some(last) = iter.last() {
-                        first.set_end(last.end());
-                    }
-                    first
-                },
-            )
-        ).collect();
-
-        create_bigwig_from_bedgraph(bedgraph, &chrom_sizes, filename.as_path())?;
-
-        Ok((grp.to_string(), filename))
-    }).collect()
+        let pool = if let Some(n) = num_threads {
+            rayon::ThreadPoolBuilder::new().num_threads(n)
+        } else {
+            rayon::ThreadPoolBuilder::new()
+        };
+        pool.build().unwrap().install(||
+            fragment_files.into_iter().collect::<Vec<_>>().into_par_iter().map(|(grp, filename)| {
+                let output = dir.as_ref().join(
+                    prefix.to_string() + grp.replace("/", "+").as_str() + suffix
+                );
+
+                // Make BedGraph
+                let bedgraph = create_bedgraph_from_fragments(
+                    io::Reader::new(utils::open_file_for_read(filename), None).into_records().map(Result::unwrap),
+                    &chrom_sizes,
+                    resolution as u64,
+                    blacklist_regions,
+                    normalization,
+                    ignore_for_norm,
+                    min_frag_length,
+                    max_frag_length,
+                );
+
+                match format {
+                    CoverageOutputFormat::BedGraph => {
+                        let mut writer = utils::open_file_for_write(&output, compression, compression_level)?;
+                        bedgraph.into_iter().for_each(|x| writeln!(writer, "{}", x).unwrap());
+                    },
+                    CoverageOutputFormat::BigWig => {
+                        create_bigwig_from_bedgraph(bedgraph, &chrom_sizes, &output)?;
+                    },
+                }
+
+                Ok((grp.to_string(), output))
+            }).progress_with_style(style).collect()
+        )
+    }
+}
 
 #[derive(Debug, Clone, Copy)]
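
The rewritten method also adds explicit parallelism control: a local rayon pool is built and install() scopes every into_par_iter() inside the closure to that pool, instead of reconfiguring the global one. A self-contained sketch of the same pattern, assuming rayon 1.x; the data and thread count are illustrative:

use rayon::iter::{IntoParallelIterator, ParallelIterator};

fn main() {
    let num_threads: Option<usize> = Some(4);
    // Build a pool with an explicit size, or fall back to rayon's default.
    let pool = if let Some(n) = num_threads {
        rayon::ThreadPoolBuilder::new().num_threads(n)
    } else {
        rayon::ThreadPoolBuilder::new()
    };
    // install() runs the closure inside the pool; the parallel iterator
    // below uses that pool's workers and the result is returned out.
    let squares: Vec<u64> = pool.build().unwrap().install(|| {
        (0u64..8).collect::<Vec<_>>().into_par_iter().map(|x| x * x).collect()
    });
    assert_eq!(squares, vec![0, 1, 4, 9, 16, 25, 36, 49]);
}
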
@@ -294,13 +228,12 @@ fn create_bedgraph_from_fragments(
     fragments: I,
     chrom_sizes: &ChromSizes,
     bin_size: u64,
-    smooth_length: Option<u64>,
     blacklist_regions: Option<&BedTree<()>>,
     normalization: Option<Normalization>,
    ignore_for_norm: Option<&HashSet<&str>>,
     min_frag_length: Option<u64>,
     max_frag_length: Option<u64>,
-) -> Vec<BedGraph<f64>>
+) -> Vec<BedGraph<f32>>
 where
     I: Iterator<Item = Fragment>,
 {
@@ -323,7 +256,7 @@ where
 
     let norm_factor = match normalization {
         None => 1.0,
-        Some(Normalization::RPKM) => total_count * bin_size as f64 / 1e9,
+        Some(Normalization::RPKM) => total_count * bin_size as f32 / 1e9,
         Some(Normalization::CPM) => total_count / 1e6,
         Some(Normalization::BPM) => todo!(),
         Some(Normalization::RPGC) => todo!(),
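
For reference, these factors are divisors applied to the raw per-bin counts: RPKM scales by total signal times bin size over 10^9, and CPM by total signal over 10^6. A self-contained sketch with illustrative numbers (plain strings stand in for the Normalization enum; this is not the crate's code):

// Illustrative sketch: compute the divisor used to normalize per-bin counts.
fn norm_factor(total_count: f32, bin_size: u64, method: Option<&str>) -> f32 {
    match method {
        // No normalization: divide by 1, i.e. keep raw counts.
        None => 1.0,
        // RPKM-style: total signal times bin size, per 10^9.
        Some("RPKM") => total_count * bin_size as f32 / 1e9,
        // CPM: total signal per million.
        Some("CPM") => total_count / 1e6,
        _ => unimplemented!("BPM/RPGC are todo!() in the commit above"),
    }
}

fn main() {
    // 1e7 fragments counted in 500-bp bins:
    assert_eq!(norm_factor(1e7, 500, Some("RPKM")), 5.0);  // 1e7 * 500 / 1e9
    assert_eq!(norm_factor(1e7, 500, Some("CPM")), 10.0);  // 1e7 / 1e6
}
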
@@ -348,12 +281,12 @@ where
 /// Create a bigwig file from BedGraph records.
 fn create_bigwig_from_bedgraph<P: AsRef<Path>>(
     mut bedgraph: Vec<BedGraph<f32>>,
-    chrom_sizes: &HashMap<String, u32>,
+    chrom_sizes: &ChromSizes,
     filename: P,
 ) -> Result<()> {
     // perform clipping to make sure the end of each region is within the range.
     bedgraph.iter_mut().group_by(|x| x.chrom().to_string()).into_iter().for_each(|(chr, groups)| {
-        let size = *chrom_sizes.get(&chr).expect(&format!("chromosome not found: {}", chr)) as u64;
+        let size = chrom_sizes.get(&chr).expect(&format!("chromosome not found: {}", chr));
         let bed = groups.last().unwrap();
         if bed.end() > size {
             bed.set_end(size);
@@ -362,7 +295,7 @@ fn create_bigwig_from_bedgraph<P: AsRef<Path>>(
 
     // write to bigwig file
     BigWigWrite::create_file(filename.as_ref().to_str().unwrap().to_string()).write(
-        chrom_sizes.clone(),
+        chrom_sizes.into_iter().map(|(k, v)| (k.to_string(), *v as u32)).collect(),
         bigtools::bbi::bedchromdata::BedParserStreamingIterator::new(
             BedParser::wrap_iter(bedgraph.into_iter().map(|x| {
                 let val = bigtools::bbi::Value {
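
The clipping step exists because bins are laid out at a fixed width, so the last bin of a chromosome can overrun the chromosome end, which bigWig writers typically reject. A standalone sketch of the idea, with plain tuples standing in for BedGraph records:

// Standalone sketch: clamp the last bin of a chromosome to the chromosome
// end. Plain (chrom, start, end, value) tuples stand in for BedGraph.
fn clip_last_bin(bins: &mut Vec<(String, u64, u64, f32)>, chrom_size: u64) {
    if let Some(last) = bins.last_mut() {
        if last.2 > chrom_size {
            last.2 = chrom_size; // clip the end coordinate, keep the value
        }
    }
}

fn main() {
    // chr1 is 1000 bp long, but a 400-bp bin starting at 800 ends at 1200.
    let mut bins = vec![("chr1".to_string(), 800, 1200, 1.5)];
    clip_last_bin(&mut bins, 1000);
    assert_eq!(bins[0].2, 1000);
}
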
9 changes: 9 additions & 0 deletions snapatac2-core/src/preprocessing/bam.rs
@@ -3,6 +3,7 @@ pub use mark_duplicates::{filter_bam, group_bam_by_barcode, BarcodeLocation, FlagStat};
 
 use bed_utils::bed::BEDLike;
 use noodles::{bam, sam::record::data::field::Tag};
+use indicatif::{style::ProgressStyle, ProgressBar, ProgressDrawTarget, ProgressIterator};
 use regex::Regex;
 use anyhow::{Result, bail};
 use std::{io::Write, path::Path};
@@ -94,6 +95,13 @@ pub fn make_fragment_file<P1: AsRef<Path>, P2: AsRef<Path>, P3: AsRef<Path>>(
 
     let mut output = open_file_for_write(output_file, compression, compression_level)?;
 
+    let spinner = ProgressBar::with_draw_target(None, ProgressDrawTarget::stderr_with_hz(1))
+        .with_style(
+            ProgressStyle::with_template(
+                "{spinner} Wrote {human_pos} fragments in {elapsed} ({per_sec}) ...",
+            )
+            .unwrap(),
+        );
     let mut flagstat = FlagStat::default();
     let filtered_records = filter_bam(
         reader.lazy_records().map(|x| x.unwrap()),
@@ -110,6 +118,7 @@ pub fn make_fragment_file<P1: AsRef<Path>, P2: AsRef<Path>, P3: AsRef<Path>>(
         chunk_size,
     )
     .into_fragments(&header)
+    .progress_with(spinner)
     .for_each(|mut rec| {
         if rec.strand().is_none() {
             let new_start = rec.start().saturating_add_signed(shift_left);
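
The new spinner gives make_fragment_file visible progress even though the total fragment count is unknown up front. A self-contained sketch of the same indicatif pattern (assuming indicatif 0.17; the summing loop is illustrative):

use indicatif::{style::ProgressStyle, ProgressBar, ProgressDrawTarget, ProgressIterator};

fn main() {
    // An unbounded bar (length = None) renders as a spinner; the draw
    // target is throttled to one stderr refresh per second so a hot
    // loop is not slowed down by terminal I/O.
    let spinner = ProgressBar::with_draw_target(None, ProgressDrawTarget::stderr_with_hz(1))
        .with_style(
            ProgressStyle::with_template(
                "{spinner} Wrote {human_pos} fragments in {elapsed} ({per_sec}) ...",
            )
            .unwrap(),
        );
    // progress_with() adapts any iterator and ticks the bar per item.
    let total: u64 = (0..10_000_000u64).progress_with(spinner).sum();
    println!("{}", total);
}
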
(Diffs for the 3 remaining changed files did not load in this view.)