Split anno2vcf output by chromosome.
authorTom Clegg <tom@tomclegg.ca>
Mon, 1 Nov 2021 13:31:04 +0000 (09:31 -0400)
committerTom Clegg <tom@tomclegg.ca>
Mon, 1 Nov 2021 14:42:05 +0000 (10:42 -0400)
refs #17763

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

anno2vcf.go

index a1eb0ae2a66b8a4b962c4bbbc47e1a8559e9e7e2..47af7e5caf55ea81d566a6dd7a15e4b979b2899b 100644 (file)
@@ -102,133 +102,142 @@ func (cmd *anno2vcf) RunCommand(prog string, args []string, stdin io.Reader, std
        type call struct {
                tile      int
                variant   int
-               sequence  []byte
                position  int
                deletion  []byte
                insertion []byte
        }
-       var allcalls []*call
+       allcalls := map[string][]*call{}
        var mtx sync.Mutex
-       throttle := throttle{Max: runtime.GOMAXPROCS(0)}
+       thr := throttle{Max: runtime.GOMAXPROCS(0)}
        log.Print("reading input files")
        for _, fi := range fis {
                if !strings.HasSuffix(fi.Name(), "annotations.csv") {
                        continue
                }
                filename := *inputDir + "/" + fi.Name()
-               throttle.Acquire()
-               go func() {
-                       defer throttle.Release()
+               thr.Go(func() error {
                        log.Printf("reading %s", filename)
                        buf, err := ioutil.ReadFile(filename)
                        if err != nil {
-                               throttle.Report(fmt.Errorf("%s: %s", filename, err))
-                               return
+                               return fmt.Errorf("%s: %s", filename, err)
                        }
                        lines := bytes.Split(buf, []byte{'\n'})
-                       calls := make([]*call, 0, len(lines))
+                       calls := map[string][]*call{}
                        for lineIdx, line := range lines {
                                if len(line) == 0 {
                                        continue
                                }
-                               if lineIdx & ^0xfff == 0 && throttle.Err() != nil {
-                                       return
+                               if lineIdx & ^0xfff == 0 && thr.Err() != nil {
+                                       return nil
                                }
                                fields := bytes.Split(line, []byte{','})
                                if len(fields) != 8 {
-                                       throttle.Report(fmt.Errorf("%s line %d: wrong number of fields (%d != %d): %q", fi.Name(), lineIdx+1, len(fields), 8, line))
-                                       return
+                                       return fmt.Errorf("%s line %d: wrong number of fields (%d != %d): %q", fi.Name(), lineIdx+1, len(fields), 8, line)
                                }
                                tile, _ := strconv.ParseInt(string(fields[0]), 10, 64)
                                variant, _ := strconv.ParseInt(string(fields[2]), 10, 64)
                                position, _ := strconv.ParseInt(string(fields[5]), 10, 64)
-                               calls = append(calls, &call{
+                               seq := string(fields[4])
+                               if calls[seq] == nil {
+                                       calls[seq] = make([]*call, 0, len(lines)/50)
+                               }
+                               calls[seq] = append(calls[seq], &call{
                                        tile:      int(tile),
                                        variant:   int(variant),
-                                       sequence:  append([]byte(nil), fields[4]...),
                                        position:  int(position),
                                        deletion:  append([]byte(nil), fields[6]...),
                                        insertion: append([]byte(nil), fields[7]...),
                                })
                        }
                        mtx.Lock()
-                       allcalls = append(allcalls, calls...)
+                       for seq, seqcalls := range calls {
+                               allcalls[seq] = append(allcalls[seq], seqcalls...)
+                       }
                        mtx.Unlock()
-               }()
-       }
-       throttle.Wait()
-       if throttle.Err() != nil {
-               log.Print(throttle.Err())
-               return 1
+                       return nil
+               })
        }
-       log.Print("sorting")
-       sort.Slice(allcalls, func(i, j int) bool {
-               ii, jj := allcalls[i], allcalls[j]
-               if cmp := bytes.Compare(ii.sequence, jj.sequence); cmp != 0 {
-                       return cmp < 0
-               }
-               if cmp := ii.position - jj.position; cmp != 0 {
-                       return cmp < 0
-               }
-               if cmp := len(ii.deletion) - len(jj.deletion); cmp != 0 {
-                       return cmp < 0
-               }
-               if cmp := bytes.Compare(ii.insertion, jj.insertion); cmp != 0 {
-                       return cmp < 0
-               }
-               if cmp := ii.tile - jj.tile; cmp != 0 {
-                       return cmp < 0
-               }
-               return ii.variant < jj.variant
-       })
-
-       vcfFilename := *outputDir + "/annotations.vcf"
-       log.Printf("writing %s", vcfFilename)
-       f, err := os.Create(vcfFilename)
+       err = thr.Wait()
        if err != nil {
                return 1
        }
-       defer f.Close()
-       bufw := bufio.NewWriterSize(f, 1<<20)
-       _, err = fmt.Fprintf(bufw, `##fileformat=VCFv4.0
+       thr = throttle{Max: len(allcalls)}
+       for seq, seqcalls := range allcalls {
+               seq, seqcalls := seq, seqcalls
+               thr.Go(func() error {
+                       log.Printf("%s: sorting", seq)
+                       sort.Slice(seqcalls, func(i, j int) bool {
+                               ii, jj := seqcalls[i], seqcalls[j]
+                               if cmp := ii.position - jj.position; cmp != 0 {
+                                       return cmp < 0
+                               }
+                               if cmp := len(ii.deletion) - len(jj.deletion); cmp != 0 {
+                                       return cmp < 0
+                               }
+                               if cmp := bytes.Compare(ii.insertion, jj.insertion); cmp != 0 {
+                                       return cmp < 0
+                               }
+                               if cmp := ii.tile - jj.tile; cmp != 0 {
+                                       return cmp < 0
+                               }
+                               return ii.variant < jj.variant
+                       })
+
+                       vcfFilename := fmt.Sprintf("%s/annotations.%s.vcf", *outputDir, seq)
+                       log.Printf("%s: writing %s", seq, vcfFilename)
+
+                       f, err := os.Create(vcfFilename)
+                       if err != nil {
+                               return err
+                       }
+                       defer f.Close()
+                       bufw := bufio.NewWriterSize(f, 1<<20)
+                       _, err = fmt.Fprintf(bufw, `##fileformat=VCFv4.0
 ##INFO=<ID=TV,Number=.,Type=String,Description="tile-variant">
-#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
+#CHROM POS     ID      REF     ALT     QUAL    FILTER  INFO
 `)
-       if err != nil {
-               return 1
-       }
-       placeholder := []byte{'.'}
-       for i := 0; i < len(allcalls); {
-               call := allcalls[i]
-               i++
-               info := fmt.Sprintf("TV=,%d-%d,", call.tile, call.variant)
-               for i < len(allcalls) &&
-                       bytes.Equal(call.sequence, allcalls[i].sequence) &&
-                       call.position == allcalls[i].position &&
-                       len(call.deletion) == len(allcalls[i].deletion) &&
-                       bytes.Equal(call.insertion, allcalls[i].insertion) {
-                       call = allcalls[i]
-                       i++
-                       info += fmt.Sprintf("%d-%d,", call.tile, call.variant)
-               }
-               deletion := call.deletion
-               if len(deletion) == 0 {
-                       deletion = placeholder
-               }
-               insertion := call.insertion
-               if len(insertion) == 0 {
-                       insertion = placeholder
-               }
-               _, err = fmt.Fprintf(bufw, "%s\t%d\t.\t%s\t%s\t.\t.\t%s\n", call.sequence, call.position, deletion, insertion, info)
-               if err != nil {
-                       return 1
-               }
-       }
-       err = bufw.Flush()
-       if err != nil {
-               return 1
+                       if err != nil {
+                               return err
+                       }
+                       placeholder := []byte{'.'}
+                       for i := 0; i < len(seqcalls); {
+                               call := seqcalls[i]
+                               i++
+                               info := fmt.Sprintf("TV=,%d-%d,", call.tile, call.variant)
+                               for i < len(seqcalls) &&
+                                       call.position == seqcalls[i].position &&
+                                       len(call.deletion) == len(seqcalls[i].deletion) &&
+                                       bytes.Equal(call.insertion, seqcalls[i].insertion) {
+                                       call = seqcalls[i]
+                                       i++
+                                       info += fmt.Sprintf("%d-%d,", call.tile, call.variant)
+                               }
+                               deletion := call.deletion
+                               if len(deletion) == 0 {
+                                       deletion = placeholder
+                               }
+                               insertion := call.insertion
+                               if len(insertion) == 0 {
+                                       insertion = placeholder
+                               }
+                               _, err = fmt.Fprintf(bufw, "%s\t%d\t.\t%s\t%s\t.\t.\t%s\n", seq, call.position, deletion, insertion, info)
+                               if err != nil {
+                                       return err
+                               }
+                       }
+                       err = bufw.Flush()
+                       if err != nil {
+                               return err
+                       }
+                       err = f.Close()
+                       if err != nil {
+                               return err
+                       }
+                       log.Printf("%s: done", seq)
+                       return nil
+               })
        }
-       err = f.Close()
+       err = thr.Wait()
        if err != nil {
                return 1
        }