Fix some tests.
[lightning.git] / slice.go
index be5bb87904f057ea0a0310358f9371e80d081ccb..63e751c8da9aa9a245a64d94c3948c4e0443c06e 100644 (file)
--- a/slice.go
+++ b/slice.go
@@ -37,9 +37,11 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
        flags := flag.NewFlagSet("", flag.ContinueOnError)
        flags.SetOutput(stderr)
        pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
+       pprofdir := flags.String("pprof-dir", "", "write Go profile data to `directory` periodically")
        runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
        projectUUID := flags.String("project", "", "project `UUID` for output data")
        priority := flags.Int("priority", 500, "container request priority")
+       preemptible := flags.Bool("preemptible", true, "request preemptible instance")
        outputDir := flags.String("output-dir", "./out", "output `directory`")
        tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)")
        err = flags.Parse(args)
@@ -60,17 +62,21 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
                        log.Println(http.ListenAndServe(*pprof, nil))
                }()
        }
+       if *pprofdir != "" {
+               go writeProfilesPeriodically(*pprofdir)
+       }
 
        if !*runlocal {
                runner := arvadosContainerRunner{
                        Name:        "lightning slice",
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: *projectUUID,
-                       RAM:         200000000000,
-                       VCPUs:       32,
+                       RAM:         500000000000,
+                       VCPUs:       64,
                        Priority:    *priority,
-                       KeepCache:   50,
+                       KeepCache:   2,
                        APIAccess:   true,
+                       Preemptible: *preemptible,
                }
                for i := range inputDirs {
                        err = runner.TranslatePaths(&inputDirs[i])
@@ -103,7 +109,7 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
 func Slice(tagsPerFile int, dstdir string, srcdirs []string) error {
        var infiles []string
        for _, srcdir := range srcdirs {
-               files, err := allGobFiles(srcdir)
+               files, err := allFiles(srcdir, matchGobFile)
                if err != nil {
                        return err
                }
@@ -111,14 +117,14 @@ func Slice(tagsPerFile int, dstdir string, srcdirs []string) error {
        }
        // dirNamespace[dir] is an int in [0,len(dirNamespace)), used below to
        // namespace variant numbers from different dirs.
-       dirNamespace := map[string]int{}
+       dirNamespace := map[string]tileVariantID{}
        for _, path := range infiles {
                dir, _ := filepath.Split(path)
                if _, ok := dirNamespace[dir]; !ok {
-                       dirNamespace[dir] = len(dirNamespace)
+                       dirNamespace[dir] = tileVariantID(len(dirNamespace))
                }
        }
-       namespaces := len(dirNamespace)
+       namespaces := tileVariantID(len(dirNamespace))
 
        var (
                tagset     [][]byte
@@ -134,21 +140,18 @@ func Slice(tagsPerFile int, dstdir string, srcdirs []string) error {
        )
 
        throttle := throttle{Max: runtime.GOMAXPROCS(0)}
-       for _, path := range infiles {
-               path := path
-               throttle.Acquire()
-               go func() {
-                       defer throttle.Release()
-                       f, err := open(path)
+       for _, infile := range infiles {
+               infile := infile
+               throttle.Go(func() error {
+                       f, err := open(infile)
                        if err != nil {
-                               throttle.Report(err)
-                               return
+                               return err
                        }
                        defer f.Close()
-                       dir, _ := filepath.Split(path)
+                       dir, _ := filepath.Split(infile)
                        namespace := dirNamespace[dir]
-                       log.Printf("reading %s (namespace %d)", path, namespace)
-                       err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error {
+                       log.Printf("reading %s (namespace %d)", infile, namespace)
+                       return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
                                if err := throttle.Err(); err != nil {
                                        return err
                                }
@@ -175,8 +178,12 @@ func Slice(tagsPerFile int, dstdir string, srcdirs []string) error {
                                }
                                atomic.AddInt64(&countTileVariants, int64(len(ent.TileVariants)))
                                for _, tv := range ent.TileVariants {
-                                       tv.Variant = tileVariantID(int(tv.Variant)*namespaces + namespace)
-                                       err := encs[int(tv.Tag)/tagsPerFile].Encode(LibraryEntry{
+                                       tv.Variant = tv.Variant*namespaces + namespace
+                                       fileno := 0
+                                       if !tv.Ref {
+                                               fileno = int(tv.Tag) / tagsPerFile
+                                       }
+                                       err := encs[fileno].Encode(LibraryEntry{
                                                TileVariants: []TileVariant{tv},
                                        })
                                        if err != nil {
@@ -192,7 +199,7 @@ func Slice(tagsPerFile int, dstdir string, srcdirs []string) error {
                                for _, cg := range ent.CompactGenomes {
                                        for i, v := range cg.Variants {
                                                if v > 0 {
-                                                       cg.Variants[i] = tileVariantID(int(v)*namespaces + namespace)
+                                                       cg.Variants[i] = v*namespaces + namespace
                                                }
                                        }
                                        for i, enc := range encs {
@@ -226,7 +233,7 @@ func Slice(tagsPerFile int, dstdir string, srcdirs []string) error {
                                        for _, cs := range ent.CompactSequences {
                                                for _, tseq := range cs.TileSequences {
                                                        for i, libref := range tseq {
-                                                               tseq[i].Variant = tileVariantID(int(libref.Variant)*namespaces + namespace)
+                                                               tseq[i].Variant = libref.Variant*namespaces + namespace
                                                        }
                                                }
                                        }
@@ -237,8 +244,7 @@ func Slice(tagsPerFile int, dstdir string, srcdirs []string) error {
                                }
                                return nil
                        })
-                       throttle.Report(err)
-               }()
+               })
        }
        throttle.Wait()
        if throttle.Err() != nil {