Renumber variants by allele count.
[lightning.git] / slicenumpy.go
index 55b3c0a140861f72a436f6739f42344f0ed9680a..88fd817537a3020552e04032a345cef2d7ba0bc8 100644 (file)
@@ -16,6 +16,8 @@ import (
        "runtime"
        "sort"
        "strings"
+       "sync"
+       "sync/atomic"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/arvados/lightning/hgvs"
@@ -69,7 +71,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        RAM:         250000000000,
                        VCPUs:       32,
                        Priority:    *priority,
-                       KeepCache:   1,
+                       KeepCache:   2,
                        APIAccess:   true,
                }
                err = runner.TranslatePaths(inputDir, regionsFilename)
@@ -163,6 +165,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
 
        log.Info("building list of reference tiles to load") // TODO: more efficient if we had saved all ref tiles in slice0
        type reftileinfo struct {
+               variant  tileVariantID
                seqname  string // chr1
                pos      int    // distance from start of chr1 to start of tile
                tiledata []byte // acgtggcaa...
@@ -170,11 +173,11 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        reftile := map[tagID]*reftileinfo{}
        for seqname, cseq := range refseq {
                for _, libref := range cseq {
-                       reftile[libref.Tag] = &reftileinfo{seqname: seqname}
+                       reftile[libref.Tag] = &reftileinfo{seqname: seqname, variant: libref.Variant}
                }
        }
        log.Info("loading reference tiles from all slices")
-       throttle := throttle{Max: runtime.NumCPU()}
+       throttle := throttle{Max: runtime.GOMAXPROCS(0)}
        for _, infile := range infiles {
                infile := infile
                throttle.Go(func() error {
@@ -186,7 +189,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        defer f.Close()
                        return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
                                for _, tv := range ent.TileVariants {
-                                       if dst, ok := reftile[tv.Tag]; ok {
+                                       if dst, ok := reftile[tv.Tag]; ok && dst.variant == tv.Variant {
                                                dst.tiledata = tv.Sequence
                                        }
                                }
@@ -215,24 +218,25 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        log.Info("TODO: determining which tiles intersect given regions")
 
        log.Info("generating annotations and numpy matrix for each slice")
+       var done int64
        for infileIdx, infile := range infiles {
                infileIdx, infile := infileIdx, infile
                throttle.Go(func() error {
-                       defer log.Infof("%s: done", infile)
-                       seq := map[tagID][][]byte{}
+                       seq := make(map[tagID][]TileVariant, 50000)
                        cgs := make(map[string]CompactGenome, len(cgnames))
                        f, err := open(infile)
                        if err != nil {
                                return err
                        }
                        defer f.Close()
+                       log.Infof("reading %s", infile)
                        err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
                                for _, tv := range ent.TileVariants {
                                        variants := seq[tv.Tag]
                                        for len(variants) <= int(tv.Variant) {
-                                               variants = append(variants, nil)
+                                               variants = append(variants, TileVariant{})
                                        }
-                                       variants[int(tv.Variant)] = tv.Sequence
+                                       variants[int(tv.Variant)] = tv
                                        seq[tv.Tag] = variants
                                }
                                for _, cg := range ent.CompactGenomes {
@@ -246,12 +250,49 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        tagstart := cgs[cgnames[0]].StartTag
                        tagend := cgs[cgnames[0]].EndTag
 
-                       log.Infof("TODO: %s: filtering", infile)
-                       log.Infof("TODO: %s: tidying", infile)
-                       log.Infof("TODO: %s: lowqual to -1", infile)
+                       // TODO: filters
+
+                       log.Infof("renumbering variants for tags %d-%d", tagstart, tagend)
+                       variantRemap := make([][]tileVariantID, tagend-tagstart)
+                       var wg sync.WaitGroup
+                       for tag, variants := range seq {
+                               tag, variants := tag, variants
+                               wg.Add(1)
+                               go func() {
+                                       defer wg.Done()
+                                       count := make([]tileVariantID, len(variants))
+                                       for _, cg := range cgs {
+                                               idx := (tag - tagstart) * 2
+                                               count[cg.Variants[idx]]++
+                                               count[cg.Variants[idx+1]]++
+                                       }
+                                       ranked := make([]tileVariantID, len(variants))
+                                       for i := range ranked {
+                                               ranked[i] = tileVariantID(i)
+                                       }
+                                       sort.Slice(ranked, func(i, j int) bool {
+                                               if i == 0 {
+                                                       return true // leave ranked[0] at position 0, unused
+                                               } else if j == 0 {
+                                                       return false // ditto
+                                               }
+                                               if cri, crj := count[ranked[i]], count[ranked[j]]; cri != crj {
+                                                       return cri > crj
+                                               } else {
+                                                       return bytes.Compare(variants[ranked[i]].Blake2b[:], variants[ranked[j]].Blake2b[:]) < 0
+                                               }
+                                       })
+                                       remap := make([]tileVariantID, len(ranked))
+                                       for i, r := range ranked {
+                                               remap[r] = tileVariantID(i)
+                                       }
+                                       variantRemap[tag-tagstart] = remap
+                               }()
+                       }
+                       wg.Wait()
 
                        annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx)
-                       log.Infof("%s: writing annotations to %s", infile, annotationsFilename)
+                       log.Infof("writing %s", annotationsFilename)
                        annof, err := os.Create(annotationsFilename)
                        if err != nil {
                                return err
@@ -268,17 +309,18 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                }
                                outcol := tag - tagID(tagstart)
                                reftilestr := strings.ToUpper(string(rt.tiledata))
-                               for v, seq := range variants {
-                                       if len(seq) == 0 || !bytes.HasSuffix(rt.tiledata, seq[len(seq)-taglen:]) {
+                               remap := variantRemap[tag-tagstart]
+                               for v, tv := range variants {
+                                       if len(tv.Sequence) < taglen || !bytes.HasSuffix(rt.tiledata, tv.Sequence[len(tv.Sequence)-taglen:]) {
                                                continue
                                        }
-                                       if lendiff := len(rt.tiledata) - len(seq); lendiff < -1000 || lendiff > 1000 {
+                                       if lendiff := len(rt.tiledata) - len(tv.Sequence); lendiff < -1000 || lendiff > 1000 {
                                                continue
                                        }
-                                       diffs, _ := hgvs.Diff(reftilestr, strings.ToUpper(string(seq)), 0)
+                                       diffs, _ := hgvs.Diff(reftilestr, strings.ToUpper(string(tv.Sequence)), 0)
                                        for _, diff := range diffs {
                                                diff.Position += rt.pos
-                                               fmt.Fprintf(annow, "%d,%d,%d,%s:g.%s\n", tag, outcol, v, rt.seqname, diff.String())
+                                               fmt.Fprintf(annow, "%d,%d,%d,%s:g.%s\n", tag, outcol, remap[v], rt.seqname, diff.String())
                                        }
                                }
                        }
@@ -298,8 +340,8 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        for row, name := range cgnames {
                                out := out[row*cols:]
                                for col, v := range cgs[name].Variants {
-                                       if variants, ok := seq[tagstart+tagID(col/2)]; ok && len(variants) > int(v) && len(variants[v]) > 0 {
-                                               out[col] = int16(v)
+                                       if variants, ok := seq[tagstart+tagID(col/2)]; ok && len(variants) > int(v) && len(variants[v].Sequence) > 0 {
+                                               out[col] = int16(variantRemap[col/2][v])
                                        } else {
                                                out[col] = -1
                                        }
@@ -312,7 +354,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                return err
                        }
                        defer output.Close()
-                       bufw := bufio.NewWriter(output)
+                       bufw := bufio.NewWriterSize(output, 1<<26)
                        npw, err := gonpy.NewWriter(nopCloser{bufw})
                        if err != nil {
                                return err
@@ -332,6 +374,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        if err != nil {
                                return err
                        }
+                       log.Infof("%s: done (%d/%d)", infile, int(atomic.AddInt64(&done, 1)), len(infiles))
                        return nil
                })
        }
@@ -340,18 +383,3 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        }
        return 0
 }
-
-func (*sliceNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
-       f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
-       if err != nil {
-               return err
-       }
-       defer f.Close()
-       for i, libref := range librefs {
-               _, err = fmt.Fprintf(f, "%d,%d,%d\n", i, libref.Tag, libref.Variant)
-               if err != nil {
-                       return err
-               }
-       }
-       return f.Close()
-}