X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/056c033351153c6e4f20ff70796a1dca14878302..c5be98ec79b8bd3dbd8d129daa6648b0f8ac5b61:/slice.go diff --git a/slice.go b/slice.go index 50f219fb3c..4dbe6ab652 100644 --- a/slice.go +++ b/slice.go @@ -7,15 +7,18 @@ package lightning import ( "bufio" "encoding/gob" + "errors" "flag" "fmt" "io" "net/http" _ "net/http/pprof" "os" + "path/filepath" "runtime" "strings" "sync" + "sync/atomic" "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/klauspost/pgzip" @@ -37,7 +40,6 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)") projectUUID := flags.String("project", "", "project `UUID` for output data") priority := flags.Int("priority", 500, "container request priority") - inputDir := flags.String("input-dir", "./in", "input `directory`") outputDir := flags.String("output-dir", "./out", "output `directory`") tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)") err = flags.Parse(args) @@ -47,6 +49,11 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std } else if err != nil { return 2 } + inputDirs := flags.Args() + if len(inputDirs) == 0 { + err = errors.New("no input dirs specified") + return 2 + } if *pprof != "" { go func() { @@ -59,21 +66,22 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std Name: "lightning slice", Client: arvados.NewClientFromEnv(), ProjectUUID: *projectUUID, - RAM: 200000000000, - VCPUs: 32, + RAM: 500000000000, + VCPUs: 64, Priority: *priority, - KeepCache: 50, + KeepCache: 2, APIAccess: true, } - err = runner.TranslatePaths(inputDir) - if err != nil { - return 1 + for i := range inputDirs { + err = runner.TranslatePaths(&inputDirs[i]) + if err != nil { + return 1 + } } - runner.Args = []string{"slice", "-local=true", + runner.Args = append([]string{"slice", "-local=true", "-pprof", ":6060", - "-input-dir", *inputDir, "-output-dir", "/mnt/output", - } + }, inputDirs...) var output string output, err = runner.Run() if err != nil { @@ -83,7 +91,7 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std return 0 } - err = Slice(*outputDir, *inputDir, *tagsPerFile) + err = Slice(*tagsPerFile, *outputDir, inputDirs) if err != nil { return 1 } @@ -92,11 +100,25 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std // Read tags+tiles+genomes from srcdir, write to dstdir with (up to) // the specified number of tags per file. -func Slice(dstdir, srcdir string, tagsPerFile int) error { - infiles, err := allGobFiles(srcdir) - if err != nil { - return err +func Slice(tagsPerFile int, dstdir string, srcdirs []string) error { + var infiles []string + for _, srcdir := range srcdirs { + files, err := allGobFiles(srcdir) + if err != nil { + return err + } + infiles = append(infiles, files...) } + // dirNamespace[dir] is an int in [0,len(dirNamespace)), used below to + // namespace variant numbers from different dirs. + dirNamespace := map[string]tileVariantID{} + for _, path := range infiles { + dir, _ := filepath.Split(path) + if _, ok := dirNamespace[dir]; !ok { + dirNamespace[dir] = tileVariantID(len(dirNamespace)) + } + } + namespaces := tileVariantID(len(dirNamespace)) var ( tagset [][]byte @@ -105,22 +127,25 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error { bufws []*bufio.Writer gzws []*pgzip.Writer encs []*gob.Encoder + + countTileVariants int64 + countGenomes int64 + countReferences int64 ) throttle := throttle{Max: runtime.GOMAXPROCS(0)} - for _, path := range infiles { - path := path - throttle.Acquire() - go func() { - defer throttle.Release() - f, err := open(path) + for _, infile := range infiles { + infile := infile + throttle.Go(func() error { + f, err := open(infile) if err != nil { - throttle.Report(err) - return + return err } defer f.Close() - log.Printf("reading %s", path) - err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error { + dir, _ := filepath.Split(infile) + namespace := dirNamespace[dir] + log.Printf("reading %s (namespace %d)", infile, namespace) + return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error { if err := throttle.Err(); err != nil { return err } @@ -145,8 +170,14 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error { if err := throttle.Err(); err != nil { return err } + atomic.AddInt64(&countTileVariants, int64(len(ent.TileVariants))) for _, tv := range ent.TileVariants { - err := encs[int(tv.Tag)/tagsPerFile].Encode(LibraryEntry{ + tv.Variant = tv.Variant*namespaces + namespace + fileno := 0 + if !tv.Ref { + fileno = int(tv.Tag) / tagsPerFile + } + err := encs[fileno].Encode(LibraryEntry{ TileVariants: []TileVariant{tv}, }) if err != nil { @@ -158,7 +189,13 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error { // genome, even if there are no // variants in the relevant range. // Easier for downstream code. + atomic.AddInt64(&countGenomes, int64(len(ent.CompactGenomes))) for _, cg := range ent.CompactGenomes { + for i, v := range cg.Variants { + if v > 0 { + cg.Variants[i] = v*namespaces + namespace + } + } for i, enc := range encs { start := i * tagsPerFile end := start + tagsPerFile @@ -185,7 +222,15 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error { } // Write all ref seqs to the first // slice. Easier for downstream code. + atomic.AddInt64(&countReferences, int64(len(ent.CompactSequences))) if len(ent.CompactSequences) > 0 { + for _, cs := range ent.CompactSequences { + for _, tseq := range cs.TileSequences { + for i, libref := range tseq { + tseq[i].Variant = libref.Variant*namespaces + namespace + } + } + } err := encs[0].Encode(LibraryEntry{CompactSequences: ent.CompactSequences}) if err != nil { return err @@ -193,14 +238,14 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error { } return nil }) - throttle.Report(err) - }() + }) } throttle.Wait() if throttle.Err() != nil { closeOutFiles(fs, bufws, gzws, encs) return throttle.Err() } + defer log.Printf("Total %d tile variants, %d genomes, %d reference sequences", countTileVariants, countGenomes, countReferences) return closeOutFiles(fs, bufws, gzws, encs) }