Generate annotations for slices.
authorTom Clegg <tom@tomclegg.ca>
Mon, 13 Sep 2021 15:23:23 +0000 (11:23 -0400)
committerTom Clegg <tom@tomclegg.ca>
Mon, 13 Sep 2021 15:23:23 +0000 (11:23 -0400)
refs #17996

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

slicenumpy.go

index 601b35a9602d1e3800678f915890e138a1b761a8..55b3c0a140861f72a436f6739f42344f0ed9680a 100644 (file)
@@ -6,6 +6,7 @@ package lightning
 
 import (
        "bufio"
+       "bytes"
        "flag"
        "fmt"
        "io"
@@ -17,6 +18,7 @@ import (
        "strings"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/arvados/lightning/hgvs"
        "github.com/kshedden/gonpy"
        "github.com/sirupsen/logrus"
        log "github.com/sirupsen/logrus"
@@ -41,6 +43,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        priority := flags.Int("priority", 500, "container request priority")
        inputDir := flags.String("input-dir", "./in", "input `directory`")
        outputDir := flags.String("output-dir", "./out", "output `directory`")
+       ref := flags.String("ref", "", "reference name (if blank, choose last one that appears in input)")
        regionsFilename := flags.String("regions", "", "only output columns/annotations that intersect regions in specified bed `file`")
        expandRegions := flags.Int("expand-regions", 0, "expand specified regions by `N` base pairs on each side`")
        cmd.filter.Flags(flags)
@@ -63,7 +66,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        Name:        "lightning slice-numpy",
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: *projectUUID,
-                       RAM:         150000000000,
+                       RAM:         250000000000,
                        VCPUs:       32,
                        Priority:    *priority,
                        KeepCache:   1,
@@ -101,14 +104,20 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        sort.Strings(infiles)
 
        var cgnames []string
-       refseqs := map[string]map[string][]tileLibRef{}
+       var refseq map[string][]tileLibRef
        in0, err := open(infiles[0])
        if err != nil {
                return 1
        }
+       taglen := -1
        DecodeLibrary(in0, strings.HasSuffix(infiles[0], ".gz"), func(ent *LibraryEntry) error {
+               if len(ent.TagSet) > 0 {
+                       taglen = len(ent.TagSet[0])
+               }
                for _, cseq := range ent.CompactSequences {
-                       refseqs[cseq.Name] = cseq.TileSequences
+                       if cseq.Name == *ref || *ref == "" {
+                               refseq = cseq.TileSequences
+                       }
                }
                for _, cg := range ent.CompactGenomes {
                        cgnames = append(cgnames, cg.Name)
@@ -119,6 +128,14 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                return 1
        }
        in0.Close()
+       if refseq == nil {
+               err = fmt.Errorf("%s: reference sequence not found", infiles[0])
+               return 1
+       }
+       if taglen < 0 {
+               err = fmt.Errorf("tagset not found")
+               return 1
+       }
        sort.Strings(cgnames)
 
        {
@@ -145,12 +162,15 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        }
 
        log.Info("building list of reference tiles to load") // TODO: more efficient if we had saved all ref tiles in slice0
-       reftiledata := map[tileLibRef]*[]byte{}
-       for _, ref := range refseqs {
-               for _, cseq := range ref {
-                       for _, libref := range cseq {
-                               reftiledata[libref] = new([]byte)
-                       }
+       type reftileinfo struct {
+               seqname  string // chr1
+               pos      int    // distance from start of chr1 to start of tile
+               tiledata []byte // acgtggcaa...
+       }
+       reftile := map[tagID]*reftileinfo{}
+       for seqname, cseq := range refseq {
+               for _, libref := range cseq {
+                       reftile[libref.Tag] = &reftileinfo{seqname: seqname}
                }
        }
        log.Info("loading reference tiles from all slices")
@@ -166,9 +186,8 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        defer f.Close()
                        return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
                                for _, tv := range ent.TileVariants {
-                                       libref := tileLibRef{tv.Tag, tv.Variant}
-                                       if dst, ok := reftiledata[libref]; ok {
-                                               *dst = tv.Sequence
+                                       if dst, ok := reftile[tv.Tag]; ok {
+                                               dst.tiledata = tv.Sequence
                                        }
                                }
                                return nil
@@ -177,6 +196,22 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        }
        throttle.Wait()
 
+       log.Info("reconstructing reference sequences")
+       for seqname, cseq := range refseq {
+               seqname, cseq := seqname, cseq
+               throttle.Go(func() error {
+                       defer log.Printf("... %s done", seqname)
+                       pos := 0
+                       for _, libref := range cseq {
+                               rt := reftile[libref.Tag]
+                               rt.pos = pos
+                               pos += len(rt.tiledata) - taglen
+                       }
+                       return nil
+               })
+       }
+       throttle.Wait()
+
        log.Info("TODO: determining which tiles intersect given regions")
 
        log.Info("generating annotations and numpy matrix for each slice")
@@ -184,7 +219,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                infileIdx, infile := infileIdx, infile
                throttle.Go(func() error {
                        defer log.Infof("%s: done", infile)
-                       seq := map[tileLibRef][]byte{}
+                       seq := map[tagID][][]byte{}
                        cgs := make(map[string]CompactGenome, len(cgnames))
                        f, err := open(infile)
                        if err != nil {
@@ -193,7 +228,12 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        defer f.Close()
                        err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
                                for _, tv := range ent.TileVariants {
-                                       seq[tileLibRef{tv.Tag, tv.Variant}] = tv.Sequence
+                                       variants := seq[tv.Tag]
+                                       for len(variants) <= int(tv.Variant) {
+                                               variants = append(variants, nil)
+                                       }
+                                       variants[int(tv.Variant)] = tv.Sequence
+                                       seq[tv.Tag] = variants
                                }
                                for _, cg := range ent.CompactGenomes {
                                        cgs[cg.Name] = cg
@@ -203,6 +243,8 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        if err != nil {
                                return err
                        }
+                       tagstart := cgs[cgnames[0]].StartTag
+                       tagend := cgs[cgnames[0]].EndTag
 
                        log.Infof("TODO: %s: filtering", infile)
                        log.Infof("TODO: %s: tidying", infile)
@@ -210,30 +252,53 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
 
                        annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx)
                        log.Infof("%s: writing annotations to %s", infile, annotationsFilename)
-                       annow, err := os.Create(annotationsFilename)
+                       annof, err := os.Create(annotationsFilename)
                        if err != nil {
                                return err
                        }
-                       // for libref, seq := range seq {
-                       //      // TODO: diff from ref.
-                       // }
-                       err = annow.Close()
+                       annow := bufio.NewWriterSize(annof, 1<<20)
+                       for tag, variants := range seq {
+                               rt, ok := reftile[tag]
+                               if !ok {
+                                       // Reference does not use any
+                                       // variant of this tile.
+                                       // TODO: log this? mention it
+                                       // in annotations?
+                                       continue
+                               }
+                               outcol := tag - tagID(tagstart)
+                               reftilestr := strings.ToUpper(string(rt.tiledata))
+                               for v, seq := range variants {
+                                       if len(seq) == 0 || !bytes.HasSuffix(rt.tiledata, seq[len(seq)-taglen:]) {
+                                               continue
+                                       }
+                                       if lendiff := len(rt.tiledata) - len(seq); lendiff < -1000 || lendiff > 1000 {
+                                               continue
+                                       }
+                                       diffs, _ := hgvs.Diff(reftilestr, strings.ToUpper(string(seq)), 0)
+                                       for _, diff := range diffs {
+                                               diff.Position += rt.pos
+                                               fmt.Fprintf(annow, "%d,%d,%d,%s:g.%s\n", tag, outcol, v, rt.seqname, diff.String())
+                                       }
+                               }
+                       }
+                       err = annow.Flush()
+                       if err != nil {
+                               return err
+                       }
+                       err = annof.Close()
                        if err != nil {
                                return err
                        }
 
                        log.Infof("%s: preparing numpy", infile)
-                       tagstart := cgs[cgnames[0]].StartTag
-                       tagend := cgs[cgnames[0]].EndTag
                        rows := len(cgnames)
                        cols := 2 * int(tagend-tagstart)
                        out := make([]int16, rows*cols)
                        for row, name := range cgnames {
                                out := out[row*cols:]
                                for col, v := range cgs[name].Variants {
-                                       if v == 0 {
-                                               // out[col] = 0
-                                       } else if _, ok := seq[tileLibRef{tagstart + tagID(col/2), v}]; ok {
+                                       if variants, ok := seq[tagstart+tagID(col/2)]; ok && len(variants) > int(v) && len(variants[v]) > 0 {
                                                out[col] = int16(v)
                                        } else {
                                                out[col] = -1