Accept multiple input libraries for slice→slicenumpy.
[lightning.git] / slicenumpy.go
index 2fc2fd6149c0bf546a758cd58b99d8f3535971c8..a8d217fefa664c3d566a93554f01cf5224ef5367 100644 (file)
@@ -16,7 +16,6 @@ import (
        "runtime"
        "sort"
        "strings"
-       "sync"
        "sync/atomic"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -24,6 +23,7 @@ import (
        "github.com/kshedden/gonpy"
        "github.com/sirupsen/logrus"
        log "github.com/sirupsen/logrus"
+       "golang.org/x/crypto/blake2b"
 )
 
 type sliceNumpy struct {
@@ -177,10 +177,10 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                }
        }
        log.Info("loading reference tiles from all slices")
-       throttle := throttle{Max: runtime.GOMAXPROCS(0)}
+       throttle1 := throttle{Max: runtime.GOMAXPROCS(0)}
        for _, infile := range infiles {
                infile := infile
-               throttle.Go(func() error {
+               throttle1.Go(func() error {
                        defer log.Infof("%s: done", infile)
                        f, err := open(infile)
                        if err != nil {
@@ -197,12 +197,12 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        })
                })
        }
-       throttle.Wait()
+       throttle1.Wait()
 
        log.Info("reconstructing reference sequences")
        for seqname, cseq := range refseq {
                seqname, cseq := seqname, cseq
-               throttle.Go(func() error {
+               throttle1.Go(func() error {
                        defer log.Printf("... %s done", seqname)
                        pos := 0
                        for _, libref := range cseq {
@@ -213,7 +213,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        return nil
                })
        }
-       throttle.Wait()
+       throttle1.Wait()
 
        log.Info("TODO: determining which tiles intersect given regions")
 
@@ -221,7 +221,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        var done int64
        for infileIdx, infile := range infiles {
                infileIdx, infile := infileIdx, infile
-               throttle.Go(func() error {
+               throttle1.Go(func() error {
                        seq := make(map[tagID][]TileVariant, 50000)
                        cgs := make(map[string]CompactGenome, len(cgnames))
                        f, err := open(infile)
@@ -233,6 +233,9 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
                                for _, tv := range ent.TileVariants {
                                        variants := seq[tv.Tag]
+                                       if len(variants) == 0 {
+                                               variants = make([]TileVariant, 100)
+                                       }
                                        for len(variants) <= int(tv.Variant) {
                                                variants = append(variants, TileVariant{})
                                        }
@@ -252,46 +255,55 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
 
                        // TODO: filters
 
-                       log.Infof("renumbering variants for tags %d-%d", tagstart, tagend)
+                       log.Infof("renumber/dedup variants for tags %d-%d", tagstart, tagend)
                        variantRemap := make([][]tileVariantID, tagend-tagstart)
-                       var wg sync.WaitGroup
+                       throttle2 := throttle{Max: runtime.GOMAXPROCS(0)}
                        for tag, variants := range seq {
                                tag, variants := tag, variants
-                               wg.Add(1)
+                               throttle2.Acquire()
                                go func() {
-                                       defer wg.Done()
-                                       count := make([]tileVariantID, len(variants))
+                                       defer throttle2.Release()
+                                       count := make(map[[blake2b.Size256]byte]int, len(variants))
                                        for _, cg := range cgs {
                                                idx := (tag - tagstart) * 2
                                                if int(idx) < len(cg.Variants) {
-                                                       count[cg.Variants[idx]]++
-                                                       count[cg.Variants[idx+1]]++
+                                                       count[variants[cg.Variants[idx]].Blake2b]++
+                                                       count[variants[cg.Variants[idx+1]].Blake2b]++
                                                }
                                        }
-                                       ranked := make([]tileVariantID, len(variants))
-                                       for i := range ranked {
-                                               ranked[i] = tileVariantID(i)
+                                       // hash[i] will be the hash of
+                                       // the variant(s) that should
+                                       // be at rank i (0-based).
+                                       hash := make([][blake2b.Size256]byte, 0, len(count))
+                                       for b := range count {
+                                               hash = append(hash, b)
                                        }
-                                       sort.Slice(ranked, func(i, j int) bool {
-                                               if i == 0 {
-                                                       return true // leave ranked[0] at position 0, unused
-                                               } else if j == 0 {
-                                                       return false // ditto
-                                               }
-                                               if cri, crj := count[ranked[i]], count[ranked[j]]; cri != crj {
-                                                       return cri > crj
+                                       sort.Slice(hash, func(i, j int) bool {
+                                               bi, bj := &hash[i], &hash[j]
+                                               if ci, cj := count[*bi], count[*bj]; ci != cj {
+                                                       return ci > cj
                                                } else {
-                                                       return bytes.Compare(variants[ranked[i]].Blake2b[:], variants[ranked[j]].Blake2b[:]) < 0
+                                                       return bytes.Compare((*bi)[:], (*bj)[:]) < 0
                                                }
                                        })
-                                       remap := make([]tileVariantID, len(ranked))
-                                       for i, r := range ranked {
-                                               remap[r] = tileVariantID(i)
+                                       // rank[b] will be the 1-based
+                                       // new variant number for
+                                       // variants whose hash is b.
+                                       rank := make(map[[blake2b.Size256]byte]tileVariantID, len(hash))
+                                       for i, h := range hash {
+                                               rank[h] = tileVariantID(i + 1)
+                                       }
+                                       // remap[v] will be the new
+                                       // variant number for original
+                                       // variant number v.
+                                       remap := make([]tileVariantID, len(variants))
+                                       for i, tv := range variants {
+                                               remap[i] = rank[tv.Blake2b]
                                        }
                                        variantRemap[tag-tagstart] = remap
                                }()
                        }
-                       wg.Wait()
+                       throttle2.Wait()
 
                        annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx)
                        log.Infof("writing %s", annotationsFilename)
@@ -380,7 +392,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        return nil
                })
        }
-       if err = throttle.Wait(); err != nil {
+       if err = throttle1.Wait(); err != nil {
                return 1
        }
        return 0