Faster annotate.
author: Tom Clegg <tom@tomclegg.ca>
Thu, 29 Oct 2020 07:34:07 +0000 (03:34 -0400)
committer: Tom Clegg <tom@tomclegg.ca>
Thu, 29 Oct 2020 07:34:07 +0000 (03:34 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

annotate.go

index 234bb67908b1c571e0805d0be8b882d3a426d8d1..de911261cb7dc54bfe1ff1528023b0e065f202d8 100644 (file)
@@ -10,9 +10,11 @@ import (
        "net/http"
        _ "net/http/pprof"
        "os"
+       "runtime"
        "sort"
        "strconv"
        "strings"
+       "sync"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/arvados/lightning/hgvs"
@@ -62,7 +64,7 @@ func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader,
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: *projectUUID,
                        RAM:         120000000000,
-                       VCPUs:       2,
+                       VCPUs:       32,
                        Priority:    *priority,
                }
                err = runner.TranslatePaths(inputFilename)
@@ -158,13 +160,32 @@ func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, librdr io.Reader) error
        }
        sort.Slice(refs, func(i, j int) bool { return refs[i].Name < refs[j].Name })
        log.Infof("len(refs) %d", len(refs))
+
+       outch := make(chan string, 1)
+       var outwg sync.WaitGroup
+       defer outwg.Wait()
+       outwg.Add(1)
+       go func() {
+               defer outwg.Done()
+               for s := range outch {
+                       io.WriteString(outw, s)
+               }
+       }()
+       defer close(outch)
+
+       limiter := make(chan bool, runtime.NumCPU()+1)
+       var diffwg sync.WaitGroup
+       defer diffwg.Wait()
+
        for _, refcs := range refs {
+               refcs := refcs
                var seqnames []string
                for seqname := range refcs.TileSequences {
                        seqnames = append(seqnames, seqname)
                }
                sort.Strings(seqnames)
                for _, seqname := range seqnames {
+                       seqname := seqname
                        var refseq []byte
                        // tilestart[123] is the index into refseq
                        // where the tile for tag 123 was placed.
@@ -201,6 +222,7 @@ func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, librdr io.Reader) error
                                        continue
                                }
                                for variant, tv := range tvs {
+                                       variant, tv := variant, tv
                                        if variant == 0 {
                                                continue
                                        }
@@ -216,25 +238,42 @@ func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, librdr io.Reader) error
                                        } else if refendtagstart, ok := tilestart[endtagid]; !ok {
                                                // Ref ends a chromsome with a (possibly very large) variant of this tile, but genomes with this tile don't.
                                                // Give up. (TODO: something smarter)
-                                               log.Warnf("%x not annotating tilevar %d,%d because end tag %d is not in ref", tv.Blake2b[:13], tag, variant, endtagid)
+                                               log.Debugf("%x not annotating tilevar %d,%d because end tag %d is not in ref", tv.Blake2b[:13], tag, variant, endtagid)
                                                continue
                                        } else {
                                                // Non-terminal tile vs. non-terminal reference.
                                                refpart = refseq[refstart : refendtagstart+taglen]
                                                log.Tracef("\n%x tilevar %d,%d endtag %s endtagid %d refendtagstart %d", tv.Blake2b[:13], tag, variant, endtag, endtagid, refendtagstart)
                                        }
+                                       if len(refpart) > 10000 {
+                                               log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s ref len %d", tv.Blake2b[:13], tag, variant, refcs.Name, seqname, len(refpart))
+                                               continue
+                                       }
+                                       if len(tv.Sequence) > 10000 {
+                                               log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s variant len %d", tv.Blake2b[:13], tag, variant, refcs.Name, seqname, len(tv.Sequence))
+                                               continue
+                                       }
                                        // log.Printf("\n%x @ refstart %d \n< %s\n> %s\n", tv.Blake2b, refstart, refpart, tv.Sequence)
-                                       diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tv.Sequence)), 0)
-                                       for _, diff := range diffs {
-                                               diff.Position += refstart
-                                               var varid string
-                                               if cmd.variantHash {
-                                                       varid = fmt.Sprintf("%x", tv.Blake2b)[:13]
-                                               } else {
-                                                       varid = strconv.Itoa(variant)
+
+                                       diffwg.Add(1)
+                                       limiter <- true
+                                       go func() {
+                                               defer func() {
+                                                       <-limiter
+                                                       diffwg.Done()
+                                               }()
+                                               diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tv.Sequence)), 0)
+                                               for _, diff := range diffs {
+                                                       diff.Position += refstart
+                                                       var varid string
+                                                       if cmd.variantHash {
+                                                               varid = fmt.Sprintf("%x", tv.Blake2b)[:13]
+                                                       } else {
+                                                               varid = strconv.Itoa(variant)
+                                                       }
+                                                       outch <- fmt.Sprintf("%d\t%s\t%s\t%s:g.%s\n", tag, varid, refcs.Name, seqname, diff.String())
                                                }
-                                               fmt.Fprintf(outw, "%d\t%s\t%s\t%s:g.%s\n", tag, varid, refcs.Name, seqname, diff.String())
-                                       }
+                                       }()
                                }
                        }
                }