diff --git a/c/bam2bedgraph.c b/c/bam2bedgraph.c index b6eef25..f77444a 100644 --- a/c/bam2bedgraph.c +++ b/c/bam2bedgraph.c @@ -34,6 +34,7 @@ #include #include #include +#include "htslib/thread_pool.h" #include "bam_access.h" #include "utils.h" @@ -41,6 +42,7 @@ static char *input_file = NULL; static char *output_file = NULL; static char *region = NULL; static int filter = 0; +int nthreads = 0; // shared pool uint8_t is_overlap = 0; KHASH_MAP_INIT_STR(strh,uint8_t) @@ -49,19 +51,20 @@ void print_usage (int exit_code){ printf ("Usage: bam2bedgraph -i input.[cr|b]am -o file [-r region] [-h] [-v]\n\n"); printf ("Create a BEDGraph file of genomic coverage. BAM file must be sorted.\n"); - printf ("-i --input Path to bam/cram input file. [default: stdin]\n"); - printf ("-o --output File path for output. [default: stdout]\n\n"); + printf ("-i --input Path to bam/cram input file. [default: stdin]\n"); + printf ("-o --output File path for output. [default: stdout]\n\n"); printf ("Optional:\n"); - printf ("-r --region Region in bam to access.\n"); - printf ("-f --filter Ignore reads with the filter flags [int].\n"); - printf ("-a --overlap Use overlapping read check.\n\n"); + printf ("-r --region Region in bam to access.\n"); + printf ("-f --filter [int] Ignore reads with the filter flags.\n"); + printf ("-@ --num_threads [int] Use thread pool with specified number of threads.\n"); + printf ("-a --overlap Use overlapping read check.\n\n"); printf ("Other:\n"); - printf ("-h --help Display this usage information.\n"); - printf ("-v --version Prints the version number.\n\n"); + printf ("-h --help Display this usage information.\n"); + printf ("-v --version Prints the version number.\n\n"); exit(exit_code); } -void options(int argc, char *argv[]){ +int options(int argc, char *argv[]){ const struct option long_opts[] = { {"version", no_argument, 0, 'v'}, @@ -71,6 +74,7 @@ void options(int argc, char *argv[]){ {"filter",required_argument,0,'f'}, {"output",required_argument,0,'o'}, {"overlap", no_argument, 0, 'a'}, + {"num_threads",required_argument,0,'@'}, {"rna",no_argument,0, 'a'}, { NULL, 0, NULL, 0} @@ -80,7 +84,7 @@ void options(int argc, char *argv[]){ int iarg = 0; //Iterate through options - while((iarg = getopt_long(argc, argv, "i:o:r:f:avh", long_opts, &index)) != -1){ + while((iarg = getopt_long(argc, argv, "i:o:r:f:@:avh", long_opts, &index)) != -1){ switch(iarg){ case 'i': input_file = optarg; @@ -95,10 +99,7 @@ void options(int argc, char *argv[]){ break; case 'f': - if(sscanf(optarg, "%i", &filter) != 1){ - printf("Error parsing -f argument '%s'. Should be an integer > 0",optarg); - print_usage(1); - } + check(sscanf(optarg, "%i", &filter) == 1, "Error parsing -f argument '%s'. Should be an integer > 0",optarg); break; case 'a': @@ -129,16 +130,16 @@ void options(int argc, char *argv[]){ input_file = "-"; // htslib recognises this as a special case } if (strcmp(input_file,"-") != 0) { - if(check_exist(input_file) != 1){ - printf("Input file (-i) %s does not exist.\n",input_file); - print_usage(1); - } + check(check_exist(input_file) == 1, "Input file (-i) %s does not exist.\n",input_file); } if (output_file==NULL || strcmp(output_file,"/dev/stdout")==0) { output_file = "-"; // we recognise this as a special case } - return; + return 0; +error: + print_usage(1); + return -1; } // callback for bam_plbuf_init() @@ -203,7 +204,8 @@ static int pileup_func_overlap(uint32_t tid, uint32_t position, int n, const bam } int main(int argc, char *argv[]){ - options(argc, argv); + int chk_opt = options(argc, argv); + check(chk_opt==0,"Error parsing options."); tmpstruct_t tmp; FILE *out = NULL; if (strcmp(output_file,"-")==0) { @@ -221,10 +223,10 @@ int main(int argc, char *argv[]){ func_reg = &pileup_func_overlap; } if(region == NULL){ - check = process_bam_file(input_file, func, &tmp, filter, NULL); + check = process_bam_file(input_file, func, &tmp, filter, nthreads,NULL); check(check==1,"Error parsing bam file."); }else{ - check = process_bam_region(input_file, func_reg, &tmp, filter, region, NULL); + check = process_bam_region(input_file, func_reg, &tmp, filter, region, nthreads, NULL); check(check==1,"Error parsing bam region."); } fprintf(out,"%s\t%d\t%d\t%d\n", tmp.head->target_name[tmp.ltid], tmp.lstart,tmp.lpos+1, tmp.lcoverage); diff --git a/c/bam2bw.c b/c/bam2bw.c index 4074bfc..97b0df3 100644 --- a/c/bam2bw.c +++ b/c/bam2bw.c @@ -54,6 +54,7 @@ int filter = 4; char base = 0; uint8_t is_overlap = 0; int include_zeroes = 0; +int nthreads = 0; // shared pool uint32_t single = 1; char *last_contig = ""; @@ -67,6 +68,7 @@ void print_usage (int exit_code){ printf("-c --region [file] A samtools style region (contig:start-stop) or a bed file of regions over which to produce the bigwig file\n"); printf("-z --include-zeroes Include zero coverage regions as additional entries to the bw file\n"); printf("-r --reference [file] Path to reference genome.fa file (required for cram if ref_path cannot be resolved)\n"); + printf("-@ --num_threads [int] Use thread pool with specified number of threads.\n"); printf("-a --overlap Use overlapping read check\n\n"); printf ("Other:\n"); printf("-h --help Display this usage information.\n"); @@ -93,7 +95,7 @@ int get_int_length(int input){ return (input == 0 ? 1 : (int)(log10(input)+1)); } -void setup_options(int argc, char *argv[]){ +int setup_options(int argc, char *argv[]){ const struct option long_opts[] = { {"input", required_argument, 0, 'i'}, @@ -103,6 +105,7 @@ void setup_options(int argc, char *argv[]){ {"reference",required_argument, 0, 'r'}, {"include-zeroes",no_argument, 0, 'z'}, {"overlap", no_argument, 0, 'a'}, + {"num_threads",required_argument,0,'@'}, {"help", no_argument, 0, 'h'}, {"version", no_argument, 0, 'v'}, { NULL, 0, NULL, 0} @@ -113,24 +116,21 @@ void setup_options(int argc, char *argv[]){ int iarg = 0; //Iterate through options - while((iarg = getopt_long(argc, argv, "F:i:o:c:r:azhv",long_opts, &index)) != -1){ + while((iarg = getopt_long(argc, argv, "F:i:o:c:r:@:azhv",long_opts, &index)) != -1){ switch(iarg){ case 'F': - if(sscanf(optarg, "%i", &filter) != 1){ - fprintf(stderr,"Error parsing -F|--filter argument '%s'. Should be an integer > 0",optarg); - print_usage(1); - } + check(sscanf(optarg, "%i", &filter)==1,"Error parsing -F|--filter argument '%s'. Should be an integer > 0",optarg); break; case 'i': input_file = optarg; - if(check_exist(input_file) != 1){ - fprintf(stderr,"Input bam file %s does not appear to exist.\n",input_file); - print_usage(1); - } + check(check_exist(input_file)==1, "Input bam file %s does not appear to exist.\n",input_file); break; case 'o': out_file = optarg; break; + case '@': + check(sscanf(optarg, "%i", &nthreads)==1, "Error parsing -@ argument '%s'. Should be an integer > 0", optarg); + break; case 'h': print_usage (0); break; @@ -141,10 +141,7 @@ void setup_options(int argc, char *argv[]){ region_store = optarg; //First check for a region format int res = check_region_string(region_store); - if(res<0){ - fprintf(stderr,"Region %s is not in correct format or not an existing bed file.\n",region_store); - print_usage(1); - } + check(res>=0,"Region %s is not in correct format or not an existing bed file.\n",region_store); break; case 'r': reference = optarg; @@ -164,17 +161,16 @@ void setup_options(int argc, char *argv[]){ }//End of iteration through options - if(input_file==NULL){ - fprintf(stderr,"Required option -i|--input-bam not defined.\n"); - print_usage(1); - } + check(input_file!=NULL, "Required option -i|--input-bam not defined.\n"); - if(is_base && region_store==NULL){ - fprintf(stderr,"Option -r|--region must be used with the -b|--base option.\n"); - print_usage(1); - } + if(is_base){ + check(region_store!=NULL, "Option -r|--region must be used with the -b|--base option.\n"); + } - return; + return 0; +error: + print_usage(1); + return -1; } // callback for bam_plbuf_init() @@ -301,7 +297,8 @@ uint32_t getContigLength(char *contig,chromList_t *chromList){ } int main(int argc, char *argv[]){ - setup_options(argc, argv); + int check_opt = setup_options(argc, argv); + check(check_opt==0,"Error parsing options."); tmpstruct_t tmp; int no_of_regions = 0; @@ -350,8 +347,7 @@ int main(int argc, char *argv[]){ char *contig = malloc(sizeof(char)*1024); check_mem(contig); int beg,end; - int chk = sscanf(region_store,"%[^:]:%d-%d",contig,&beg,&end); - check(chk==3,"Error reading line '%s' from regions bed file.",region_store); + check(sscanf(region_store,"%[^:]:%d-%d",contig,&beg,&end)==3,"Error reading line '%s' from regions bed file.",region_store); //Attempt to add contig as key to hash int absent; @@ -435,7 +431,7 @@ int main(int argc, char *argv[]){ } int i=0; for(i=0;i 0",optarg); - print_usage(1); - } + check(sscanf(optarg, "%i", &filter)==1, "Error parsing -F|--filter argument '%s'. Should be an integer > 0",optarg); break; case 'i': input_file = optarg; - if(check_exist(input_file) != 1){ - fprintf(stderr,"Input bam file %s does not appear to exist.\n",input_file); - print_usage(1); - } + check(check_exist(input_file)==1, "Input bam file %s does not appear to exist.\n",input_file); break; case 'o': out_file = optarg; break; + case '@': + check(sscanf(optarg, "%i", &nthreads)==1, "Error parsing -@ argument '%s'. Should be an integer > 0", optarg); + break; case 'h': print_usage (0); break; @@ -137,10 +137,7 @@ void setup_options(int argc, char *argv[]){ region_store = optarg; //First check for a region format int res = check_region_string(region_store); - if(res<0){ - fprintf(stderr,"Region %s is not in correct format or not an existing bed file.\n",region_store); - print_usage(1); - } + check(res>=0, "Region %s is not in correct format or not an existing bed file.\n",region_store); break; case 'a': is_overlap = 1; @@ -157,15 +154,17 @@ void setup_options(int argc, char *argv[]){ }//End of iteration through options - if(input_file==NULL){ - fprintf(stderr,"Required option -i|--input-bam not defined.\n"); - print_usage(1); - } + check(input_file!=NULL, "Required option -i|--input-bam not defined.\n"); if(out_file == NULL){ out_file = "output.bam.bw"; } - return; + + return 0; + +error: + print_usage (1); + return -1; } // callback for bam_plbuf_init() @@ -296,7 +295,8 @@ bigWigFile_t *initialise_bw_output(char *out_file, chromList_t *chromList){ } int main(int argc, char *argv[]){ - setup_options(argc, argv); + int chk_opt = setup_options(argc, argv); + check(chk_opt==0,"Error parsing options"); tmpstruct_t *perbase = NULL; int no_of_regions = 0; int sq_lines = get_no_of_SQ_lines(input_file); @@ -435,7 +435,7 @@ int main(int argc, char *argv[]){ perbase[k].reg_start = sta; perbase[k].reg_stop = sto; } - chck = process_bam_region_bases(input_file, func_reg, perbase, filter, our_region_list[i], reference); + chck = process_bam_region_bases(input_file, func_reg, perbase, filter, our_region_list[i], nthreads, reference); check(chck==1,"Error processing bam region."); int b=0; for(b=0;b<4;b++){ diff --git a/c/bam_access.c b/c/bam_access.c index 3ce0a04..135ce10 100644 --- a/c/bam_access.c +++ b/c/bam_access.c @@ -240,11 +240,12 @@ int get_no_of_SQ_lines(char *bam_loc){ return -1; } -int process_bam_file(char *input_file, bw_func pileup_func, tmpstruct_t *tmp, int filter, char *reference){ +int process_bam_file(char *input_file, bw_func pileup_func, tmpstruct_t *tmp, int filter, int nthreads, char *reference){ bam_plp_t buf = NULL; bam1_t *b = NULL; hts_itr_t *iter = NULL; + htsThreadPool p = {NULL, 0}; tmp->beg = 0; tmp->end = 0x7fffffff; tmp->lstart = 0; tmp->lcoverage = 0; @@ -258,6 +259,15 @@ int process_bam_file(char *input_file, bw_func pileup_func, tmpstruct_t *tmp, in check(check==0,"Error setting reference %s for hts file %s.",reference,input_file); } tmp->head = sam_hdr_read(tmp->in); + check(tmp->head!=NULL,"Error getting header from file %s.",input_file); + + // Create and share the thread pool + if (nthreads > 0) { + p.pool = hts_tpool_init(nthreads); + check(p.pool != NULL, "Error creating thread pool"); + hts_set_opt(tmp->in, HTS_OPT_THREAD_POOL, &p); + } + buf = bam_plp_init(0,0); //Iterate through each read in bam file. b = bam_init1(); @@ -285,6 +295,7 @@ int process_bam_file(char *input_file, bw_func pileup_func, tmpstruct_t *tmp, in bam_destroy1(b); if(iter) sam_itr_destroy(iter); + if (p.pool) hts_tpool_destroy(p.pool); return 1; error: @@ -293,16 +304,18 @@ int process_bam_file(char *input_file, bw_func pileup_func, tmpstruct_t *tmp, in if(tmp->in) hts_close(tmp->in); if(tmp->head) bam_hdr_destroy(tmp->head); if(tmp->in) hts_close(tmp->in); + if (p.pool) hts_tpool_destroy(p.pool); return -1; } -int process_bam_region_bases(char *input_file, bw_func_reg perbase_pileup_func, tmpstruct_t *perbase, int filter, char *region, char *reference){ +int process_bam_region_bases(char *input_file, bw_func_reg perbase_pileup_func, tmpstruct_t *perbase, int filter, char *region, int nthreads, char *reference){ bam_plp_t buf = NULL; bam1_t *b = NULL; hts_itr_t *iter = NULL; htsFile *file = NULL; hts_idx_t *idx = NULL; bam_hdr_t *head = NULL; + htsThreadPool p = {NULL, 0}; file= hts_open(input_file, "r"); check(file!=0,"Fail to open [CR|B]AM file %s\n", input_file); if(reference){ @@ -310,6 +323,15 @@ int process_bam_region_bases(char *input_file, bw_func_reg perbase_pileup_func, check(check==0,"Error setting reference %s for hts file %s.",reference,input_file); } head = sam_hdr_read(file); + check(head!=NULL,"Error getting header from file %s.",input_file); + + // Create and share the thread pool + if (nthreads > 0) { + p.pool = hts_tpool_init(nthreads); + check(p.pool != NULL, "Error creating thread pool"); + hts_set_opt(file, HTS_OPT_THREAD_POOL, &p); + } + buf = bam_plp_init(0,0); //Iterate through each read in bam file. b = bam_init1(); @@ -381,20 +403,23 @@ int process_bam_region_bases(char *input_file, bw_func_reg perbase_pileup_func, if(iter) sam_itr_destroy(iter); if(idx) hts_idx_destroy(idx); if(file) hts_close(file); + if (p.pool) hts_tpool_destroy(p.pool); return 1; error: if(iter) sam_itr_destroy(iter); if(idx) hts_idx_destroy(idx); if(file) hts_close(file); + if (p.pool) hts_tpool_destroy(p.pool); if(head) bam_hdr_destroy(head); return -1; } -int process_bam_region(char *input_file, bw_func_reg pileup_func, tmpstruct_t *tmp, int filter, char *region, char *reference){ +int process_bam_region(char *input_file, bw_func_reg pileup_func, tmpstruct_t *tmp, int filter, char *region, int nthreads, char *reference){ bam_plp_t buf = NULL; bam1_t *b = NULL; hts_itr_t *iter = NULL; + htsThreadPool p = {NULL, 0}; tmp->beg = 0; tmp->end = 0x7fffffff; tmp->lstart = 0; tmp->lcoverage = 0; @@ -408,6 +433,16 @@ int process_bam_region(char *input_file, bw_func_reg pileup_func, tmpstruct_t *t check(check==0,"Error setting reference %s for hts file %s.",reference,input_file); } tmp->head = sam_hdr_read(tmp->in); + + check(tmp->head!=NULL,"Error getting header from file %s.",input_file); + + // Create and share the thread pool + if (nthreads > 0) { + p.pool = hts_tpool_init(nthreads); + check(p.pool != NULL, "Error creating thread pool"); + hts_set_opt(tmp->in, HTS_OPT_THREAD_POOL, &p); + } + buf = bam_plp_init(0,0); //Iterate through each read in bam file. b = bam_init1(); @@ -465,11 +500,13 @@ int process_bam_region(char *input_file, bw_func_reg pileup_func, tmpstruct_t *t if(iter) sam_itr_destroy(iter); if(tmp->idx) hts_idx_destroy(tmp->idx); if(tmp->in) hts_close(tmp->in); + if (p.pool) hts_tpool_destroy(p.pool); return 1; error: if(iter) sam_itr_destroy(iter); if(tmp->idx) hts_idx_destroy(tmp->idx); if(tmp->in) hts_close(tmp->in); + if (p.pool) hts_tpool_destroy(p.pool); if(tmp->head) bam_hdr_destroy(tmp->head); return -1; } diff --git a/c/bam_access.h b/c/bam_access.h index a08556b..91907a5 100644 --- a/c/bam_access.h +++ b/c/bam_access.h @@ -34,6 +34,7 @@ #define _bam_access_h #include "dbg.h" +#include "htslib/thread_pool.h" #include "htslib/sam.h" #include "khash.h" #include "bigWig.h" @@ -59,11 +60,11 @@ typedef int (*bw_func)(uint32_t tid, uint32_t position, int n, const bam_pileup1 typedef int (*bw_func_reg)(uint32_t tid, uint32_t position, int n, const bam_pileup1_t *pl, void *data, uint32_t reg_start); -int process_bam_file(char *input_file, bw_func func, tmpstruct_t *tmp, int filter, char *ref); +int process_bam_file(char *input_file, bw_func func, tmpstruct_t *tmp, int filter, int nthreads, char *ref); -int process_bam_region(char *input_file, bw_func_reg func, tmpstruct_t *tmp, int filter, char *region, char *ref); +int process_bam_region(char *input_file, bw_func_reg func, tmpstruct_t *tmp, int filter, char *region, int nthreads, char *ref); -int process_bam_region_bases(char *input_file, bw_func_reg func, tmpstruct_t *tmp, int filter, char *region, char *ref); +int process_bam_region_bases(char *input_file, bw_func_reg func, tmpstruct_t *tmp, int filter, char *region, int nthreads, char *ref); int get_no_of_SQ_lines(char *input_file);