Accept multiple input libraries for slice→slicenumpy.
[lightning.git] / slice.go
index 10e5e01bc2fce55de83dfba0d7f0d5492f904bf3..be5bb87904f057ea0a0310358f9371e80d081ccb 100644 (file)
--- a/slice.go
+++ b/slice.go
@@ -7,12 +7,14 @@ package lightning
 import (
        "bufio"
        "encoding/gob"
+       "errors"
        "flag"
        "fmt"
        "io"
        "net/http"
        _ "net/http/pprof"
        "os"
+       "path/filepath"
        "runtime"
        "strings"
        "sync"
@@ -38,7 +40,6 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
        runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
        projectUUID := flags.String("project", "", "project `UUID` for output data")
        priority := flags.Int("priority", 500, "container request priority")
-       inputDir := flags.String("input-dir", "./in", "input `directory`")
        outputDir := flags.String("output-dir", "./out", "output `directory`")
        tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)")
        err = flags.Parse(args)
@@ -48,6 +49,11 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
        } else if err != nil {
                return 2
        }
+       inputDirs := flags.Args()
+       if len(inputDirs) == 0 {
+               err = errors.New("no input dirs specified")
+               return 2
+       }
 
        if *pprof != "" {
                go func() {
@@ -66,15 +72,16 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
                        KeepCache:   50,
                        APIAccess:   true,
                }
-               err = runner.TranslatePaths(inputDir)
-               if err != nil {
-                       return 1
+               for i := range inputDirs {
+                       err = runner.TranslatePaths(&inputDirs[i])
+                       if err != nil {
+                               return 1
+                       }
                }
-               runner.Args = []string{"slice", "-local=true",
+               runner.Args = append([]string{"slice", "-local=true",
                        "-pprof", ":6060",
-                       "-input-dir", *inputDir,
                        "-output-dir", "/mnt/output",
-               }
+               }, inputDirs...)
                var output string
                output, err = runner.Run()
                if err != nil {
@@ -84,7 +91,7 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
                return 0
        }
 
-       err = Slice(*outputDir, *inputDir, *tagsPerFile)
+       err = Slice(*tagsPerFile, *outputDir, inputDirs)
        if err != nil {
                return 1
        }
@@ -93,11 +100,25 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
 
 // Read tags+tiles+genomes from srcdir, write to dstdir with (up to)
 // the specified number of tags per file.
-func Slice(dstdir, srcdir string, tagsPerFile int) error {
-       infiles, err := allGobFiles(srcdir)
-       if err != nil {
-               return err
+func Slice(tagsPerFile int, dstdir string, srcdirs []string) error {
+       var infiles []string
+       for _, srcdir := range srcdirs {
+               files, err := allGobFiles(srcdir)
+               if err != nil {
+                       return err
+               }
+               infiles = append(infiles, files...)
        }
+       // dirNamespace[dir] is an int in [0,len(dirNamespace)), used below to
+       // namespace variant numbers from different dirs.
+       dirNamespace := map[string]int{}
+       for _, path := range infiles {
+               dir, _ := filepath.Split(path)
+               if _, ok := dirNamespace[dir]; !ok {
+                       dirNamespace[dir] = len(dirNamespace)
+               }
+       }
+       namespaces := len(dirNamespace)
 
        var (
                tagset     [][]byte
@@ -124,7 +145,9 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error {
                                return
                        }
                        defer f.Close()
-                       log.Printf("reading %s", path)
+                       dir, _ := filepath.Split(path)
+                       namespace := dirNamespace[dir]
+                       log.Printf("reading %s (namespace %d)", path, namespace)
                        err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error {
                                if err := throttle.Err(); err != nil {
                                        return err
@@ -152,6 +175,7 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error {
                                }
                                atomic.AddInt64(&countTileVariants, int64(len(ent.TileVariants)))
                                for _, tv := range ent.TileVariants {
+                                       tv.Variant = tileVariantID(int(tv.Variant)*namespaces + namespace)
                                        err := encs[int(tv.Tag)/tagsPerFile].Encode(LibraryEntry{
                                                TileVariants: []TileVariant{tv},
                                        })
@@ -166,6 +190,11 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error {
                                // Easier for downstream code.
                                atomic.AddInt64(&countGenomes, int64(len(ent.CompactGenomes)))
                                for _, cg := range ent.CompactGenomes {
+                                       for i, v := range cg.Variants {
+                                               if v > 0 {
+                                                       cg.Variants[i] = tileVariantID(int(v)*namespaces + namespace)
+                                               }
+                                       }
                                        for i, enc := range encs {
                                                start := i * tagsPerFile
                                                end := start + tagsPerFile
@@ -194,6 +223,13 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error {
                                // slice. Easier for downstream code.
                                atomic.AddInt64(&countReferences, int64(len(ent.CompactSequences)))
                                if len(ent.CompactSequences) > 0 {
+                                       for _, cs := range ent.CompactSequences {
+                                               for _, tseq := range cs.TileSequences {
+                                                       for i, libref := range tseq {
+                                                               tseq[i].Variant = tileVariantID(int(libref.Variant)*namespaces + namespace)
+                                                       }
+                                               }
+                                       }
                                        err := encs[0].Encode(LibraryEntry{CompactSequences: ent.CompactSequences})
                                        if err != nil {
                                                return err