From: Tom Clegg Date: Sun, 12 Sep 2021 19:24:20 +0000 (-0400) Subject: Slice imported data by tag#. X-Git-Url: https://git.arvados.org/lightning.git/commitdiff_plain/a26128ae9f71b8881a0cc015cceb547cf5d84097 Slice imported data by tag#. refs #17996 Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/cmd.go b/cmd.go index 710080f97b..a4df2e0fcb 100644 --- a/cmd.go +++ b/cmd.go @@ -29,6 +29,7 @@ var ( "export": &exporter{}, "export-numpy": &exportNumpy{}, "flake": &flakecmd{}, + "slice": &slicecmd{}, "numpy-comvar": &numpyComVar{}, "filter": &filtercmd{}, "build-docker-image": &buildDockerImage{}, diff --git a/gob.go b/gob.go index 893b8cadc6..5682db3771 100644 --- a/gob.go +++ b/gob.go @@ -18,6 +18,8 @@ import ( type CompactGenome struct { Name string Variants []tileVariantID + StartTag tagID + EndTag tagID } type CompactSequence struct { diff --git a/slice.go b/slice.go new file mode 100644 index 0000000000..01ffb1fc18 --- /dev/null +++ b/slice.go @@ -0,0 +1,252 @@ +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package lightning + +import ( + "bufio" + "encoding/gob" + "flag" + "fmt" + "io" + "net/http" + _ "net/http/pprof" + "os" + "runtime" + "strings" + "sync" + + "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/klauspost/pgzip" + log "github.com/sirupsen/logrus" +) + +type slicecmd struct{} + +func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { + var err error + defer func() { + if err != nil { + fmt.Fprintf(stderr, "%s\n", err) + } + }() + flags := flag.NewFlagSet("", flag.ContinueOnError) + flags.SetOutput(stderr) + pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") + runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)") + projectUUID := flags.String("project", "", "project `UUID` for output data") + priority := flags.Int("priority", 500, "container request priority") + inputDir := flags.String("input-dir", "./in", "input `directory`") + outputDir := flags.String("output-dir", "./out", "output `directory`") + tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)") + err = flags.Parse(args) + if err == flag.ErrHelp { + err = nil + return 0 + } else if err != nil { + return 2 + } + + if *pprof != "" { + go func() { + log.Println(http.ListenAndServe(*pprof, nil)) + }() + } + + if !*runlocal { + runner := arvadosContainerRunner{ + Name: "lightning slice", + Client: arvados.NewClientFromEnv(), + ProjectUUID: *projectUUID, + RAM: 200000000000, + VCPUs: 4, + Priority: *priority, + KeepCache: 50, + APIAccess: true, + } + err = runner.TranslatePaths(inputDir) + if err != nil { + return 1 + } + runner.Args = []string{"slice", "-local=true", + "-pprof", ":6060", + "-input-dir", *inputDir, + "-output-dir", "/mnt/output", + } + var output string + output, err = runner.Run() + if err != nil { + return 1 + } + fmt.Fprintln(stdout, output) + return 0 + } + + err = Slice(*outputDir, *inputDir, *tagsPerFile) + if err != nil { + return 1 + } + return 0 +} + +// Read tags+tiles+genomes from srcdir, write to dstdir with (up to) +// the specified number of tags per file. +func Slice(dstdir, srcdir string, tagsPerFile int) error { + infiles, err := allGobFiles(srcdir) + if err != nil { + return err + } + + var ( + tagset [][]byte + tagsetOnce sync.Once + fs []*os.File + bufws []*bufio.Writer + gzws []*pgzip.Writer + encs []*gob.Encoder + ) + + throttle := throttle{Max: runtime.NumCPU()} + for _, path := range infiles { + path := path + throttle.Acquire() + go func() { + defer throttle.Release() + f, err := open(path) + if err != nil { + throttle.Report(err) + return + } + defer f.Close() + log.Printf("reading %s", path) + err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error { + if err := throttle.Err(); err != nil { + return err + } + if len(ent.TagSet) > 0 { + tagsetOnce.Do(func() { + tagset = ent.TagSet + var err error + fs, bufws, gzws, encs, err = openOutFiles(dstdir, len(ent.TagSet), tagsPerFile) + if err != nil { + throttle.Report(err) + return + } + for _, enc := range encs { + err = enc.Encode(LibraryEntry{TagSet: tagset}) + if err != nil { + throttle.Report(err) + return + } + } + }) + } + if err := throttle.Err(); err != nil { + return err + } + for _, tv := range ent.TileVariants { + err := encs[int(tv.Tag)/tagsPerFile].Encode(LibraryEntry{ + TileVariants: []TileVariant{tv}, + }) + if err != nil { + return err + } + } + // Here, each output file gets a + // CompactGenome entry for each + // genome, even if there are no + // variants in the relevant range. + // Easier for downstream code. + for _, cg := range ent.CompactGenomes { + for i, enc := range encs { + start := i * tagsPerFile + end := start + tagsPerFile + if max := len(cg.Variants)/2 + int(cg.StartTag); end > max { + end = max + } + if start < int(cg.StartTag) { + start = int(cg.StartTag) + } + var variants []tileVariantID + if start < end { + variants = cg.Variants[(start-int(cg.StartTag))*2 : (end-int(cg.StartTag))*2] + } + err := enc.Encode(LibraryEntry{CompactGenomes: []CompactGenome{{ + Name: cg.Name, + Variants: variants, + StartTag: tagID(start), + EndTag: tagID(start + tagsPerFile), + }}}) + if err != nil { + return err + } + } + } + // Write all ref seqs to the first + // slice. Easier for downstream code. + if len(ent.CompactSequences) > 0 { + err := encs[0].Encode(LibraryEntry{CompactSequences: ent.CompactSequences}) + if err != nil { + return err + } + } + return nil + }) + throttle.Report(err) + }() + } + throttle.Wait() + if throttle.Err() != nil { + closeOutFiles(fs, bufws, gzws, encs) + return throttle.Err() + } + return closeOutFiles(fs, bufws, gzws, encs) +} + +func openOutFiles(dstdir string, tags, tagsPerFile int) (fs []*os.File, bufws []*bufio.Writer, gzws []*pgzip.Writer, encs []*gob.Encoder, err error) { + nfiles := (tags + tagsPerFile - 1) / tagsPerFile + fs = make([]*os.File, nfiles) + bufws = make([]*bufio.Writer, nfiles) + gzws = make([]*pgzip.Writer, nfiles) + encs = make([]*gob.Encoder, nfiles) + for i := 0; i*tagsPerFile < tags; i++ { + fs[i], err = os.Create(dstdir + fmt.Sprintf("/library%04d.gob.gz", i)) + if err != nil { + return + } + bufws[i] = bufio.NewWriterSize(fs[i], 1<<26) + gzws[i] = pgzip.NewWriter(bufws[i]) + encs[i] = gob.NewEncoder(gzws[i]) + } + return +} + +func closeOutFiles(fs []*os.File, bufws []*bufio.Writer, gzws []*pgzip.Writer, encs []*gob.Encoder) error { + var firstErr error + for _, gzw := range gzws { + if gzw != nil { + err := gzw.Close() + if err != nil && firstErr == nil { + firstErr = err + } + } + } + for _, bufw := range bufws { + if bufw != nil { + err := bufw.Flush() + if err != nil && firstErr == nil { + firstErr = err + } + } + } + for _, f := range fs { + if f != nil { + err := f.Close() + if err != nil && firstErr == nil { + firstErr = err + } + } + } + return firstErr +} diff --git a/slice_test.go b/slice_test.go new file mode 100644 index 0000000000..0d1fe3a4c6 --- /dev/null +++ b/slice_test.go @@ -0,0 +1,64 @@ +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package lightning + +import ( + "io/ioutil" + "os" + "os/exec" + + "gopkg.in/check.v1" +) + +type sliceSuite struct{} + +var _ = check.Suite(&sliceSuite{}) + +func (s *sliceSuite) TestImportAndSlice(c *check.C) { + tmpdir := c.MkDir() + + err := ioutil.WriteFile(tmpdir+"/chr1-12-100.bed", []byte("chr1\t12\t100\ttest.1\n"), 0644) + c.Check(err, check.IsNil) + + exited := (&importer{}).RunCommand("import", []string{ + "-local=true", + "-tag-library", "testdata/tags", + "-output-tiles", + "-save-incomplete-tiles", + "-o", tmpdir + "/library1.gob", + "testdata/ref.fasta", + }, nil, os.Stderr, os.Stderr) + c.Assert(exited, check.Equals, 0) + + exited = (&importer{}).RunCommand("import", []string{ + "-local=true", + "-tag-library", "testdata/tags", + "-output-tiles", + // "-save-incomplete-tiles", + "-o", tmpdir + "/library2.gob", + "testdata/pipeline1", + }, nil, os.Stderr, os.Stderr) + c.Assert(exited, check.Equals, 0) + + exited = (&merger{}).RunCommand("merge", []string{ + "-local=true", + "-o", tmpdir + "/library.gob", + tmpdir + "/library1.gob", + tmpdir + "/library2.gob", + }, nil, os.Stderr, os.Stderr) + c.Assert(exited, check.Equals, 0) + + input := tmpdir + "/library.gob" + + exited = (&slicecmd{}).RunCommand("slice", []string{ + "-local=true", + "-input-dir=" + input, + "-output-dir=" + tmpdir, + "-tags-per-file=2", + }, nil, os.Stderr, os.Stderr) + c.Check(exited, check.Equals, 0) + out, _ := exec.Command("find", tmpdir, "-ls").CombinedOutput() + c.Logf("%s", out) +} diff --git a/tilelib.go b/tilelib.go index 4ad607c1e5..e396e1c9d0 100644 --- a/tilelib.go +++ b/tilelib.go @@ -234,36 +234,36 @@ func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, varian return nil } -func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string) error { +func allGobFiles(path string) ([]string, error) { var files []string - var walk func(string) error - walk = func(path string) error { - f, err := open(path) - if err != nil { - return err - } - defer f.Close() - fis, err := f.Readdir(-1) - if err != nil { - files = append(files, path) - return nil - } - for _, fi := range fis { - if fi.Name() == "." || fi.Name() == ".." { - continue - } else if child := path + "/" + fi.Name(); fi.IsDir() { - err = walk(child) - if err != nil { - return err - } - } else if strings.HasSuffix(child, ".gob") || strings.HasSuffix(child, ".gob.gz") { - files = append(files, child) + f, err := open(path) + if err != nil { + return nil, err + } + defer f.Close() + fis, err := f.Readdir(-1) + if err != nil { + return []string{path}, nil + } + for _, fi := range fis { + if fi.Name() == "." || fi.Name() == ".." { + continue + } else if child := path + "/" + fi.Name(); fi.IsDir() { + add, err := allGobFiles(child) + if err != nil { + return nil, err } + files = append(files, add...) + } else if strings.HasSuffix(child, ".gob") || strings.HasSuffix(child, ".gob.gz") { + files = append(files, child) } - return nil } + return files, nil +} + +func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string) error { log.Infof("LoadDir: walk dir %s", path) - err := walk(path) + files, err := allGobFiles(path) if err != nil { return err }