Fix some tests.
[lightning.git] / slice.go
index 50f219fb3cb6fba95d2b61817085a83f3ebf070b..63e751c8da9aa9a245a64d94c3948c4e0443c06e 100644 (file)
--- a/slice.go
+++ b/slice.go
@@ -7,15 +7,18 @@ package lightning
 import (
        "bufio"
        "encoding/gob"
+       "errors"
        "flag"
        "fmt"
        "io"
        "net/http"
        _ "net/http/pprof"
        "os"
+       "path/filepath"
        "runtime"
        "strings"
        "sync"
+       "sync/atomic"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/klauspost/pgzip"
@@ -34,10 +37,11 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
        flags := flag.NewFlagSet("", flag.ContinueOnError)
        flags.SetOutput(stderr)
        pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
+       pprofdir := flags.String("pprof-dir", "", "write Go profile data to `directory` periodically")
        runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
        projectUUID := flags.String("project", "", "project `UUID` for output data")
        priority := flags.Int("priority", 500, "container request priority")
-       inputDir := flags.String("input-dir", "./in", "input `directory`")
+       preemptible := flags.Bool("preemptible", true, "request preemptible instance")
        outputDir := flags.String("output-dir", "./out", "output `directory`")
        tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)")
        err = flags.Parse(args)
@@ -47,33 +51,43 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
        } else if err != nil {
                return 2
        }
+       inputDirs := flags.Args()
+       if len(inputDirs) == 0 {
+               err = errors.New("no input dirs specified")
+               return 2
+       }
 
        if *pprof != "" {
                go func() {
                        log.Println(http.ListenAndServe(*pprof, nil))
                }()
        }
+       if *pprofdir != "" {
+               go writeProfilesPeriodically(*pprofdir)
+       }
 
        if !*runlocal {
                runner := arvadosContainerRunner{
                        Name:        "lightning slice",
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: *projectUUID,
-                       RAM:         200000000000,
-                       VCPUs:       32,
+                       RAM:         500000000000,
+                       VCPUs:       64,
                        Priority:    *priority,
-                       KeepCache:   50,
+                       KeepCache:   2,
                        APIAccess:   true,
+                       Preemptible: *preemptible,
                }
-               err = runner.TranslatePaths(inputDir)
-               if err != nil {
-                       return 1
+               for i := range inputDirs {
+                       err = runner.TranslatePaths(&inputDirs[i])
+                       if err != nil {
+                               return 1
+                       }
                }
-               runner.Args = []string{"slice", "-local=true",
+               runner.Args = append([]string{"slice", "-local=true",
                        "-pprof", ":6060",
-                       "-input-dir", *inputDir,
                        "-output-dir", "/mnt/output",
-               }
+               }, inputDirs...)
                var output string
                output, err = runner.Run()
                if err != nil {
@@ -83,7 +97,7 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
                return 0
        }
 
-       err = Slice(*outputDir, *inputDir, *tagsPerFile)
+       err = Slice(*tagsPerFile, *outputDir, inputDirs)
        if err != nil {
                return 1
        }
@@ -92,11 +106,25 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
 
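-// Read tags+tiles+genomes from srcdir, write to dstdir with (up to)
-// the specified number of tags per file.
+// Slice reads tags+tiles+genomes from all srcdirs and writes them to
+// dstdir with (up to) the specified number of tags per file.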
-func Slice(dstdir, srcdir string, tagsPerFile int) error {
-       infiles, err := allGobFiles(srcdir)
-       if err != nil {
-               return err
+func Slice(tagsPerFile int, dstdir string, srcdirs []string) error {
+       var infiles []string
+       for _, srcdir := range srcdirs {
+               files, err := allFiles(srcdir, matchGobFile)
+               if err != nil {
+                       return err
+               }
+               infiles = append(infiles, files...)
+       }
+       // dirNamespace[dir] is an int in [0,len(dirNamespace)), used below to
+       // namespace variant numbers from different dirs.
+       dirNamespace := map[string]tileVariantID{}
+       for _, path := range infiles {
+               dir, _ := filepath.Split(path)
+               if _, ok := dirNamespace[dir]; !ok {
+                       dirNamespace[dir] = tileVariantID(len(dirNamespace))
+               }
        }
+       namespaces := tileVariantID(len(dirNamespace))
 
        var (
                tagset     [][]byte
@@ -105,22 +133,25 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error {
                bufws      []*bufio.Writer
                gzws       []*pgzip.Writer
                encs       []*gob.Encoder
+
+               countTileVariants int64
+               countGenomes      int64
+               countReferences   int64
        )
 
        throttle := throttle{Max: runtime.GOMAXPROCS(0)}
-       for _, path := range infiles {
-               path := path
-               throttle.Acquire()
-               go func() {
-                       defer throttle.Release()
-                       f, err := open(path)
+       for _, infile := range infiles {
+               infile := infile
+               throttle.Go(func() error {
+                       f, err := open(infile)
                        if err != nil {
-                               throttle.Report(err)
-                               return
+                               return err
                        }
                        defer f.Close()
-                       log.Printf("reading %s", path)
-                       err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error {
+                       dir, _ := filepath.Split(infile)
+                       namespace := dirNamespace[dir]
+                       log.Printf("reading %s (namespace %d)", infile, namespace)
+                       return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
                                if err := throttle.Err(); err != nil {
                                        return err
                                }
@@ -145,8 +176,14 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error {
                                if err := throttle.Err(); err != nil {
                                        return err
                                }
+                               atomic.AddInt64(&countTileVariants, int64(len(ent.TileVariants)))
                                for _, tv := range ent.TileVariants {
-                                       err := encs[int(tv.Tag)/tagsPerFile].Encode(LibraryEntry{
+                                       tv.Variant = tv.Variant*namespaces + namespace
+                                       fileno := 0
+                                       if !tv.Ref {
+                                               fileno = int(tv.Tag) / tagsPerFile
+                                       }
+                                       err := encs[fileno].Encode(LibraryEntry{
                                                TileVariants: []TileVariant{tv},
                                        })
                                        if err != nil {
@@ -158,7 +195,13 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error {
                                // genome, even if there are no
                                // variants in the relevant range.
                                // Easier for downstream code.
+                               atomic.AddInt64(&countGenomes, int64(len(ent.CompactGenomes)))
                                for _, cg := range ent.CompactGenomes {
+                                       for i, v := range cg.Variants {
+                                               if v > 0 {
+                                                       cg.Variants[i] = v*namespaces + namespace
+                                               }
+                                       }
                                        for i, enc := range encs {
                                                start := i * tagsPerFile
                                                end := start + tagsPerFile
@@ -185,7 +228,15 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error {
                                }
                                // Write all ref seqs to the first
                                // slice. Easier for downstream code.
+                               atomic.AddInt64(&countReferences, int64(len(ent.CompactSequences)))
                                if len(ent.CompactSequences) > 0 {
+                                       for _, cs := range ent.CompactSequences {
+                                               for _, tseq := range cs.TileSequences {
+                                                       for i, libref := range tseq {
+                                                               tseq[i].Variant = libref.Variant*namespaces + namespace
+                                                       }
+                                               }
+                                       }
                                        err := encs[0].Encode(LibraryEntry{CompactSequences: ent.CompactSequences})
                                        if err != nil {
                                                return err
@@ -193,14 +244,14 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error {
                                }
                                return nil
                        })
-                       throttle.Report(err)
-               }()
+               })
        }
        throttle.Wait()
        if throttle.Err() != nil {
                closeOutFiles(fs, bufws, gzws, encs)
                return throttle.Err()
        }
+       defer log.Printf("Total %d tile variants, %d genomes, %d reference sequences", countTileVariants, countGenomes, countReferences)
        return closeOutFiles(fs, bufws, gzws, encs)
 }