From a5ed112a0785b00164bd325da10dad66680cddee Mon Sep 17 00:00:00 2001 From: yhoogstrate Date: Tue, 4 Mar 2014 11:26:04 +0100 Subject: [PATCH 1/2] Update bam_plcmd.c 1) Fixed the mixed output issue. 2) Improved speed --- samtools-0.1.19/bam_plcmd.c | 249 +++++++++++++++++++++++++++--------- 1 file changed, 186 insertions(+), 63 deletions(-) diff --git a/samtools-0.1.19/bam_plcmd.c b/samtools-0.1.19/bam_plcmd.c index 7bce5d5..1107285 100644 --- a/samtools-0.1.19/bam_plcmd.c +++ b/samtools-0.1.19/bam_plcmd.c @@ -27,6 +27,35 @@ static inline int printw(int c, FILE *fp) return 0; } + +/** + * @section DESCRIPTION + * A partial port of "printw"-function without the flexibility to change + * output stream. This function seems useless for the case where its + * used because stdout was hardcoded. + * + * @author Youri Hoogstrate + * + * @date 2014-mar-03 + */ +static inline int kprintw(int c, kstring_t *stdout_buffer_ref) +{ + char buf[16]; + int l, x; + if (c == 0) { + return kputc('0', stdout_buffer_ref);//replaces: return fputc('0', fp); + } + for (l = 0, x = c < 0? -c : c; x > 0; x /= 10) buf[l++] = x%10 + '0'; + if (c < 0) buf[l++] = '-'; + buf[l] = 0; + for (x = 0; x < l/2; ++x) { + int y = buf[x]; + buf[x] = buf[l-1-x]; buf[l-1-x] = y; + } + kputs(buf, stdout_buffer_ref);//replaces: fputs(buf, fp); + return 0; +} + static inline void pileup_seq(const bam_pileup1_t *p, int pos, int ref_len, const char *ref) { int j; @@ -62,6 +91,58 @@ static inline void pileup_seq(const bam_pileup1_t *p, int pos, int ref_len, cons if (p->is_tail) putchar('$'); } + +/** + * @section DESCRIPTION + * A port of "pileup_seq"-function able to handle the kstring_t + * structure. + * + * @author Youri Hoogstrate + * + * @date 2014-mar-03 + */ +static inline void kpileup_seq(const bam_pileup1_t *p, int pos, int ref_len, const char *ref, kstring_t *stdout_buffer_ref) +{ + int j; + if (p->is_head) { + kputc('^', stdout_buffer_ref);//replaces: putchar('^'); + kputc((p->b->core.qual > 93? 126 : p->b->core.qual + 33), stdout_buffer_ref);//replaces: putchar(p->b->core.qual > 93? 126 : p->b->core.qual + 33); + } + if (!p->is_del) { + int c = bam_nt16_rev_table[bam1_seqi(bam1_seq(p->b), p->qpos)]; + if (ref) { + int rb = pos < ref_len? ref[pos] : 'N'; + if (c == '=' || bam_nt16_table[c] == bam_nt16_table[rb]) c = bam1_strand(p->b)? ',' : '.'; + else c = bam1_strand(p->b)? tolower(c) : toupper(c); + } else { + if (c == '=') c = bam1_strand(p->b)? ',' : '.'; + else c = bam1_strand(p->b)? tolower(c) : toupper(c); + } + kputc(c, stdout_buffer_ref);// replaces: putchar(c); + } + else { + kputc(p->is_refskip? (bam1_strand(p->b)? '<' : '>') : '*', stdout_buffer_ref);//replaces: putchar(p->is_refskip? (bam1_strand(p->b)? '<' : '>') : '*'); + } + if (p->indel > 0) { + kputc('+', stdout_buffer_ref);// replaces: putchar('+'); + kprintw(p->indel, stdout_buffer_ref); //replaces: printw(p->indel, stdout); + for (j = 1; j <= p->indel; ++j) { + int c = bam_nt16_rev_table[bam1_seqi(bam1_seq(p->b), p->qpos + j)]; + kputc(putchar(bam1_strand(p->b)? tolower(c) : toupper(c)), stdout_buffer_ref);//replaces: putchar(bam1_strand(p->b)? tolower(c) : toupper(c)); + } + } else if (p->indel < 0) { + kprintw(p->indel, stdout_buffer_ref); //replaces: printw(p->indel, stdout); + for (j = 1; j <= -p->indel; ++j) { + int c = (ref && (int)pos+j < ref_len)? ref[pos+j] : 'N'; + kputc(bam1_strand(p->b)? tolower(c) : toupper(c), stdout_buffer_ref);//replaces: putchar(bam1_strand(p->b)? tolower(c) : toupper(c)); + } + } + if (p->is_tail) { + kputc('$', stdout_buffer_ref);//replaces: putchar('$'); + } +} + + #include #include "bam2bcf.h" #include "sample.h" @@ -253,6 +334,9 @@ void * mpileup_kern ( int max_indel_depth = params->max_indel_depth; const void *rghash = params->rghash; + kstring_t stdout_buffer; + stdout_buffer.l = stdout_buffer.m = 0; stdout_buffer.s = 0; + static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER; n_plp = calloc(n, sizeof(int)); @@ -276,7 +360,7 @@ void * mpileup_kern ( iter = bam_mplp_init(n, mplp_func, (void**)data); bam_mplp_set_maxcnt(iter, 8000); - + memset(&buf, 0, sizeof(kstring_t)); memset(&bc, 0, sizeof(bcf_call_t)); while (bam_mplp_auto(iter, &tid, &pos, n_plp, plp) > 0) { @@ -323,63 +407,97 @@ void * mpileup_kern ( } } } else { - pthread_mutex_lock(&write_lock); - printf("%s\t%d\t%c", h->target_name[tid], pos + 1, (ref && pos < ref_len)? ref[pos] : 'N'); - pthread_mutex_unlock(&write_lock); - for (i = 0; i < n; ++i) { - int j, cnt; - for (j = cnt = 0; j < n_plp[i]; ++j) { - const bam_pileup1_t *p = plp[i] + j; - if (bam1_qual(p->b)[p->qpos] >= conf->min_baseQ) ++cnt; - } - pthread_mutex_lock(&write_lock); - printf("\t%d\t", cnt); - if (n_plp[i] == 0) { - printf("*\t*"); // FIXME: printf() is very slow... - if (conf->flag & MPLP_PRINT_POS) printf("\t*"); - pthread_mutex_unlock(&write_lock); - } else { - pthread_mutex_unlock(&write_lock); - for (j = 0; j < n_plp[i]; ++j) { - const bam_pileup1_t *p = plp[i] + j; - if (bam1_qual(p->b)[p->qpos] >= conf->min_baseQ) - pileup_seq(plp[i] + j, pos, ref_len, ref); - } - pthread_mutex_lock(&write_lock); - putchar('\t'); - for (j = 0; j < n_plp[i]; ++j) { - const bam_pileup1_t *p = plp[i] + j; - int c = bam1_qual(p->b)[p->qpos]; - if (c >= conf->min_baseQ) { - c = c + 33 < 126? c + 33 : 126; - putchar(c); - } - } - if (conf->flag & MPLP_PRINT_MAPQ) { - putchar('\t'); - for (j = 0; j < n_plp[i]; ++j) { - int c = plp[i][j].b->core.qual + 33; - if (c > 126) c = 126; - putchar(c); - } - } - if (conf->flag & MPLP_PRINT_POS) { - putchar('\t'); - for (j = 0; j < n_plp[i]; ++j) { - if (j > 0) putchar(','); - printf("%d", plp[i][j].qpos + 1); // FIXME: printf() is very slow... - } - } - pthread_mutex_unlock(&write_lock); - } - } - pthread_mutex_lock(&write_lock); - putchar('\n'); - pthread_mutex_unlock(&write_lock); - } -// fprintf (stderr,"[/bam_mplp_auto]\n"); - }//end While - free(n_plp); free(plp); free(buf.s); + /** + * @section DESCRIPTION + * This region contained a locking error; different parts of + * writing to stdout were locked separatly. As consequence, + * parts of the output of different threads could end up + * together. This error is issued at: + * https://github.com/mydatascience/parallel-mpileup/issues/1 + * + * The problem is that the suffix is extended during the for + * loop(s) and should therefore be put in a temp string. + * To avoid the allocation of i strings I make use of the + * kstring_t structure which is able to "grow" by only + * allocating memory if the string size increases. + * + * @date 2014-mar-03 + */ + + stdout_buffer.l = 0; + + ksprintf(&stdout_buffer, "%s\t%d\t%c", h->target_name[tid], pos + 1, (ref && pos < ref_len)? ref[pos] : 'N');// replaces: printf("%s\t%d\t%c", h->target_name[tid], pos + 1, (ref && pos < ref_len)? ref[pos] : 'N'); + + for (i = 0; i < n; ++i) { + int j, cnt; + for (j = cnt = 0; j < n_plp[i]; ++j) { + const bam_pileup1_t *p = plp[i] + j; + if (bam1_qual(p->b)[p->qpos] >= conf->min_baseQ) { + ++cnt; + } + } + + ksprintf(&stdout_buffer, "\t%d\t", cnt); + + if (n_plp[i] == 0) { + ksprintf(&stdout_buffer, "*\t*", cnt);// replaces: printf("*\t*"); // FIXME: printf() is very slow... + + if (conf->flag & MPLP_PRINT_POS) + { + ksprintf(&stdout_buffer, "\t*", cnt);// replaces: printf("\t*"); + } + } else { + for (j = 0; j < n_plp[i]; ++j) { + const bam_pileup1_t *p = plp[i] + j; + if (bam1_qual(p->b)[p->qpos] >= conf->min_baseQ) + { + kpileup_seq(plp[i] + j, pos, ref_len, ref, &stdout_buffer);// replaces: pileup_seq(plp[i] + j, pos, ref_len, ref); + } + } + + + kputc('\t', &stdout_buffer);// replaces: putchar('\t'); + for (j = 0; j < n_plp[i]; ++j) { + const bam_pileup1_t *p = plp[i] + j; + int c = bam1_qual(p->b)[p->qpos]; + if (c >= conf->min_baseQ) { + c = c + 33 < 126? c + 33 : 126; + kputc(c, &stdout_buffer);// replaces: putchar(c); + } + } + if (conf->flag & MPLP_PRINT_MAPQ) { + kputc('\t', &stdout_buffer);// replaces: putchar('\t'); + for (j = 0; j < n_plp[i]; ++j) { + int c = plp[i][j].b->core.qual + 33; + if (c > 126) c = 126; + kputc(c, &stdout_buffer);// replaces: putchar(c); + } + } + if (conf->flag & MPLP_PRINT_POS) { + kputc('\t', &stdout_buffer);// replaces: putchar('\t'); + for (j = 0; j < n_plp[i]; ++j) { + if (j > 0){ + kputc(',', &stdout_buffer);// replaces: putchar(','); + } + ksprintf(&stdout_buffer, "%d", plp[i][j].qpos + 1);//replaces: printf("%d", plp[i][j].qpos + 1); // FIXME: printf() is very slow... + } + } + } + } + + // IF you want to use locks, use only one lock per output-line! Otherwise output of different lines becomes mixed. + pthread_mutex_lock(&write_lock); + puts(stdout_buffer.s); + pthread_mutex_unlock(&write_lock); + + stdout_buffer.l = 0; + } +// fprintf (stderr,"[/bam_mplp_auto]\n"); + }//end While + + /** ------------------------- end fix --------------------------- */ + + free(n_plp); free(plp); free(buf.s); free(stdout_buffer.s); free(bc.PL); free(bcr); free(params); for (i = 0; i < gplp.n; ++i) free(gplp.plp[i]); @@ -569,13 +687,11 @@ static int mpileup(mplp_conf_t *conf, int n, char **fn) kernel_args->rghash = rghash; pthread_create(&threads[i], NULL, mpileup_kern, kernel_args); - } - for (i = 0; i < conf->num_threads; ++i) { pthread_join(threads[i], NULL); - } + } if (conf->flag & MPLP_GLF) { bcf_write_queue_destroy(bp, bh); } @@ -790,8 +906,15 @@ int bam_mpileup(int argc, char *argv[]) if ( read_file_list(file_list,&nfiles,&fn) ) return 1; mpileup(&mplp,nfiles,fn); for (c=0; c Date: Tue, 4 Mar 2014 11:29:26 +0100 Subject: [PATCH 2/2] Update bam_plcmd.c Fixed introduced error --- samtools-0.1.19/bam_plcmd.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/samtools-0.1.19/bam_plcmd.c b/samtools-0.1.19/bam_plcmd.c index 1107285..6b84d6d 100644 --- a/samtools-0.1.19/bam_plcmd.c +++ b/samtools-0.1.19/bam_plcmd.c @@ -910,8 +910,6 @@ int bam_mpileup(int argc, char *argv[]) } else { mpileup(&mplp, argc - optind, argv + optind); - for (c=0; c