import (
"bufio"
"encoding/gob"
+ "errors"
"flag"
"fmt"
"io"
"net/http"
_ "net/http/pprof"
"os"
+ "path/filepath"
"runtime"
"strings"
"sync"
+ "sync/atomic"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/klauspost/pgzip"
runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
projectUUID := flags.String("project", "", "project `UUID` for output data")
priority := flags.Int("priority", 500, "container request priority")
- inputDir := flags.String("input-dir", "./in", "input `directory`")
outputDir := flags.String("output-dir", "./out", "output `directory`")
tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)")
err = flags.Parse(args)
} else if err != nil {
return 2
}
+ inputDirs := flags.Args()
+ if len(inputDirs) == 0 {
+ err = errors.New("no input dirs specified")
+ return 2
+ }
if *pprof != "" {
go func() {
Name: "lightning slice",
Client: arvados.NewClientFromEnv(),
ProjectUUID: *projectUUID,
- RAM: 200000000000,
- VCPUs: 32,
+ RAM: 500000000000,
+ VCPUs: 64,
Priority: *priority,
- KeepCache: 50,
+ KeepCache: 2,
APIAccess: true,
}
- err = runner.TranslatePaths(inputDir)
- if err != nil {
- return 1
+ for i := range inputDirs {
+ err = runner.TranslatePaths(&inputDirs[i])
+ if err != nil {
+ return 1
+ }
}
- runner.Args = []string{"slice", "-local=true",
+ runner.Args = append([]string{"slice", "-local=true",
"-pprof", ":6060",
- "-input-dir", *inputDir,
"-output-dir", "/mnt/output",
- }
+ }, inputDirs...)
var output string
output, err = runner.Run()
if err != nil {
return 0
}
- err = Slice(*outputDir, *inputDir, *tagsPerFile)
+ err = Slice(*tagsPerFile, *outputDir, inputDirs)
if err != nil {
return 1
}
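// Slice reads tags+tiles+genomes from every directory in srcdirs and
// writes them to dstdir with (up to) the specified number of tags per
// file. Variant numbers are namespaced per directory containing an
// input file, so variants from different directories stay distinct.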
-func Slice(dstdir, srcdir string, tagsPerFile int) error {
- infiles, err := allGobFiles(srcdir)
- if err != nil {
- return err
+func Slice(tagsPerFile int, dstdir string, srcdirs []string) error {
+ var infiles []string
+ for _, srcdir := range srcdirs {
+ files, err := allGobFiles(srcdir)
+ if err != nil {
+ return err
+ }
+ infiles = append(infiles, files...)
}
+ // dirNamespace[dir] is an int in [0,len(dirNamespace)), used below to
+ // namespace variant numbers from different dirs.
+ dirNamespace := map[string]tileVariantID{}
+ for _, path := range infiles {
+ dir, _ := filepath.Split(path)
+ if _, ok := dirNamespace[dir]; !ok {
+ dirNamespace[dir] = tileVariantID(len(dirNamespace))
+ }
+ }
+ namespaces := tileVariantID(len(dirNamespace))
var (
tagset [][]byte
bufws []*bufio.Writer
gzws []*pgzip.Writer
encs []*gob.Encoder
+
+ countTileVariants int64
+ countGenomes int64
+ countReferences int64
)
throttle := throttle{Max: runtime.GOMAXPROCS(0)}
- for _, path := range infiles {
- path := path
- throttle.Acquire()
- go func() {
- defer throttle.Release()
- f, err := open(path)
+ for _, infile := range infiles {
+ infile := infile
+ throttle.Go(func() error {
+ f, err := open(infile)
if err != nil {
- throttle.Report(err)
- return
+ return err
}
defer f.Close()
- log.Printf("reading %s", path)
- err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error {
+ dir, _ := filepath.Split(infile)
+ namespace := dirNamespace[dir]
+ log.Printf("reading %s (namespace %d)", infile, namespace)
+ return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
if err := throttle.Err(); err != nil {
return err
}
if err := throttle.Err(); err != nil {
return err
}
+ atomic.AddInt64(&countTileVariants, int64(len(ent.TileVariants)))
for _, tv := range ent.TileVariants {
- err := encs[int(tv.Tag)/tagsPerFile].Encode(LibraryEntry{
+ tv.Variant = tv.Variant*namespaces + namespace
+ fileno := 0
+ if !tv.Ref {
+ fileno = int(tv.Tag) / tagsPerFile
+ }
+ err := encs[fileno].Encode(LibraryEntry{
TileVariants: []TileVariant{tv},
})
if err != nil {
// genome, even if there are no
// variants in the relevant range.
// Easier for downstream code.
+ atomic.AddInt64(&countGenomes, int64(len(ent.CompactGenomes)))
for _, cg := range ent.CompactGenomes {
+ for i, v := range cg.Variants {
+ if v > 0 {
+ cg.Variants[i] = v*namespaces + namespace
+ }
+ }
for i, enc := range encs {
start := i * tagsPerFile
end := start + tagsPerFile
}
// Write all ref seqs to the first
// slice. Easier for downstream code.
+ atomic.AddInt64(&countReferences, int64(len(ent.CompactSequences)))
if len(ent.CompactSequences) > 0 {
+ for _, cs := range ent.CompactSequences {
+ for _, tseq := range cs.TileSequences {
+ for i, libref := range tseq {
+ tseq[i].Variant = libref.Variant*namespaces + namespace
+ }
+ }
+ }
err := encs[0].Encode(LibraryEntry{CompactSequences: ent.CompactSequences})
if err != nil {
return err
}
return nil
})
- throttle.Report(err)
- }()
+ })
}
throttle.Wait()
if throttle.Err() != nil {
closeOutFiles(fs, bufws, gzws, encs)
return throttle.Err()
}
+ defer log.Printf("Total %d tile variants, %d genomes, %d reference sequences", countTileVariants, countGenomes, countReferences)
return closeOutFiles(fs, bufws, gzws, encs)
}