Add "flake" command (read, tidy, write).
author Tom Clegg <tom@tomclegg.ca>
Tue, 15 Jun 2021 04:47:35 +0000 (00:47 -0400)
committer Tom Clegg <tom@tomclegg.ca>
Tue, 15 Jun 2021 04:47:35 +0000 (00:47 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

arvados.go
cmd.go
exportnumpy.go
flake.go [new file with mode: 0644]
tilelib.go

index 6d76aded264124ff33d6bee7c24ef39346325145..0eb99686aa748969f4000c47ede9681869971e05 100644 (file)
@@ -267,6 +267,9 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e
                                Preemptible: true,
                                Partitions:  []string{},
                        },
+                       "environment": map[string]string{
+                               "GOMAXPROCS": fmt.Sprintf("%d", rc.VCPUs),
+                       },
                },
        })
        if err != nil {
@@ -511,7 +514,12 @@ var (
        siteFSMtx            sync.Mutex
 )
 
-func open(fnm string) (io.ReadCloser, error) {
+// file is what open returns: a readable, closable handle that can
+// also list directory entries via Readdir.
+type file interface {
+       io.ReadCloser
+       Readdir(n int) ([]os.FileInfo, error)
+}
+
+func open(fnm string) (file, error) {
        if os.Getenv("ARVADOS_API_HOST") == "" {
                return os.Open(fnm)
        }
diff --git a/cmd.go b/cmd.go
index d2f75c996ddc613a83c359ad7ba1f07e553e7c01..fdf4d9428f32414a82d0a1f25ae99a82cceddbed 100644 (file)
--- a/cmd.go
+++ b/cmd.go
@@ -24,6 +24,7 @@ var (
                "annotate":           &annotatecmd{},
                "export":             &exporter{},
                "export-numpy":       &exportNumpy{},
+               "flake":              &flakecmd{},
                "numpy-comvar":       &numpyComVar{},
                "filter":             &filtercmd{},
                "build-docker-image": &buildDockerImage{},
index 4ddbb1029bd9f6a28882cde3c1869dfcd11d13c0..41f7356660a0385f5e26d7fa144f80961bf63c03 100644 (file)
@@ -72,7 +72,7 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: *projectUUID,
                        RAM:         500000000000,
-                       VCPUs:       32,
+                       VCPUs:       96,
                        Priority:    *priority,
                        KeepCache:   1,
                        APIAccess:   true,
@@ -82,7 +82,7 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
                        return 1
                }
                runner.Args = []string{"export-numpy", "-local=true",
-                       "-pprof", ":6000",
+                       "-pprof", ":6060",
                        fmt.Sprintf("-one-hot=%v", *onehot),
                        "-i", *inputFilename,
                        "-output-dir", "/mnt/output",
diff --git a/flake.go b/flake.go
new file mode 100644 (file)
index 0000000..d880350
--- /dev/null
+++ b/flake.go
@@ -0,0 +1,100 @@
+package lightning
+
+import (
+       "context"
+       "flag"
+       "fmt"
+       "io"
+       "net/http"
+       _ "net/http/pprof"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       log "github.com/sirupsen/logrus"
+)
+
+// flakecmd implements the "flake" subcommand. Its filter field
+// registers additional command line flags and is applied to the
+// tile library after loading.
+type flakecmd struct {
+       filter filter
+}
+
+// RunCommand loads a tile library from -input-dir, applies the
+// filter, tidies the library, and writes the result to -output-dir.
+//
+// Unless -local is given, the work is not done in this process:
+// the same command (with -local=true) is submitted through an
+// arvadosContainerRunner, and the runner's output is printed on
+// stdout.
+//
+// The return value is an exit code: 0 on success or -help, 2 on a
+// flag parsing error, 1 on any other error. Whatever error is
+// pending at return time is printed on stderr.
+func (cmd *flakecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       var err error
+       defer func() {
+               if err == nil {
+                       return
+               }
+               fmt.Fprintf(stderr, "%s\n", err)
+       }()
+
+       fs := flag.NewFlagSet("", flag.ContinueOnError)
+       fs.SetOutput(stderr)
+       pprofAddr := fs.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
+       local := fs.Bool("local", false, "run on local host (default: run in an arvados container)")
+       project := fs.String("project", "", "project `UUID` for output data")
+       prio := fs.Int("priority", 500, "container request priority")
+       inDir := fs.String("input-dir", "./in", "input `directory`")
+       outDir := fs.String("output-dir", "./out", "output `directory`")
+       cmd.filter.Flags(fs)
+       err = fs.Parse(args)
+       switch {
+       case err == flag.ErrHelp:
+               // Asking for help is not a failure; don't report it.
+               err = nil
+               return 0
+       case err != nil:
+               return 2
+       }
+
+       if *pprofAddr != "" {
+               go func() {
+                       log.Println(http.ListenAndServe(*pprofAddr, nil))
+               }()
+       }
+
+       if *local {
+               lib := &tileLibrary{
+                       retainNoCalls:       true,
+                       retainTileSequences: true,
+                       compactGenomes:      map[string][]tileVariantID{},
+               }
+               if err = lib.LoadDir(context.Background(), *inDir, nil); err != nil {
+                       return 1
+               }
+
+               log.Info("filtering")
+               cmd.filter.Apply(lib)
+               log.Info("tidying")
+               lib.Tidy()
+               if err = lib.WriteDir(*outDir); err != nil {
+                       return 1
+               }
+               return 0
+       }
+
+       // Not local: re-run this command inside a container.
+       runner := arvadosContainerRunner{
+               Name:        "lightning flake",
+               Client:      arvados.NewClientFromEnv(),
+               ProjectUUID: *project,
+               RAM:         500000000000,
+               VCPUs:       96,
+               Priority:    *prio,
+               KeepCache:   2,
+               APIAccess:   true,
+       }
+       if err = runner.TranslatePaths(inDir); err != nil {
+               return 1
+       }
+       runner.Args = []string{
+               "flake",
+               "-local=true",
+               "-pprof", ":6060",
+               "-input-dir", *inDir,
+               "-output-dir", "/mnt/output",
+               "-max-variants", fmt.Sprintf("%d", cmd.filter.MaxVariants),
+               "-min-coverage", fmt.Sprintf("%f", cmd.filter.MinCoverage),
+               "-max-tag", fmt.Sprintf("%d", cmd.filter.MaxTag),
+       }
+       var output string
+       output, err = runner.Run()
+       if err != nil {
+               return 1
+       }
+       fmt.Fprintln(stdout, output)
+       return 0
+}
index 0606fc3092897eeec57c58553926f7715b811dfb..ace24cec0ac1b06e970aeefd23caf895faba00d6 100644 (file)
@@ -7,6 +7,7 @@ import (
        "encoding/gob"
        "fmt"
        "io"
+       "os"
        "regexp"
        "runtime"
        "sort"
@@ -14,6 +15,7 @@ import (
        "sync"
        "sync/atomic"
 
+       "github.com/klauspost/pgzip"
        log "github.com/sirupsen/logrus"
        "golang.org/x/crypto/blake2b"
 )
@@ -208,6 +210,186 @@ func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, varian
        return nil
 }
 
+// LoadDir loads library data from path into tilelib.
+//
+// If path is a directory, it is searched recursively and every
+// file whose name ends in ".gob" or ".gob.gz" is loaded. If path
+// cannot be listed as a directory, it is loaded as a single library
+// file regardless of its name.
+//
+// Files are decoded concurrently, one goroutine per file. Whether a
+// file name ends in ".gz" is passed to DecodeLibrary as its second
+// argument (presumably selecting gzip decompression -- confirm
+// against DecodeLibrary). Tag sets and tile variants are merged as
+// entries arrive; compact genomes and compact sequences are
+// collected and loaded only after every file has been decoded.
+//
+// onLoadGenome is passed through to loadCompactGenomes and may be
+// nil.
+//
+// The first error encountered is returned. Returning cancels the
+// context seen by the remaining decoders, which stop at their next
+// entry.
+func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGenome func(CompactGenome)) error {
+       var files []string
+       var walk func(string) error
+       walk = func(path string) error {
+               f, err := open(path)
+               if err != nil {
+                       return err
+               }
+               defer f.Close()
+               fis, err := f.Readdir(-1)
+               if err != nil {
+                       // Not listable as a directory: treat it
+                       // as a library file.
+                       files = append(files, path)
+                       return nil
+               }
+               for _, fi := range fis {
+                       name := fi.Name()
+                       if name == "." || name == ".." {
+                               continue
+                       } else if fi.IsDir() {
+                               err = walk(path + "/" + name)
+                               if err != nil {
+                                       return err
+                               }
+                       } else if strings.HasSuffix(name, ".gob") || strings.HasSuffix(name, ".gob.gz") {
+                               // Match on the entry's own name and
+                               // record the entry's path, not the
+                               // path of the containing directory.
+                               files = append(files, path+"/"+name)
+                       }
+               }
+               return nil
+       }
+       err := walk(path)
+       if err != nil {
+               return err
+       }
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+       // mtx guards tilelib, variantmap, cgs and cseqs while the
+       // decoders run.
+       var mtx sync.Mutex
+       cgs := []CompactGenome{}
+       cseqs := []CompactSequence{}
+       variantmap := map[tileLibRef]tileVariantID{}
+       // Buffered so decoders can always deliver their result and
+       // exit, even after we have stopped receiving.
+       errs := make(chan error, len(files))
+       for _, path := range files {
+               path := path
+               go func() {
+                       f, err := open(path)
+                       if err != nil {
+                               errs <- err
+                               return
+                       }
+                       defer f.Close()
+                       errs <- DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error {
+                               if ctx.Err() != nil {
+                                       return ctx.Err()
+                               }
+                               mtx.Lock()
+                               defer mtx.Unlock()
+                               if err := tilelib.loadTagSet(ent.TagSet); err != nil {
+                                       return err
+                               }
+                               if err := tilelib.loadTileVariants(ent.TileVariants, variantmap); err != nil {
+                                       return err
+                               }
+                               cgs = append(cgs, ent.CompactGenomes...)
+                               cseqs = append(cseqs, ent.CompactSequences...)
+                               return nil
+                       })
+               }()
+       }
+       for range files {
+               err := <-errs
+               if err != nil {
+                       return err
+               }
+       }
+       err = tilelib.loadCompactGenomes(cgs, variantmap, onLoadGenome)
+       if err != nil {
+               return err
+       }
+       err = tilelib.loadCompactSequences(cseqs, variantmap)
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+// WriteDir writes the library to 128 files in dir, named
+// library.0000.gob.gz through library.0127.gob.gz. dir must already
+// exist. Existing files with those names are truncated and
+// overwritten.
+//
+// Each file is a pgzip-compressed stream of gob-encoded
+// LibraryEntry values, written by its own goroutine:
+//
+//   - The first entry in every file carries the tag set. In file 0
+//     it also carries all compact genomes and reference sequences.
+//   - Each following entry carries the tile variants for one tag.
+//     Tag t goes to file t modulo 128, and variant IDs are numbered
+//     starting at 1.
+//
+// If any writer fails, the others are told to stop and WriteDir
+// waits for all of them before returning the first error. The
+// output files are then incomplete.
+func (tilelib *tileLibrary) WriteDir(dir string) error {
+       nfiles := 128
+       files := make([]*os.File, nfiles)
+       for i := range files {
+               // O_TRUNC: without it, overwriting a longer file
+               // from a previous run would leave stale bytes after
+               // the end of the new gzip stream.
+               f, err := os.OpenFile(fmt.Sprintf("%s/library.%04d.gob.gz", dir, i), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
+               if err != nil {
+                       return err
+               }
+               // Safety net for early returns; the success path
+               // closes explicitly below and checks the error.
+               defer f.Close()
+               files[i] = f
+       }
+       bufws := make([]*bufio.Writer, nfiles)
+       for i := range bufws {
+               bufws[i] = bufio.NewWriterSize(files[i], 1<<26)
+       }
+       zws := make([]*pgzip.Writer, nfiles)
+       for i := range zws {
+               zws[i] = pgzip.NewWriter(bufws[i])
+               defer zws[i].Close()
+       }
+       encoders := make([]*gob.Encoder, nfiles)
+       for i := range encoders {
+               encoders[i] = gob.NewEncoder(zws[i])
+       }
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       errs := make(chan error, nfiles)
+       for start := range files {
+               start := start
+               go func() {
+                       ent0 := LibraryEntry{
+                               TagSet: tilelib.taglib.Tags(),
+                       }
+                       if start == 0 {
+                               // For now, just write all the genomes and refs
+                               // to the first file
+                               for name, cg := range tilelib.compactGenomes {
+                                       ent0.CompactGenomes = append(ent0.CompactGenomes, CompactGenome{
+                                               Name:     name,
+                                               Variants: cg,
+                                       })
+                               }
+                               for name, tseqs := range tilelib.refseqs {
+                                       ent0.CompactSequences = append(ent0.CompactSequences, CompactSequence{
+                                               Name:          name,
+                                               TileSequences: tseqs,
+                                       })
+                               }
+                       }
+                       err := encoders[start].Encode(ent0)
+                       if err != nil {
+                               errs <- err
+                               return
+                       }
+                       tvs := []TileVariant{}
+                       // Stop early (reporting no error of our
+                       // own) once another writer has failed.
+                       for tag := start; tag < len(tilelib.variant) && ctx.Err() == nil; tag += nfiles {
+                               tvs = tvs[:0]
+                               for idx, hash := range tilelib.variant[tag] {
+                                       tvs = append(tvs, TileVariant{
+                                               Tag:      tagID(tag),
+                                               Variant:  tileVariantID(idx + 1),
+                                               Blake2b:  hash,
+                                               Sequence: tilelib.seq[hash],
+                                       })
+                               }
+                               err := encoders[start].Encode(LibraryEntry{TileVariants: tvs})
+                               if err != nil {
+                                       errs <- err
+                                       return
+                               }
+                       }
+                       errs <- nil
+               }()
+       }
+       // Collect a result from every writer before returning, even
+       // after a failure: the deferred Close calls above must not
+       // run while a writer is still encoding.
+       var firstErr error
+       for range files {
+               if err := <-errs; err != nil && firstErr == nil {
+                       firstErr = err
+                       cancel()
+               }
+       }
+       if firstErr != nil {
+               return firstErr
+       }
+       // Finish each stream from the inside out: end the gzip
+       // stream, flush the buffer to the file, close the file.
+       for i := range zws {
+               err := zws[i].Close()
+               if err != nil {
+                       return err
+               }
+               err = bufws[i].Flush()
+               if err != nil {
+                       return err
+               }
+               err = files[i].Close()
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
 // Load library data from rdr. Tile variants might be renumbered in
 // the process; in that case, genomes variants will be renumbered to
 // match.