Preemptible: true,
Partitions: []string{},
},
+ "environment": map[string]string{
+ "GOMAXPROCS": fmt.Sprintf("%d", rc.VCPUs),
+ },
},
})
if err != nil {
siteFSMtx sync.Mutex
)
-func open(fnm string) (io.ReadCloser, error) {
+// file is the handle type returned by open: it can be read and closed
+// like any io.ReadCloser, and can additionally list directory entries
+// through Readdir. *os.File satisfies it (open returns the result of
+// os.Open directly when ARVADOS_API_HOST is not set).
+type file interface {
+ io.ReadCloser
+ Readdir(n int) ([]os.FileInfo, error)
+}
+
+func open(fnm string) (file, error) {
if os.Getenv("ARVADOS_API_HOST") == "" {
return os.Open(fnm)
}
"annotate": &annotatecmd{},
"export": &exporter{},
"export-numpy": &exportNumpy{},
+ "flake": &flakecmd{},
"numpy-comvar": &numpyComVar{},
"filter": &filtercmd{},
"build-docker-image": &buildDockerImage{},
Client: arvados.NewClientFromEnv(),
ProjectUUID: *projectUUID,
RAM: 500000000000,
- VCPUs: 32,
+ VCPUs: 96,
Priority: *priority,
KeepCache: 1,
APIAccess: true,
return 1
}
runner.Args = []string{"export-numpy", "-local=true",
- "-pprof", ":6000",
+ "-pprof", ":6060",
fmt.Sprintf("-one-hot=%v", *onehot),
"-i", *inputFilename,
"-output-dir", "/mnt/output",
--- /dev/null
+package lightning
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "net/http"
+ _ "net/http/pprof"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ log "github.com/sirupsen/logrus"
+)
+
+// flakecmd implements the "flake" subcommand. Its filter field
+// registers its own command-line flags in RunCommand and is then
+// applied to the loaded tile library before the library is written
+// back out.
+type flakecmd struct {
+ filter filter
+}
+
+// RunCommand implements the "flake" subcommand: load a tile library
+// from -input-dir, apply the configured filter, tidy the library, and
+// write it to -output-dir.
+//
+// With -local the work is done in this process. Without it, nothing
+// is loaded here; instead an arvados container runner is set up to
+// re-invoke "flake -local=true" with the same filter settings and
+// /mnt/output as the output directory, and the string returned by the
+// runner is printed to stdout.
+//
+// The return value is an exit code: 0 on success or after a help
+// request, 2 if the flags cannot be parsed, 1 on any other error.
+// A non-nil error is printed to stderr before returning.
+func (cmd *flakecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+	var err error
+	// Whatever error is still pending at return time is reported on
+	// stderr; the help path resets err to nil to stay silent.
+	defer func() {
+		if err == nil {
+			return
+		}
+		fmt.Fprintf(stderr, "%s\n", err)
+	}()
+
+	fs := flag.NewFlagSet("", flag.ContinueOnError)
+	fs.SetOutput(stderr)
+	var (
+		pprofAddr = fs.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
+		local     = fs.Bool("local", false, "run on local host (default: run in an arvados container)")
+		project   = fs.String("project", "", "project `UUID` for output data")
+		prio      = fs.Int("priority", 500, "container request priority")
+		inDir     = fs.String("input-dir", "./in", "input `directory`")
+		outDir    = fs.String("output-dir", "./out", "output `directory`")
+	)
+	cmd.filter.Flags(fs)
+
+	err = fs.Parse(args)
+	switch {
+	case err == flag.ErrHelp:
+		err = nil
+		return 0
+	case err != nil:
+		return 2
+	}
+
+	if addr := *pprofAddr; addr != "" {
+		// Serve profiling data in the background for the
+		// lifetime of the process.
+		go func() {
+			log.Println(http.ListenAndServe(addr, nil))
+		}()
+	}
+
+	if *local {
+		lib := &tileLibrary{
+			retainNoCalls:       true,
+			retainTileSequences: true,
+			compactGenomes:      map[string][]tileVariantID{},
+		}
+		if err = lib.LoadDir(context.Background(), *inDir, nil); err != nil {
+			return 1
+		}
+
+		log.Info("filtering")
+		cmd.filter.Apply(lib)
+		log.Info("tidying")
+		lib.Tidy()
+		if err = lib.WriteDir(*outDir); err != nil {
+			return 1
+		}
+		return 0
+	}
+
+	// Not local: hand the job to a container that runs this same
+	// subcommand with -local=true.
+	ctr := arvadosContainerRunner{
+		Name:        "lightning flake",
+		Client:      arvados.NewClientFromEnv(),
+		ProjectUUID: *project,
+		RAM:         500000000000,
+		VCPUs:       96,
+		Priority:    *prio,
+		KeepCache:   2,
+		APIAccess:   true,
+	}
+	// TranslatePaths receives a pointer, so it may rewrite *inDir
+	// before the value is placed on the container command line.
+	if err = ctr.TranslatePaths(inDir); err != nil {
+		return 1
+	}
+	ctr.Args = []string{"flake", "-local=true",
+		"-pprof", ":6060",
+		"-input-dir", *inDir,
+		"-output-dir", "/mnt/output",
+		"-max-variants", fmt.Sprintf("%d", cmd.filter.MaxVariants),
+		"-min-coverage", fmt.Sprintf("%f", cmd.filter.MinCoverage),
+		"-max-tag", fmt.Sprintf("%d", cmd.filter.MaxTag),
+	}
+	var output string
+	if output, err = ctr.Run(); err != nil {
+		return 1
+	}
+	fmt.Fprintln(stdout, output)
+	return 0
+}
"encoding/gob"
"fmt"
"io"
+ "os"
"regexp"
"runtime"
"sort"
"sync"
"sync/atomic"
+ "github.com/klauspost/pgzip"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/blake2b"
)
return nil
}
+// LoadDir loads library data from path into tilelib.
+//
+// If path is a directory, it is walked recursively and every regular
+// entry whose name ends in ".gob" or ".gob.gz" is loaded. If path
+// cannot be listed as a directory, it is treated as a single library
+// file and loaded regardless of its name.
+//
+// All files are decoded concurrently, one goroutine per file. Tag
+// sets and tile variants are merged into tilelib as they are decoded
+// (serialized by a mutex); compact genomes and compact sequences are
+// collected and loaded only after every file has been decoded
+// successfully. onLoadGenome is passed through to loadCompactGenomes.
+//
+// The first error encountered is returned. Returning cancels the
+// derived context, which makes the remaining decoders stop at their
+// next library entry.
+func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string, onLoadGenome func(CompactGenome)) error {
+	var files []string
+	var walk func(string) error
+	walk = func(path string) error {
+		f, err := open(path)
+		if err != nil {
+			return err
+		}
+		defer f.Close()
+		fis, err := f.Readdir(-1)
+		if err != nil {
+			// Not listable as a directory: assume it is a
+			// library file.
+			files = append(files, path)
+			return nil
+		}
+		for _, fi := range fis {
+			if fi.Name() == "." || fi.Name() == ".." {
+				continue
+			}
+			// Recursion, the suffix test, and the recorded
+			// filename must all refer to the entry itself, not
+			// to the directory containing it.
+			child := path + "/" + fi.Name()
+			if fi.IsDir() {
+				err = walk(child)
+				if err != nil {
+					return err
+				}
+			} else if strings.HasSuffix(child, ".gob") || strings.HasSuffix(child, ".gob.gz") {
+				files = append(files, child)
+			}
+		}
+		return nil
+	}
+	err := walk(path)
+	if err != nil {
+		return err
+	}
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	var mtx sync.Mutex
+	cgs := []CompactGenome{}
+	cseqs := []CompactSequence{}
+	variantmap := map[tileLibRef]tileVariantID{}
+	// Buffered so that every decoder can deliver its result and exit
+	// even if we have already returned on an earlier error.
+	errs := make(chan error, len(files))
+	for _, path := range files {
+		path := path
+		go func() {
+			f, err := open(path)
+			if err != nil {
+				errs <- err
+				return
+			}
+			defer f.Close()
+			errs <- DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error {
+				if ctx.Err() != nil {
+					return ctx.Err()
+				}
+				// tilelib, variantmap, cgs and cseqs are
+				// shared by all decoders.
+				mtx.Lock()
+				defer mtx.Unlock()
+				if err := tilelib.loadTagSet(ent.TagSet); err != nil {
+					return err
+				}
+				if err := tilelib.loadTileVariants(ent.TileVariants, variantmap); err != nil {
+					return err
+				}
+				cgs = append(cgs, ent.CompactGenomes...)
+				cseqs = append(cseqs, ent.CompactSequences...)
+				return nil
+			})
+		}()
+	}
+	for range files {
+		err := <-errs
+		if err != nil {
+			return err
+		}
+	}
+	err = tilelib.loadCompactGenomes(cgs, variantmap, onLoadGenome)
+	if err != nil {
+		return err
+	}
+	err = tilelib.loadCompactSequences(cseqs, variantmap)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (tilelib *tileLibrary) WriteDir(dir string) error {
+ nfiles := 128
+ files := make([]*os.File, nfiles)
+ for i := range files {
+ f, err := os.OpenFile(fmt.Sprintf("%s/library.%04d.gob.gz", dir, i), os.O_CREATE|os.O_WRONLY, 0666)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ files[i] = f
+ }
+ bufws := make([]*bufio.Writer, nfiles)
+ for i := range bufws {
+ bufws[i] = bufio.NewWriterSize(files[i], 1<<26)
+ }
+ zws := make([]*pgzip.Writer, nfiles)
+ for i := range zws {
+ zws[i] = pgzip.NewWriter(bufws[i])
+ defer zws[i].Close()
+ }
+ encoders := make([]*gob.Encoder, nfiles)
+ for i := range encoders {
+ encoders[i] = gob.NewEncoder(zws[i])
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ errs := make(chan error, nfiles)
+ for start := range files {
+ start := start
+ go func() {
+ ent0 := LibraryEntry{
+ TagSet: tilelib.taglib.Tags(),
+ }
+ if start == 0 {
+ // For now, just write all the genomes and refs
+ // to the first file
+ for name, cg := range tilelib.compactGenomes {
+ ent0.CompactGenomes = append(ent0.CompactGenomes, CompactGenome{
+ Name: name,
+ Variants: cg,
+ })
+ }
+ for name, tseqs := range tilelib.refseqs {
+ ent0.CompactSequences = append(ent0.CompactSequences, CompactSequence{
+ Name: name,
+ TileSequences: tseqs,
+ })
+ }
+ }
+ err := encoders[start].Encode(ent0)
+ if err != nil {
+ errs <- err
+ return
+ }
+ tvs := []TileVariant{}
+ for tag := start; tag < len(tilelib.variant) && ctx.Err() == nil; tag += nfiles {
+ tvs = tvs[:0]
+ for idx, hash := range tilelib.variant[tag] {
+ tvs = append(tvs, TileVariant{
+ Tag: tagID(tag),
+ Variant: tileVariantID(idx + 1),
+ Blake2b: hash,
+ Sequence: tilelib.seq[hash],
+ })
+ }
+ err := encoders[start].Encode(LibraryEntry{TileVariants: tvs})
+ if err != nil {
+ errs <- err
+ return
+ }
+ }
+ errs <- nil
+ }()
+ }
+ for range files {
+ err := <-errs
+ if err != nil {
+ return err
+ }
+ }
+ for i := range zws {
+ err := zws[i].Close()
+ if err != nil {
+ return err
+ }
+ err = bufws[i].Flush()
+ if err != nil {
+ return err
+ }
+ err = files[i].Close()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
// Load library data from rdr. Tile variants might be renumbered in
// the process; in that case, genome variants will be renumbered to
// match.