Slice imported data by tag#.
author Tom Clegg <tom@tomclegg.ca>
Sun, 12 Sep 2021 19:24:20 +0000 (15:24 -0400)
committer Tom Clegg <tom@tomclegg.ca>
Mon, 13 Sep 2021 01:01:32 +0000 (21:01 -0400)
refs #17996

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

cmd.go
gob.go
slice.go [new file with mode: 0644]
slice_test.go [new file with mode: 0644]
tilelib.go

diff --git a/cmd.go b/cmd.go
index 710080f97b271eaf88cadaa4602c9bf400e4803b..a4df2e0fcbef5ce3a5a4e28786143daed0601fda 100644 (file)
--- a/cmd.go
+++ b/cmd.go
@@ -29,6 +29,7 @@ var (
                "export":             &exporter{},
                "export-numpy":       &exportNumpy{},
                "flake":              &flakecmd{},
+               "slice":              &slicecmd{},
                "numpy-comvar":       &numpyComVar{},
                "filter":             &filtercmd{},
                "build-docker-image": &buildDockerImage{},
diff --git a/gob.go b/gob.go
index 893b8cadc6d25d4c0fb6d8cd9e8dc1a0ef95d6ce..5682db3771ac7002114f1f3a18666d499920a72a 100644 (file)
--- a/gob.go
+++ b/gob.go
@@ -18,6 +18,8 @@ import (
+// CompactGenome holds, for one named genome, the tile variant IDs
+// for a run of consecutive tags (two Variants entries per tag).
 type CompactGenome struct {
        Name     string
        Variants []tileVariantID
+       StartTag tagID // tag that Variants[0:2] refer to
+       EndTag   tagID // NOTE(review): end of the covered tag range (exclusive, as Slice uses it) — confirm for unsliced genomes
 }
 
 type CompactSequence struct {
diff --git a/slice.go b/slice.go
new file mode 100644 (file)
index 0000000..01ffb1f
--- /dev/null
+++ b/slice.go
@@ -0,0 +1,252 @@
+// Copyright (C) The Lightning Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lightning
+
+import (
+       "bufio"
+       "encoding/gob"
+       "flag"
+       "fmt"
+       "io"
+       "net/http"
+       _ "net/http/pprof"
+       "os"
+       "runtime"
+       "strings"
+       "sync"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/klauspost/pgzip"
+       log "github.com/sirupsen/logrus"
+)
+
+// slicecmd implements the "slice" subcommand, which splits a tile
+// library into several files, each covering a range of tags (see
+// Slice).
+type slicecmd struct{}
+
+// RunCommand parses args and either runs Slice on the local host
+// (-local=true) or submits "lightning slice -local=true ..." to an
+// Arvados container and prints the runner's output to stdout.
+//
+// It returns the exit code: 0 on success (or -help), 1 if the slice
+// or container run fails, 2 on invalid command-line arguments.
+func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       var err error
+       defer func() {
+               if err != nil {
+                       fmt.Fprintf(stderr, "%s\n", err)
+               }
+       }()
+       flags := flag.NewFlagSet("", flag.ContinueOnError)
+       flags.SetOutput(stderr)
+       pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
+       runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
+       projectUUID := flags.String("project", "", "project `UUID` for output data")
+       priority := flags.Int("priority", 500, "container request priority")
+       inputDir := flags.String("input-dir", "./in", "input `directory`")
+       outputDir := flags.String("output-dir", "./out", "output `directory`")
+       tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)")
+       err = flags.Parse(args)
+       if err == flag.ErrHelp {
+               err = nil
+               return 0
+       } else if err != nil {
+               return 2
+       }
+       // Slice divides tag numbers by this value; reject bad values
+       // here instead of panicking locally or failing in a container.
+       if *tagsPerFile < 1 {
+               err = fmt.Errorf("invalid -tags-per-file value %d: must be at least 1", *tagsPerFile)
+               return 2
+       }
+
+       if *pprof != "" {
+               go func() {
+                       log.Println(http.ListenAndServe(*pprof, nil))
+               }()
+       }
+
+       if !*runlocal {
+               runner := arvadosContainerRunner{
+                       Name:        "lightning slice",
+                       Client:      arvados.NewClientFromEnv(),
+                       ProjectUUID: *projectUUID,
+                       RAM:         200000000000,
+                       VCPUs:       4,
+                       Priority:    *priority,
+                       KeepCache:   50,
+                       APIAccess:   true,
+               }
+               err = runner.TranslatePaths(inputDir)
+               if err != nil {
+                       return 1
+               }
+               runner.Args = []string{"slice", "-local=true",
+                       "-pprof", ":6060",
+                       "-input-dir", *inputDir,
+                       "-output-dir", "/mnt/output",
+                       // Forward the slice size; without this the
+                       // container silently uses the default.
+                       "-tags-per-file", fmt.Sprintf("%d", *tagsPerFile),
+               }
+               var output string
+               output, err = runner.Run()
+               if err != nil {
+                       return 1
+               }
+               fmt.Fprintln(stdout, output)
+               return 0
+       }
+
+       err = Slice(*outputDir, *inputDir, *tagsPerFile)
+       if err != nil {
+               return 1
+       }
+       return 0
+}
+
+// Slice reads tags, tile variants, genomes, and reference sequences
+// from the .gob/.gob.gz library files found by allGobFiles(srcdir),
+// and writes them to dstdir/library0000.gob.gz,
+// dstdir/library0001.gob.gz, ..., each output file covering (up to)
+// tagsPerFile consecutive tags.
+//
+// Every output file starts with the full tag set and gets one
+// CompactGenome entry per input genome, even if the genome has no
+// variants in that file's tag range (easier for downstream code).
+// All CompactSequences are written to the first output file.
+//
+// Input files are decoded concurrently (up to runtime.NumCPU() at a
+// time), so entry order within an output file is not deterministic.
+func Slice(dstdir, srcdir string, tagsPerFile int) error {
+       if tagsPerFile < 1 {
+               return fmt.Errorf("invalid tagsPerFile %d: must be at least 1", tagsPerFile)
+       }
+       infiles, err := allGobFiles(srcdir)
+       if err != nil {
+               return err
+       }
+
+       // Output state, created when the first tag set is seen. Input
+       // files are decoded concurrently, so it is guarded by mtx.
+       var (
+               mtx     sync.Mutex
+               tagset  [][]byte
+               initErr error
+               fs      []*os.File
+               bufws   []*bufio.Writer
+               gzws    []*pgzip.Writer
+               encs    []*gob.Encoder
+       )
+       // getEncoders returns one encoder per output file. The first
+       // call with a non-empty tag set creates the output files and
+       // writes that tag set to each of them; later tag sets must have
+       // the same number of tags.
+       getEncoders := func(path string, ts [][]byte) ([]*gob.Encoder, error) {
+               mtx.Lock()
+               defer mtx.Unlock()
+               if tagset == nil && len(ts) > 0 {
+                       tagset = ts
+                       fs, bufws, gzws, encs, initErr = openOutFiles(dstdir, len(ts), tagsPerFile)
+                       for i := 0; initErr == nil && i < len(encs); i++ {
+                               initErr = encs[i].Encode(LibraryEntry{TagSet: tagset})
+                       }
+               }
+               switch {
+               case initErr != nil:
+                       return nil, initErr
+               case tagset == nil:
+                       // No input file has supplied a tag set yet, so
+                       // there is no output to write this data to.
+                       return nil, fmt.Errorf("%s: library data appears before any tag set", path)
+               case len(ts) > 0 && len(ts) != len(tagset):
+                       return nil, fmt.Errorf("%s: tag set has %d tags, expected %d", path, len(ts), len(tagset))
+               }
+               return encs, nil
+       }
+
+       throttle := throttle{Max: runtime.NumCPU()}
+       for _, path := range infiles {
+               path := path
+               throttle.Acquire()
+               go func() {
+                       defer throttle.Release()
+                       f, err := open(path)
+                       if err != nil {
+                               throttle.Report(err)
+                               return
+                       }
+                       defer f.Close()
+                       log.Printf("reading %s", path)
+                       err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error {
+                               if err := throttle.Err(); err != nil {
+                                       return err
+                               }
+                               if len(ent.TagSet) == 0 && len(ent.TileVariants) == 0 && len(ent.CompactGenomes) == 0 && len(ent.CompactSequences) == 0 {
+                                       // Nothing to copy.
+                                       return nil
+                               }
+                               outs, err := getEncoders(path, ent.TagSet)
+                               if err != nil {
+                                       return err
+                               }
+                               for _, tv := range ent.TileVariants {
+                                       tag := int(tv.Tag)
+                                       if tag < 0 || tag/tagsPerFile >= len(outs) {
+                                               return fmt.Errorf("%s: tile variant tag %d is outside the tag set", path, tag)
+                                       }
+                                       err := outs[tag/tagsPerFile].Encode(LibraryEntry{
+                                               TileVariants: []TileVariant{tv},
+                                       })
+                                       if err != nil {
+                                               return err
+                                       }
+                               }
+                               // Each output file gets a CompactGenome
+                               // entry for each genome, even if there
+                               // are no variants in its range.
+                               for _, cg := range ent.CompactGenomes {
+                                       for i, enc := range outs {
+                                               err := enc.Encode(LibraryEntry{CompactGenomes: []CompactGenome{
+                                                       sliceCompactGenome(cg, i*tagsPerFile, (i+1)*tagsPerFile),
+                                               }})
+                                               if err != nil {
+                                                       return err
+                                               }
+                                       }
+                               }
+                               // Write all ref seqs to the first
+                               // slice. Easier for downstream code.
+                               if len(ent.CompactSequences) > 0 {
+                                       err := outs[0].Encode(LibraryEntry{CompactSequences: ent.CompactSequences})
+                                       if err != nil {
+                                               return err
+                                       }
+                               }
+                               return nil
+                       })
+                       throttle.Report(err)
+               }()
+       }
+       throttle.Wait()
+       // Always close the outputs, but report a decode/encode error in
+       // preference to a close error.
+       closeErr := closeOutFiles(fs, bufws, gzws, encs)
+       if err := throttle.Err(); err != nil {
+               return err
+       }
+       return closeErr
+}
+
+// sliceCompactGenome returns the part of cg covering tags in
+// [sliceStart, sliceEnd). cg.Variants holds two entries per tag,
+// starting at tag cg.StartTag.
+//
+// The result's EndTag is always sliceEnd. Its StartTag is the later of
+// sliceStart and cg.StartTag, but never past sliceEnd, and its
+// Variants[0:2] refer to StartTag. Variants may be shorter than
+// 2*(EndTag-StartTag) when cg has no data for trailing tags. The
+// returned Variants shares cg's backing array.
+func sliceCompactGenome(cg CompactGenome, sliceStart, sliceEnd int) CompactGenome {
+       cgStart := int(cg.StartTag)
+       start, end := sliceStart, sliceEnd
+       if start < cgStart {
+               start = cgStart
+       }
+       if start > sliceEnd {
+               start = sliceEnd
+       }
+       if cgEnd := cgStart + len(cg.Variants)/2; end > cgEnd {
+               end = cgEnd
+       }
+       var variants []tileVariantID
+       if start < end {
+               variants = cg.Variants[(start-cgStart)*2 : (end-cgStart)*2]
+       }
+       return CompactGenome{
+               Name:     cg.Name,
+               Variants: variants,
+               StartTag: tagID(start),
+               EndTag:   tagID(sliceEnd),
+       }
+}
+
+func openOutFiles(dstdir string, tags, tagsPerFile int) (fs []*os.File, bufws []*bufio.Writer, gzws []*pgzip.Writer, encs []*gob.Encoder, err error) {
+       nfiles := (tags + tagsPerFile - 1) / tagsPerFile
+       fs = make([]*os.File, nfiles)
+       bufws = make([]*bufio.Writer, nfiles)
+       gzws = make([]*pgzip.Writer, nfiles)
+       encs = make([]*gob.Encoder, nfiles)
+       for i := 0; i*tagsPerFile < tags; i++ {
+               fs[i], err = os.Create(dstdir + fmt.Sprintf("/library%04d.gob.gz", i))
+               if err != nil {
+                       return
+               }
+               bufws[i] = bufio.NewWriterSize(fs[i], 1<<26)
+               gzws[i] = pgzip.NewWriter(bufws[i])
+               encs[i] = gob.NewEncoder(gzws[i])
+       }
+       return
+}
+
+// closeOutFiles shuts down outputs created by openOutFiles, one layer
+// at a time: first every pgzip writer is closed, then every bufio
+// buffer is flushed, then every file is closed. Each layer's data thus
+// reaches the next before that layer is finished. nil entries are
+// skipped, so partially opened outputs are fine. Every step is
+// attempted even after a failure, and the first error seen is
+// returned. encs is not used here.
+func closeOutFiles(fs []*os.File, bufws []*bufio.Writer, gzws []*pgzip.Writer, encs []*gob.Encoder) error {
+       var firstErr error
+       remember := func(err error) {
+               if firstErr == nil {
+                       firstErr = err
+               }
+       }
+       for _, gzw := range gzws {
+               if gzw == nil {
+                       continue
+               }
+               if err := gzw.Close(); err != nil {
+                       remember(err)
+               }
+       }
+       for _, bufw := range bufws {
+               if bufw == nil {
+                       continue
+               }
+               if err := bufw.Flush(); err != nil {
+                       remember(err)
+               }
+       }
+       for _, f := range fs {
+               if f == nil {
+                       continue
+               }
+               if err := f.Close(); err != nil {
+                       remember(err)
+               }
+       }
+       return firstErr
+}
diff --git a/slice_test.go b/slice_test.go
new file mode 100644 (file)
index 0000000..0d1fe3a
--- /dev/null
+++ b/slice_test.go
@@ -0,0 +1,64 @@
+// Copyright (C) The Lightning Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lightning
+
+import (
+       "io/ioutil"
+       "os"
+       "os/exec"
+
+       "gopkg.in/check.v1"
+)
+
+type sliceSuite struct{}
+
+var _ = check.Suite(&sliceSuite{})
+
+// TestImportAndSlice imports the reference and a sample, merges the
+// two libraries, slices the result with 2 tags per file, and checks
+// that sliced output was written.
+func (s *sliceSuite) TestImportAndSlice(c *check.C) {
+       tmpdir := c.MkDir()
+
+       // NOTE(review): none of the commands below read this BED file.
+       err := ioutil.WriteFile(tmpdir+"/chr1-12-100.bed", []byte("chr1\t12\t100\ttest.1\n"), 0644)
+       c.Check(err, check.IsNil)
+
+       exited := (&importer{}).RunCommand("import", []string{
+               "-local=true",
+               "-tag-library", "testdata/tags",
+               "-output-tiles",
+               "-save-incomplete-tiles",
+               "-o", tmpdir + "/library1.gob",
+               "testdata/ref.fasta",
+       }, nil, os.Stderr, os.Stderr)
+       c.Assert(exited, check.Equals, 0)
+
+       exited = (&importer{}).RunCommand("import", []string{
+               "-local=true",
+               "-tag-library", "testdata/tags",
+               "-output-tiles",
+               // "-save-incomplete-tiles",
+               "-o", tmpdir + "/library2.gob",
+               "testdata/pipeline1",
+       }, nil, os.Stderr, os.Stderr)
+       c.Assert(exited, check.Equals, 0)
+
+       exited = (&merger{}).RunCommand("merge", []string{
+               "-local=true",
+               "-o", tmpdir + "/library.gob",
+               tmpdir + "/library1.gob",
+               tmpdir + "/library2.gob",
+       }, nil, os.Stderr, os.Stderr)
+       c.Assert(exited, check.Equals, 0)
+
+       input := tmpdir + "/library.gob"
+
+       exited = (&slicecmd{}).RunCommand("slice", []string{
+               "-local=true",
+               "-input-dir=" + input,
+               "-output-dir=" + tmpdir,
+               "-tags-per-file=2",
+       }, nil, os.Stderr, os.Stderr)
+       c.Check(exited, check.Equals, 0)
+       // Directory listing is for debugging only; errors are ignored.
+       out, _ := exec.Command("find", tmpdir, "-ls").CombinedOutput()
+       c.Logf("%s", out)
+
+       // openOutFiles always creates library0000.gob.gz when the
+       // input has a non-empty tag set.
+       fi, err := os.Stat(tmpdir + "/library0000.gob.gz")
+       if c.Check(err, check.IsNil) {
+               c.Check(fi.Size() > 0, check.Equals, true)
+       }
+}
diff --git a/tilelib.go b/tilelib.go
index 4ad607c1e518a667f2c7dbccb03f63a0b22f6ea8..e396e1c9d05ed08c630b0221522c223ae63ec8fa 100644 (file)
--- a/tilelib.go
+++ b/tilelib.go
@@ -234,36 +234,36 @@ func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, varian
        return nil
 }
 
-func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string) error {
+// allGobFiles returns the library files to read for path. If path
+// cannot be listed as a directory, it is returned as the only element,
+// whatever its name. Otherwise the tree under path is walked
+// recursively, and every file whose name ends in ".gob" or ".gob.gz"
+// is returned as path + "/" + name, in the order Readdir yields them
+// (no sorting is done here).
+func allGobFiles(path string) ([]string, error) {
        var files []string
-       var walk func(string) error
-       walk = func(path string) error {
-               f, err := open(path)
-               if err != nil {
-                       return err
-               }
-               defer f.Close()
-               fis, err := f.Readdir(-1)
-               if err != nil {
-                       files = append(files, path)
-                       return nil
-               }
-               for _, fi := range fis {
-                       if fi.Name() == "." || fi.Name() == ".." {
-                               continue
-                       } else if child := path + "/" + fi.Name(); fi.IsDir() {
-                               err = walk(child)
-                               if err != nil {
-                                       return err
-                               }
-                       } else if strings.HasSuffix(child, ".gob") || strings.HasSuffix(child, ".gob.gz") {
-                               files = append(files, child)
+       f, err := open(path)
+       if err != nil {
+               return nil, err
+       }
+       defer f.Close()
+       fis, err := f.Readdir(-1)
+       // NOTE(review): any Readdir error, not only "not a directory",
+       // makes path be treated as a single input file.
+       if err != nil {
+               return []string{path}, nil
+       }
+       for _, fi := range fis {
+               if fi.Name() == "." || fi.Name() == ".." {
+                       continue
+               } else if child := path + "/" + fi.Name(); fi.IsDir() {
+                       // Recurse into subdirectories.
+                       add, err := allGobFiles(child)
+                       if err != nil {
+                               return nil, err
                        }
+                       files = append(files, add...)
+               } else if strings.HasSuffix(child, ".gob") || strings.HasSuffix(child, ".gob.gz") {
+                       files = append(files, child)
                }
-               return nil
        }
+       return files, nil
+}
+
+func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string) error {
        log.Infof("LoadDir: walk dir %s", path)
-       err := walk(path)
+       files, err := allGobFiles(path)
        if err != nil {
                return err
        }