X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/2d1b6eb5c7adf27029ad3f900b9e357c2e66e105..d5a86b8d482c8897627898110e2dfa6d43798228:/slice.go diff --git a/slice.go b/slice.go index 10e5e01bc2..7d2ab87784 100644 --- a/slice.go +++ b/slice.go @@ -7,12 +7,14 @@ package lightning import ( "bufio" "encoding/gob" + "errors" "flag" "fmt" "io" "net/http" _ "net/http/pprof" "os" + "path/filepath" "runtime" "strings" "sync" @@ -38,7 +40,6 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)") projectUUID := flags.String("project", "", "project `UUID` for output data") priority := flags.Int("priority", 500, "container request priority") - inputDir := flags.String("input-dir", "./in", "input `directory`") outputDir := flags.String("output-dir", "./out", "output `directory`") tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)") err = flags.Parse(args) @@ -48,6 +49,11 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std } else if err != nil { return 2 } + inputDirs := flags.Args() + if len(inputDirs) == 0 { + err = errors.New("no input dirs specified") + return 2 + } if *pprof != "" { go func() { @@ -66,15 +72,16 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std KeepCache: 50, APIAccess: true, } - err = runner.TranslatePaths(inputDir) - if err != nil { - return 1 + for i := range inputDirs { + err = runner.TranslatePaths(&inputDirs[i]) + if err != nil { + return 1 + } } - runner.Args = []string{"slice", "-local=true", + runner.Args = append([]string{"slice", "-local=true", "-pprof", ":6060", - "-input-dir", *inputDir, "-output-dir", "/mnt/output", - } + }, inputDirs...) var output string output, err = runner.Run() if err != nil { @@ -84,7 +91,7 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std return 0 } - err = Slice(*outputDir, *inputDir, *tagsPerFile) + err = Slice(*tagsPerFile, *outputDir, inputDirs) if err != nil { return 1 } @@ -93,11 +100,25 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std // Read tags+tiles+genomes from srcdir, write to dstdir with (up to) // the specified number of tags per file. -func Slice(dstdir, srcdir string, tagsPerFile int) error { - infiles, err := allGobFiles(srcdir) - if err != nil { - return err +func Slice(tagsPerFile int, dstdir string, srcdirs []string) error { + var infiles []string + for _, srcdir := range srcdirs { + files, err := allGobFiles(srcdir) + if err != nil { + return err + } + infiles = append(infiles, files...) } + // dirNamespace[dir] is an int in [0,len(dirNamespace)), used below to + // namespace variant numbers from different dirs. + dirNamespace := map[string]tileVariantID{} + for _, path := range infiles { + dir, _ := filepath.Split(path) + if _, ok := dirNamespace[dir]; !ok { + dirNamespace[dir] = tileVariantID(len(dirNamespace)) + } + } + namespaces := tileVariantID(len(dirNamespace)) var ( tagset [][]byte @@ -113,19 +134,21 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error { ) throttle := throttle{Max: runtime.GOMAXPROCS(0)} - for _, path := range infiles { - path := path + for _, infile := range infiles { + infile := infile throttle.Acquire() go func() { defer throttle.Release() - f, err := open(path) + f, err := open(infile) if err != nil { throttle.Report(err) return } defer f.Close() - log.Printf("reading %s", path) - err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error { + dir, _ := filepath.Split(infile) + namespace := dirNamespace[dir] + log.Printf("reading %s (namespace %d)", infile, namespace) + err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error { if err := throttle.Err(); err != nil { return err } @@ -152,7 +175,12 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error { } atomic.AddInt64(&countTileVariants, int64(len(ent.TileVariants))) for _, tv := range ent.TileVariants { - err := encs[int(tv.Tag)/tagsPerFile].Encode(LibraryEntry{ + tv.Variant = tv.Variant*namespaces + namespace + fileno := 0 + if !tv.Ref { + fileno = int(tv.Tag) / tagsPerFile + } + err := encs[fileno].Encode(LibraryEntry{ TileVariants: []TileVariant{tv}, }) if err != nil { @@ -166,6 +194,11 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error { // Easier for downstream code. atomic.AddInt64(&countGenomes, int64(len(ent.CompactGenomes))) for _, cg := range ent.CompactGenomes { + for i, v := range cg.Variants { + if v > 0 { + cg.Variants[i] = v*namespaces + namespace + } + } for i, enc := range encs { start := i * tagsPerFile end := start + tagsPerFile @@ -194,6 +227,13 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error { // slice. Easier for downstream code. atomic.AddInt64(&countReferences, int64(len(ent.CompactSequences))) if len(ent.CompactSequences) > 0 { + for _, cs := range ent.CompactSequences { + for _, tseq := range cs.TileSequences { + for i, libref := range tseq { + tseq[i].Variant = libref.Variant*namespaces + namespace + } + } + } err := encs[0].Encode(LibraryEntry{CompactSequences: ent.CompactSequences}) if err != nil { return err