Add test for variant at right end of spanning tile.
[lightning.git] / slicenumpy.go
index 0cc9896b59a35dd6645c48617413a9d59a882e26..1322b91544140df640bc1c53dcad133155df358f 100644 (file)
@@ -8,6 +8,7 @@ import (
        "bufio"
        "bytes"
        "encoding/gob"
+       "errors"
        "flag"
        "fmt"
        "io"
@@ -33,6 +34,8 @@ import (
        "golang.org/x/crypto/blake2b"
 )
 
+const annotationMaxTileSpan = 100
+
 type sliceNumpy struct {
        filter                filter
        threads               int
@@ -165,7 +168,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
 
        cmd.cgnames = nil
        var tagset [][]byte
-       DecodeLibrary(in0, strings.HasSuffix(infiles[0], ".gz"), func(ent *LibraryEntry) error {
+       err = DecodeLibrary(in0, strings.HasSuffix(infiles[0], ".gz"), func(ent *LibraryEntry) error {
                if len(ent.TagSet) > 0 {
                        tagset = ent.TagSet
                }
@@ -215,6 +218,10 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        if err != nil {
                return 1
        }
+       if len(cmd.cgnames) == 0 {
+               err = fmt.Errorf("fatal: 0 cases, 0 controls, nothing to do")
+               return 1
+       }
        cmd.minCoverage = int(math.Ceil(cmd.filter.MinCoverage * float64(len(cmd.cgnames))))
 
        {
@@ -250,11 +257,14 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                seqname  string // chr1
                pos      int    // distance from start of chromosome to starttag
                tiledata []byte // acgtggcaa...
+               excluded bool   // true if excluded by regions file
+               nexttag  tagID  // tagID of following tile (-1 for last tag of chromosome)
        }
        isdup := map[tagID]bool{}
        reftile := map[tagID]*reftileinfo{}
        for seqname, cseq := range refseq {
                pos := 0
+               lastreftag := tagID(-1)
                for _, libref := range cseq {
                        if cmd.filter.MaxTag >= 0 && libref.Tag > tagID(cmd.filter.MaxTag) {
                                continue
@@ -291,7 +301,12 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                        variant:  libref.Variant,
                                        tiledata: tiledata,
                                        pos:      pos,
+                                       nexttag:  -1,
+                               }
+                               if lastreftag >= 0 {
+                                       reftile[lastreftag].nexttag = libref.Tag
                                }
+                               lastreftag = libref.Tag
                        }
                        pos += len(tiledata) - taglen
                }
@@ -307,9 +322,9 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                }
                log.Printf("before applying mask, len(reftile) == %d", len(reftile))
                log.Printf("deleting reftile entries for regions outside %d intervals", mask.Len())
-               for tag, rt := range reftile {
+               for _, rt := range reftile {
                        if !mask.Check(strings.TrimPrefix(rt.seqname, "chr"), rt.pos, rt.pos+len(rt.tiledata)) {
-                               delete(reftile, tag)
+                               rt.excluded = true
                        }
                }
                log.Printf("after applying mask, len(reftile) == %d", len(reftile))
@@ -359,10 +374,12 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                onehotChunkSize = make([]uint32, len(infiles))
                onehotXrefs = make([][]onehotXref, len(infiles))
        }
+       chunkStartTag := make([]tagID, len(infiles))
 
        throttleMem := throttle{Max: cmd.threads} // TODO: estimate using mem and data size
        throttleNumpyMem := throttle{Max: cmd.threads/2 + 1}
        log.Info("generating annotations and numpy matrix for each slice")
+       var errSkip = errors.New("skip infile")
        var done int64
        for infileIdx, infile := range infiles {
                infileIdx, infile := infileIdx, infile
@@ -380,10 +397,22 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                        if tv.Ref {
                                                continue
                                        }
+                                       // Skip tile with no
+                                       // corresponding ref tile, if
+                                       // mask is in play (we can't
+                                       // determine coordinates for
+                                       // these)
                                        if mask != nil && reftile[tv.Tag] == nil {
-                                               // Don't waste
-                                               // time/memory on
-                                               // masked-out tiles.
+                                               continue
+                                       }
+                                       // Skip tile whose
+                                       // corresponding ref tile is
+                                       // outside target regions --
+                                       // unless it's a potential
+                                       // spanning tile.
+                                       if mask != nil && reftile[tv.Tag].excluded &&
+                                               (int(tv.Tag+1) >= len(tagset) ||
+                                                       (bytes.HasSuffix(tv.Sequence, tagset[tv.Tag+1]) && reftile[tv.Tag+1] != nil && !reftile[tv.Tag+1].excluded)) {
                                                continue
                                        }
                                        if tv.Tag == cmd.debugTag {
@@ -400,6 +429,9 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                        seq[tv.Tag] = variants
                                }
                                for _, cg := range ent.CompactGenomes {
+                                       if cmd.filter.MaxTag >= 0 && cg.StartTag > tagID(cmd.filter.MaxTag) {
+                                               return errSkip
+                                       }
                                        if !matchGenome.MatchString(cg.Name) {
                                                continue
                                        }
@@ -413,11 +445,14 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                }
                                return nil
                        })
-                       if err != nil {
+                       if err == errSkip {
+                               return nil
+                       } else if err != nil {
                                return err
                        }
                        tagstart := cgs[cmd.cgnames[0]].StartTag
                        tagend := cgs[cmd.cgnames[0]].EndTag
+                       chunkStartTag[infileIdx] = tagstart
 
                        // TODO: filters
 
@@ -426,9 +461,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        throttleCPU := throttle{Max: runtime.GOMAXPROCS(0)}
                        for tag, variants := range seq {
                                tag, variants := tag, variants
-                               throttleCPU.Acquire()
-                               go func() {
-                                       defer throttleCPU.Release()
+                               throttleCPU.Go(func() error {
                                        count := make(map[[blake2b.Size256]byte]int, len(variants))
 
                                        rt := reftile[tag]
@@ -442,9 +475,9 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                                        v := cg.Variants[idx+allele]
                                                        if v > 0 && len(variants[v].Sequence) > 0 {
                                                                count[variants[v].Blake2b]++
-                                                               if tag == cmd.debugTag {
-                                                                       log.Printf("tag %d cg %s allele %d tv %d hash %x count is now %d", tag, cgname, allele, v, variants[v].Blake2b[:3], count[variants[v].Blake2b])
-                                                               }
+                                                       }
+                                                       if v > 0 && tag == cmd.debugTag {
+                                                               log.Printf("tag %d cg %s allele %d tv %d hash %x count is now %d", tag, cgname, allele, v, variants[v].Blake2b[:3], count[variants[v].Blake2b])
                                                        }
                                                }
                                        }
@@ -483,7 +516,11 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                                remap[i] = rank[tv.Blake2b]
                                        }
                                        if tag == cmd.debugTag {
-                                               log.Printf("tag %d remap %+v", tag, remap)
+                                               for in, out := range remap {
+                                                       if out > 0 {
+                                                               log.Printf("tag %d remap %d => %d", tag, in, out)
+                                                       }
+                                               }
                                        }
                                        variantRemap[tag-tagstart] = remap
                                        if rt != nil {
@@ -493,7 +530,8 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                                }
                                                rt.variant = refrank
                                        }
-                               }()
+                                       return nil
+                               })
                        }
                        throttleCPU.Wait()
 
@@ -511,12 +549,25 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        for tag := tagstart; tag < tagend; tag++ {
                                rt := reftile[tag]
                                if rt == nil && mask != nil {
-                                       // Excluded by specified regions
+                                       // With no ref tile, we don't
+                                       // have coordinates to say
+                                       // this is in the desired
+                                       // regions -- so it's not.
+                                       // TODO: handle ref spanning
+                                       // tile case.
                                        continue
                                }
-                               if cmd.filter.MaxTag >= 0 && tag > tagID(cmd.filter.MaxTag) {
+                               if rt != nil && rt.excluded {
+                                       // TODO: don't skip yet --
+                                       // first check for spanning
+                                       // tile variants that
+                                       // intersect non-excluded ref
+                                       // tiles.
                                        continue
                                }
+                               if cmd.filter.MaxTag >= 0 && tag > tagID(cmd.filter.MaxTag) {
+                                       break
+                               }
                                remap := variantRemap[tag-tagstart]
                                maxv := tileVariantID(0)
                                for _, v := range remap {
@@ -525,12 +576,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                        }
                                }
                                if *onehotChunked || *onehotSingle {
-                                       if tag == cmd.debugTag {
-                                               log.WithFields(logrus.Fields{
-                                                       "cgs[2].Variants[tag*2:(tag+1)*2]": cgs[cmd.cgnames[2]].Variants[(tag-tagstart)*2 : (tag-tagstart+1)*2],
-                                               }).Info("before tv2homhet")
-                                       }
-                                       onehot, xrefs := cmd.tv2homhet(cgs, maxv, remap, tag, tagstart)
+                                       onehot, xrefs := cmd.tv2homhet(cgs, maxv, remap, tag, tagstart, seq)
                                        if tag == cmd.debugTag {
                                                log.WithFields(logrus.Fields{
                                                        "onehot": onehot,
@@ -543,6 +589,10 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                if rt == nil {
                                        // Reference does not use any
                                        // variant of this tile
+                                       //
+                                       // TODO: diff against the
+                                       // relevant portion of the
+                                       // ref's spanning tile
                                        outcol++
                                        continue
                                }
@@ -559,11 +609,29 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                        } else {
                                                done[v] = true
                                        }
-                                       if len(tv.Sequence) < taglen || !bytes.HasSuffix(rt.tiledata, tv.Sequence[len(tv.Sequence)-taglen:]) {
+                                       if len(tv.Sequence) < taglen {
+                                               continue
+                                       }
+                                       // if reftilestr doesn't end
+                                       // in the same tag as tv,
+                                       // extend reftilestr with
+                                       // following ref tiles until
+                                       // it does (up to an arbitrary
+                                       // sanity-check limit)
+                                       reftilestr := reftilestr
+                                       endtagstr := strings.ToUpper(string(tv.Sequence[len(tv.Sequence)-taglen:]))
+                                       for i, rt := 0, rt; i < annotationMaxTileSpan && !strings.HasSuffix(reftilestr, endtagstr) && rt.nexttag >= 0; i++ {
+                                               rt = reftile[rt.nexttag]
+                                               reftilestr += strings.ToUpper(string(rt.tiledata[taglen:]))
+                                       }
+                                       if mask != nil && !mask.Check(strings.TrimPrefix(rt.seqname, "chr"), rt.pos, rt.pos+len(reftilestr)) {
+                                               continue
+                                       }
+                                       if !strings.HasSuffix(reftilestr, endtagstr) {
                                                fmt.Fprintf(annow, "%d,%d,%d,,%s,%d,,,\n", tag, outcol, v, rt.seqname, rt.pos)
                                                continue
                                        }
-                                       if lendiff := len(rt.tiledata) - len(tv.Sequence); lendiff < -1000 || lendiff > 1000 {
+                                       if lendiff := len(reftilestr) - len(tv.Sequence); lendiff < -1000 || lendiff > 1000 {
                                                fmt.Fprintf(annow, "%d,%d,%d,,%s,%d,,,\n", tag, outcol, v, rt.seqname, rt.pos)
                                                continue
                                        }
@@ -676,19 +744,26 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                cols := 2 * outcol
                                out := make([]int16, rows*cols)
                                for row, name := range cmd.cgnames {
-                                       out := out[row*cols:]
-                                       outcol := 0
+                                       outidx := row * cols
                                        for col, v := range cgs[name].Variants {
                                                tag := tagstart + tagID(col/2)
-                                               if mask != nil && reftile[tag] == nil || (cmd.filter.MaxTag >= 0 && tag > tagID(cmd.filter.MaxTag)) {
+                                               if cmd.filter.MaxTag >= 0 && tag > tagID(cmd.filter.MaxTag) {
+                                                       break
+                                               }
+                                               if rt := reftile[tag]; rt == nil || rt.excluded {
                                                        continue
                                                }
-                                               if variants, ok := seq[tag]; ok && len(variants) > int(v) && len(variants[v].Sequence) > 0 {
-                                                       out[outcol] = int16(variantRemap[tag-tagstart][v])
+                                               if v == 0 {
+                                                       out[outidx] = 0 // tag not found / spanning tile
+                                               } else if variants, ok := seq[tag]; ok && int(v) < len(variants) && len(variants[v].Sequence) > 0 {
+                                                       out[outidx] = int16(variantRemap[tag-tagstart][v])
                                                } else {
-                                                       out[outcol] = -1
+                                                       out[outidx] = -1 // low quality tile variant
+                                               }
+                                               if tag == cmd.debugTag {
+                                                       log.Printf("tag %d row %d col %d outidx %d v %d out %d", tag, row, col, outidx, v, out[outidx])
                                                }
-                                               outcol++
+                                               outidx++
                                        }
                                }
                                seq = nil
@@ -987,10 +1062,32 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        return 1
                }
                fnm = fmt.Sprintf("%s/onehot-columns.npy", *outputDir)
-               err = writeNumpyInt32(fnm, onehotXref2int32(xrefs), 4, len(xrefs))
+               err = writeNumpyInt32(fnm, onehotXref2int32(xrefs), 5, len(xrefs))
+               if err != nil {
+                       return 1
+               }
+       }
+       if !*mergeOutput && !*onehotChunked && !*onehotSingle {
+               tagoffsetFilename := *outputDir + "/chunk-tag-offset.csv"
+               log.Infof("writing tag offsets to %s", tagoffsetFilename)
+               var f *os.File
+               f, err = os.Create(tagoffsetFilename)
                if err != nil {
                        return 1
                }
+               defer f.Close()
+               for idx, offset := range chunkStartTag {
+                       _, err = fmt.Fprintf(f, "%q,%d\n", fmt.Sprintf("matrix.%04d.npy", idx), offset)
+                       if err != nil {
+                               err = fmt.Errorf("write %s: %w", tagoffsetFilename, err)
+                               return 1
+                       }
+               }
+               err = f.Close()
+               if err != nil {
+                       err = fmt.Errorf("close %s: %w", tagoffsetFilename, err)
+                       return 1
+               }
        }
        return 0
 }
@@ -1224,7 +1321,7 @@ func allele2homhet(colpair [2][]int8) {
 type onehotXref struct {
        tag     tagID
        variant tileVariantID
-       het     bool
+       hom     bool
        pvalue  float64
 }
 
@@ -1234,7 +1331,7 @@ const onehotXrefSize = unsafe.Sizeof(onehotXref{})
 // variants of a single tile/tag#.
 //
 // Return nil if no tile variant passes Χ² filter.
-func (cmd *sliceNumpy) tv2homhet(cgs map[string]CompactGenome, maxv tileVariantID, remap []tileVariantID, tag, chunkstarttag tagID) ([][]int8, []onehotXref) {
+func (cmd *sliceNumpy) tv2homhet(cgs map[string]CompactGenome, maxv tileVariantID, remap []tileVariantID, tag, chunkstarttag tagID, seq map[tagID][]TileVariant) ([][]int8, []onehotXref) {
        if tag == cmd.debugTag {
                tv := make([]tileVariantID, len(cmd.cgnames)*2)
                for i, name := range cmd.cgnames {
@@ -1255,7 +1352,13 @@ func (cmd *sliceNumpy) tv2homhet(cgs map[string]CompactGenome, maxv tileVariantI
        tagoffset := tag - chunkstarttag
        coverage := 0
        for _, cg := range cgs {
-               if cg.Variants[tagoffset*2] > 0 && cg.Variants[tagoffset*2+1] > 0 {
+               alleles := 0
+               for _, v := range cg.Variants[tagoffset*2 : tagoffset*2+2] {
+                       if v > 0 && int(v) < len(seq[tag]) && len(seq[tag][v].Sequence) > 0 {
+                               alleles++
+                       }
+               }
+               if alleles == 2 {
                        coverage++
                }
        }
@@ -1294,7 +1397,7 @@ func (cmd *sliceNumpy) tv2homhet(cgs map[string]CompactGenome, maxv tileVariantI
                xref = append(xref, onehotXref{
                        tag:     tag,
                        variant: tileVariantID(col >> 1),
-                       het:     col&1 == 1,
+                       hom:     col&1 == 0,
                        pvalue:  p,
                })
        }
@@ -1319,14 +1422,15 @@ func bool2int8(in []bool) []int8 {
 // P-value row contains 1000000x actual p-value.
 func onehotXref2int32(xrefs []onehotXref) []int32 {
        xcols := len(xrefs)
-       xdata := make([]int32, 4*xcols)
+       xdata := make([]int32, 5*xcols)
        for i, xref := range xrefs {
                xdata[i] = int32(xref.tag)
                xdata[xcols+i] = int32(xref.variant)
-               if xref.het {
+               if xref.hom {
                        xdata[xcols*2+i] = 1
                }
                xdata[xcols*3+i] = int32(xref.pvalue * 1000000)
+               xdata[xcols*4+i] = int32(-math.Log10(xref.pvalue) * 1000000)
        }
        return xdata
 }