From: Tom Clegg Date: Mon, 1 Nov 2021 13:31:04 +0000 (-0400) Subject: Split anno2vcf output by chromosome. X-Git-Url: https://git.arvados.org/lightning.git/commitdiff_plain/0bf5a0d12da4b3675a0c9bd735a482c9cd9812da Split anno2vcf output by chromosome. refs #17763 Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/anno2vcf.go b/anno2vcf.go index a1eb0ae2a6..47af7e5caf 100644 --- a/anno2vcf.go +++ b/anno2vcf.go @@ -102,133 +102,142 @@ func (cmd *anno2vcf) RunCommand(prog string, args []string, stdin io.Reader, std type call struct { tile int variant int - sequence []byte position int deletion []byte insertion []byte } - var allcalls []*call + allcalls := map[string][]*call{} var mtx sync.Mutex - throttle := throttle{Max: runtime.GOMAXPROCS(0)} + thr := throttle{Max: runtime.GOMAXPROCS(0)} log.Print("reading input files") for _, fi := range fis { if !strings.HasSuffix(fi.Name(), "annotations.csv") { continue } filename := *inputDir + "/" + fi.Name() - throttle.Acquire() - go func() { - defer throttle.Release() + thr.Go(func() error { log.Printf("reading %s", filename) buf, err := ioutil.ReadFile(filename) if err != nil { - throttle.Report(fmt.Errorf("%s: %s", filename, err)) - return + return fmt.Errorf("%s: %s", filename, err) } lines := bytes.Split(buf, []byte{'\n'}) - calls := make([]*call, 0, len(lines)) + calls := map[string][]*call{} for lineIdx, line := range lines { if len(line) == 0 { continue } - if lineIdx & ^0xfff == 0 && throttle.Err() != nil { - return + if lineIdx & ^0xfff == 0 && thr.Err() != nil { + return nil } fields := bytes.Split(line, []byte{','}) if len(fields) != 8 { - throttle.Report(fmt.Errorf("%s line %d: wrong number of fields (%d != %d): %q", fi.Name(), lineIdx+1, len(fields), 8, line)) - return + return fmt.Errorf("%s line %d: wrong number of fields (%d != %d): %q", fi.Name(), lineIdx+1, len(fields), 8, line) } tile, _ := strconv.ParseInt(string(fields[0]), 10, 64) variant, _ := strconv.ParseInt(string(fields[2]), 10, 64) position, _ := strconv.ParseInt(string(fields[5]), 10, 64) - calls = append(calls, &call{ + seq := string(fields[4]) + if calls[seq] == nil { + calls[seq] = make([]*call, 0, len(lines)/50) + } + calls[seq] = append(calls[seq], &call{ tile: int(tile), variant: int(variant), - sequence: append([]byte(nil), fields[4]...), position: int(position), deletion: append([]byte(nil), fields[6]...), insertion: append([]byte(nil), fields[7]...), }) } mtx.Lock() - allcalls = append(allcalls, calls...) + for seq, seqcalls := range calls { + allcalls[seq] = append(allcalls[seq], seqcalls...) + } mtx.Unlock() - }() - } - throttle.Wait() - if throttle.Err() != nil { - log.Print(throttle.Err()) - return 1 + return nil + }) } - log.Print("sorting") - sort.Slice(allcalls, func(i, j int) bool { - ii, jj := allcalls[i], allcalls[j] - if cmp := bytes.Compare(ii.sequence, jj.sequence); cmp != 0 { - return cmp < 0 - } - if cmp := ii.position - jj.position; cmp != 0 { - return cmp < 0 - } - if cmp := len(ii.deletion) - len(jj.deletion); cmp != 0 { - return cmp < 0 - } - if cmp := bytes.Compare(ii.insertion, jj.insertion); cmp != 0 { - return cmp < 0 - } - if cmp := ii.tile - jj.tile; cmp != 0 { - return cmp < 0 - } - return ii.variant < jj.variant - }) - - vcfFilename := *outputDir + "/annotations.vcf" - log.Printf("writing %s", vcfFilename) - f, err := os.Create(vcfFilename) + err = thr.Wait() if err != nil { return 1 } - defer f.Close() - bufw := bufio.NewWriterSize(f, 1<<20) - _, err = fmt.Fprintf(bufw, `##fileformat=VCFv4.0 + thr = throttle{Max: len(allcalls)} + for seq, seqcalls := range allcalls { + seq, seqcalls := seq, seqcalls + thr.Go(func() error { + log.Printf("%s: sorting", seq) + sort.Slice(seqcalls, func(i, j int) bool { + ii, jj := seqcalls[i], seqcalls[j] + if cmp := ii.position - jj.position; cmp != 0 { + return cmp < 0 + } + if cmp := len(ii.deletion) - len(jj.deletion); cmp != 0 { + return cmp < 0 + } + if cmp := bytes.Compare(ii.insertion, jj.insertion); cmp != 0 { + return cmp < 0 + } + if cmp := ii.tile - jj.tile; cmp != 0 { + return cmp < 0 + } + return ii.variant < jj.variant + }) + + vcfFilename := fmt.Sprintf("%s/annotations.%s.vcf", *outputDir, seq) + log.Printf("%s: writing %s", seq, vcfFilename) + + f, err := os.Create(vcfFilename) + if err != nil { + return err + } + defer f.Close() + bufw := bufio.NewWriterSize(f, 1<<20) + _, err = fmt.Fprintf(bufw, `##fileformat=VCFv4.0 ##INFO= -#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO +#CHROM POS ID REF ALT QUAL FILTER INFO `) - if err != nil { - return 1 - } - placeholder := []byte{'.'} - for i := 0; i < len(allcalls); { - call := allcalls[i] - i++ - info := fmt.Sprintf("TV=,%d-%d,", call.tile, call.variant) - for i < len(allcalls) && - bytes.Equal(call.sequence, allcalls[i].sequence) && - call.position == allcalls[i].position && - len(call.deletion) == len(allcalls[i].deletion) && - bytes.Equal(call.insertion, allcalls[i].insertion) { - call = allcalls[i] - i++ - info += fmt.Sprintf("%d-%d,", call.tile, call.variant) - } - deletion := call.deletion - if len(deletion) == 0 { - deletion = placeholder - } - insertion := call.insertion - if len(insertion) == 0 { - insertion = placeholder - } - _, err = fmt.Fprintf(bufw, "%s\t%d\t.\t%s\t%s\t.\t.\t%s\n", call.sequence, call.position, deletion, insertion, info) - if err != nil { - return 1 - } - } - err = bufw.Flush() - if err != nil { - return 1 + if err != nil { + return err + } + placeholder := []byte{'.'} + for i := 0; i < len(seqcalls); { + call := seqcalls[i] + i++ + info := fmt.Sprintf("TV=,%d-%d,", call.tile, call.variant) + for i < len(seqcalls) && + call.position == seqcalls[i].position && + len(call.deletion) == len(seqcalls[i].deletion) && + bytes.Equal(call.insertion, seqcalls[i].insertion) { + call = seqcalls[i] + i++ + info += fmt.Sprintf("%d-%d,", call.tile, call.variant) + } + deletion := call.deletion + if len(deletion) == 0 { + deletion = placeholder + } + insertion := call.insertion + if len(insertion) == 0 { + insertion = placeholder + } + _, err = fmt.Fprintf(bufw, "%s\t%d\t.\t%s\t%s\t.\t.\t%s\n", seq, call.position, deletion, insertion, info) + if err != nil { + return err + } + } + err = bufw.Flush() + if err != nil { + return err + } + err = f.Close() + if err != nil { + return err + } + log.Printf("%s: done", seq) + return nil + }) } - err = f.Close() + err = thr.Wait() if err != nil { return 1 }