Accept multiple input libraries for slice→slicenumpy.
authorTom Clegg <tom@tomclegg.ca>
Thu, 23 Sep 2021 19:52:42 +0000 (15:52 -0400)
committerTom Clegg <tom@tomclegg.ca>
Thu, 23 Sep 2021 19:52:42 +0000 (15:52 -0400)
refs #17966

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

slice.go
slice_test.go
slicenumpy.go

index 10e5e01bc2fce55de83dfba0d7f0d5492f904bf3..be5bb87904f057ea0a0310358f9371e80d081ccb 100644 (file)
--- a/slice.go
+++ b/slice.go
@@ -7,12 +7,14 @@ package lightning
 import (
        "bufio"
        "encoding/gob"
+       "errors"
        "flag"
        "fmt"
        "io"
        "net/http"
        _ "net/http/pprof"
        "os"
+       "path/filepath"
        "runtime"
        "strings"
        "sync"
@@ -38,7 +40,6 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
        runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
        projectUUID := flags.String("project", "", "project `UUID` for output data")
        priority := flags.Int("priority", 500, "container request priority")
-       inputDir := flags.String("input-dir", "./in", "input `directory`")
        outputDir := flags.String("output-dir", "./out", "output `directory`")
        tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)")
        err = flags.Parse(args)
@@ -48,6 +49,11 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
        } else if err != nil {
                return 2
        }
+       inputDirs := flags.Args()
+       if len(inputDirs) == 0 {
+               err = errors.New("no input dirs specified")
+               return 2
+       }
 
        if *pprof != "" {
                go func() {
@@ -66,15 +72,16 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
                        KeepCache:   50,
                        APIAccess:   true,
                }
-               err = runner.TranslatePaths(inputDir)
-               if err != nil {
-                       return 1
+               for i := range inputDirs {
+                       err = runner.TranslatePaths(&inputDirs[i])
+                       if err != nil {
+                               return 1
+                       }
                }
-               runner.Args = []string{"slice", "-local=true",
+               runner.Args = append([]string{"slice", "-local=true",
                        "-pprof", ":6060",
-                       "-input-dir", *inputDir,
                        "-output-dir", "/mnt/output",
-               }
+               }, inputDirs...)
                var output string
                output, err = runner.Run()
                if err != nil {
@@ -84,7 +91,7 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
                return 0
        }
 
-       err = Slice(*outputDir, *inputDir, *tagsPerFile)
+       err = Slice(*tagsPerFile, *outputDir, inputDirs)
        if err != nil {
                return 1
        }
@@ -93,11 +100,25 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
 
 // Read tags+tiles+genomes from srcdir, write to dstdir with (up to)
 // the specified number of tags per file.
-func Slice(dstdir, srcdir string, tagsPerFile int) error {
-       infiles, err := allGobFiles(srcdir)
-       if err != nil {
-               return err
+func Slice(tagsPerFile int, dstdir string, srcdirs []string) error {
+       var infiles []string
+       for _, srcdir := range srcdirs {
+               files, err := allGobFiles(srcdir)
+               if err != nil {
+                       return err
+               }
+               infiles = append(infiles, files...)
        }
+       // dirNamespace[dir] is an int in [0,len(dirNamespace)), used below to
+       // namespace variant numbers from different dirs.
+       dirNamespace := map[string]int{}
+       for _, path := range infiles {
+               dir, _ := filepath.Split(path)
+               if _, ok := dirNamespace[dir]; !ok {
+                       dirNamespace[dir] = len(dirNamespace)
+               }
+       }
+       namespaces := len(dirNamespace)
 
        var (
                tagset     [][]byte
@@ -124,7 +145,9 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error {
                                return
                        }
                        defer f.Close()
-                       log.Printf("reading %s", path)
+                       dir, _ := filepath.Split(path)
+                       namespace := dirNamespace[dir]
+                       log.Printf("reading %s (namespace %d)", path, namespace)
                        err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error {
                                if err := throttle.Err(); err != nil {
                                        return err
@@ -152,6 +175,7 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error {
                                }
                                atomic.AddInt64(&countTileVariants, int64(len(ent.TileVariants)))
                                for _, tv := range ent.TileVariants {
+                                       tv.Variant = tileVariantID(int(tv.Variant)*namespaces + namespace)
                                        err := encs[int(tv.Tag)/tagsPerFile].Encode(LibraryEntry{
                                                TileVariants: []TileVariant{tv},
                                        })
@@ -166,6 +190,11 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error {
                                // Easier for downstream code.
                                atomic.AddInt64(&countGenomes, int64(len(ent.CompactGenomes)))
                                for _, cg := range ent.CompactGenomes {
+                                       for i, v := range cg.Variants {
+                                               if v > 0 {
+                                                       cg.Variants[i] = tileVariantID(int(v)*namespaces + namespace)
+                                               }
+                                       }
                                        for i, enc := range encs {
                                                start := i * tagsPerFile
                                                end := start + tagsPerFile
@@ -194,6 +223,13 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error {
                                // slice. Easier for downstream code.
                                atomic.AddInt64(&countReferences, int64(len(ent.CompactSequences)))
                                if len(ent.CompactSequences) > 0 {
+                                       for _, cs := range ent.CompactSequences {
+                                               for _, tseq := range cs.TileSequences {
+                                                       for i, libref := range tseq {
+                                                               tseq[i].Variant = tileVariantID(int(libref.Variant)*namespaces + namespace)
+                                                       }
+                                               }
+                                       }
                                        err := encs[0].Encode(LibraryEntry{CompactSequences: ent.CompactSequences})
                                        if err != nil {
                                                return err
index c73d3d7dc0cd8496e4e02b9e93f84f41ad18a19b..532b7bb2e5b1653223faf3d20a986cb66ccd8657 100644 (file)
@@ -19,8 +19,20 @@ var _ = check.Suite(&sliceSuite{})
 
 func (s *sliceSuite) TestImportAndSlice(c *check.C) {
        tmpdir := c.MkDir()
+       err := os.Mkdir(tmpdir+"/lib1", 0777)
+       c.Assert(err, check.IsNil)
+       err = os.Mkdir(tmpdir+"/lib2", 0777)
+       c.Assert(err, check.IsNil)
+       err = os.Mkdir(tmpdir+"/lib3", 0777)
+       c.Assert(err, check.IsNil)
+       cwd, err := os.Getwd()
+       c.Assert(err, check.IsNil)
+       err = os.Symlink(cwd+"/testdata/pipeline1", tmpdir+"/pipeline1")
+       c.Assert(err, check.IsNil)
+       err = os.Symlink(cwd+"/testdata/pipeline1", tmpdir+"/pipeline1dup")
+       c.Assert(err, check.IsNil)
 
-       err := ioutil.WriteFile(tmpdir+"/chr1-12-100.bed", []byte("chr1\t12\t100\ttest.1\n"), 0644)
+       err = ioutil.WriteFile(tmpdir+"/chr1-12-100.bed", []byte("chr1\t12\t100\ttest.1\n"), 0644)
        c.Check(err, check.IsNil)
 
        c.Log("=== import testdata/ref ===")
@@ -29,7 +41,7 @@ func (s *sliceSuite) TestImportAndSlice(c *check.C) {
                "-tag-library", "testdata/tags",
                "-output-tiles",
                "-save-incomplete-tiles",
-               "-o", tmpdir + "/library1.gob",
+               "-o", tmpdir + "/lib1/library1.gob",
                "testdata/ref.fasta",
        }, nil, os.Stderr, os.Stderr)
        c.Assert(exited, check.Equals, 0)
@@ -39,29 +51,31 @@ func (s *sliceSuite) TestImportAndSlice(c *check.C) {
                "-local=true",
                "-tag-library", "testdata/tags",
                "-output-tiles",
-               "-o", tmpdir + "/library2.gob",
-               "testdata/pipeline1",
+               "-o", tmpdir + "/lib2/library2.gob",
+               tmpdir + "/pipeline1",
        }, nil, os.Stderr, os.Stderr)
        c.Assert(exited, check.Equals, 0)
 
-       c.Log("=== merge ===")
-       exited = (&merger{}).RunCommand("merge", []string{
+       c.Log("=== import pipeline1dup ===")
+       exited = (&importer{}).RunCommand("import", []string{
                "-local=true",
-               "-o", tmpdir + "/library.gob",
-               tmpdir + "/library1.gob",
-               tmpdir + "/library2.gob",
+               "-tag-library", "testdata/tags",
+               "-output-tiles",
+               "-o", tmpdir + "/lib3/library3.gob",
+               tmpdir + "/pipeline1dup",
        }, nil, os.Stderr, os.Stderr)
        c.Assert(exited, check.Equals, 0)
 
-       input := tmpdir + "/library.gob"
        slicedir := c.MkDir()
 
        c.Log("=== slice ===")
        exited = (&slicecmd{}).RunCommand("slice", []string{
                "-local=true",
-               "-input-dir=" + input,
                "-output-dir=" + slicedir,
                "-tags-per-file=2",
+               tmpdir + "/lib1",
+               tmpdir + "/lib2",
+               tmpdir + "/lib3",
        }, nil, os.Stderr, os.Stderr)
        c.Check(exited, check.Equals, 0)
        out, _ := exec.Command("find", slicedir, "-ls").CombinedOutput()
@@ -83,9 +97,9 @@ func (s *sliceSuite) TestImportAndSlice(c *check.C) {
        defer f.Close()
        npy, err := gonpy.NewReader(f)
        c.Assert(err, check.IsNil)
-       c.Check(npy.Shape, check.DeepEquals, []int{2, 4})
+       c.Check(npy.Shape, check.DeepEquals, []int{4, 4})
        variants, err := npy.GetInt16()
-       c.Check(variants, check.DeepEquals, []int16{3, 2, 1, 2, -1, -1, 1, 1})
+       c.Check(variants, check.DeepEquals, []int16{3, 2, 1, 2, -1, -1, 1, 1, 3, 2, 1, 2, -1, -1, 1, 1})
 
        annotations, err := ioutil.ReadFile(npydir + "/matrix.0000.annotations.csv")
        c.Assert(err, check.IsNil)
index 2fc2fd6149c0bf546a758cd58b99d8f3535971c8..a8d217fefa664c3d566a93554f01cf5224ef5367 100644 (file)
@@ -16,7 +16,6 @@ import (
        "runtime"
        "sort"
        "strings"
-       "sync"
        "sync/atomic"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -24,6 +23,7 @@ import (
        "github.com/kshedden/gonpy"
        "github.com/sirupsen/logrus"
        log "github.com/sirupsen/logrus"
+       "golang.org/x/crypto/blake2b"
 )
 
 type sliceNumpy struct {
@@ -177,10 +177,10 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                }
        }
        log.Info("loading reference tiles from all slices")
-       throttle := throttle{Max: runtime.GOMAXPROCS(0)}
+       throttle1 := throttle{Max: runtime.GOMAXPROCS(0)}
        for _, infile := range infiles {
                infile := infile
-               throttle.Go(func() error {
+               throttle1.Go(func() error {
                        defer log.Infof("%s: done", infile)
                        f, err := open(infile)
                        if err != nil {
@@ -197,12 +197,12 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        })
                })
        }
-       throttle.Wait()
+       throttle1.Wait()
 
        log.Info("reconstructing reference sequences")
        for seqname, cseq := range refseq {
                seqname, cseq := seqname, cseq
-               throttle.Go(func() error {
+               throttle1.Go(func() error {
                        defer log.Printf("... %s done", seqname)
                        pos := 0
                        for _, libref := range cseq {
@@ -213,7 +213,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        return nil
                })
        }
-       throttle.Wait()
+       throttle1.Wait()
 
        log.Info("TODO: determining which tiles intersect given regions")
 
@@ -221,7 +221,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        var done int64
        for infileIdx, infile := range infiles {
                infileIdx, infile := infileIdx, infile
-               throttle.Go(func() error {
+               throttle1.Go(func() error {
                        seq := make(map[tagID][]TileVariant, 50000)
                        cgs := make(map[string]CompactGenome, len(cgnames))
                        f, err := open(infile)
@@ -233,6 +233,9 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
                                for _, tv := range ent.TileVariants {
                                        variants := seq[tv.Tag]
+                                       if len(variants) == 0 {
+                                               variants = make([]TileVariant, 100)
+                                       }
                                        for len(variants) <= int(tv.Variant) {
                                                variants = append(variants, TileVariant{})
                                        }
@@ -252,46 +255,55 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
 
                        // TODO: filters
 
-                       log.Infof("renumbering variants for tags %d-%d", tagstart, tagend)
+                       log.Infof("renumber/dedup variants for tags %d-%d", tagstart, tagend)
                        variantRemap := make([][]tileVariantID, tagend-tagstart)
-                       var wg sync.WaitGroup
+                       throttle2 := throttle{Max: runtime.GOMAXPROCS(0)}
                        for tag, variants := range seq {
                                tag, variants := tag, variants
-                               wg.Add(1)
+                               throttle2.Acquire()
                                go func() {
-                                       defer wg.Done()
-                                       count := make([]tileVariantID, len(variants))
+                                       defer throttle2.Release()
+                                       count := make(map[[blake2b.Size256]byte]int, len(variants))
                                        for _, cg := range cgs {
                                                idx := (tag - tagstart) * 2
                                                if int(idx) < len(cg.Variants) {
-                                                       count[cg.Variants[idx]]++
-                                                       count[cg.Variants[idx+1]]++
+                                                       count[variants[cg.Variants[idx]].Blake2b]++
+                                                       count[variants[cg.Variants[idx+1]].Blake2b]++
                                                }
                                        }
-                                       ranked := make([]tileVariantID, len(variants))
-                                       for i := range ranked {
-                                               ranked[i] = tileVariantID(i)
+                                       // hash[i] will be the hash of
+                                       // the variant(s) that should
+                                       // be at rank i (0-based).
+                                       hash := make([][blake2b.Size256]byte, 0, len(count))
+                                       for b := range count {
+                                               hash = append(hash, b)
                                        }
-                                       sort.Slice(ranked, func(i, j int) bool {
-                                               if i == 0 {
-                                                       return true // leave ranked[0] at position 0, unused
-                                               } else if j == 0 {
-                                                       return false // ditto
-                                               }
-                                               if cri, crj := count[ranked[i]], count[ranked[j]]; cri != crj {
-                                                       return cri > crj
+                                       sort.Slice(hash, func(i, j int) bool {
+                                               bi, bj := &hash[i], &hash[j]
+                                               if ci, cj := count[*bi], count[*bj]; ci != cj {
+                                                       return ci > cj
                                                } else {
-                                                       return bytes.Compare(variants[ranked[i]].Blake2b[:], variants[ranked[j]].Blake2b[:]) < 0
+                                                       return bytes.Compare((*bi)[:], (*bj)[:]) < 0
                                                }
                                        })
-                                       remap := make([]tileVariantID, len(ranked))
-                                       for i, r := range ranked {
-                                               remap[r] = tileVariantID(i)
+                                       // rank[b] will be the 1-based
+                                       // new variant number for
+                                       // variants whose hash is b.
+                                       rank := make(map[[blake2b.Size256]byte]tileVariantID, len(hash))
+                                       for i, h := range hash {
+                                               rank[h] = tileVariantID(i + 1)
+                                       }
+                                       // remap[v] will be the new
+                                       // variant number for original
+                                       // variant number v.
+                                       remap := make([]tileVariantID, len(variants))
+                                       for i, tv := range variants {
+                                               remap[i] = rank[tv.Blake2b]
                                        }
                                        variantRemap[tag-tagstart] = remap
                                }()
                        }
-                       wg.Wait()
+                       throttle2.Wait()
 
                        annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx)
                        log.Infof("writing %s", annotationsFilename)
@@ -380,7 +392,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        return nil
                })
        }
-       if err = throttle.Wait(); err != nil {
+       if err = throttle1.Wait(); err != nil {
                return 1
        }
        return 0