flags := flag.NewFlagSet("", flag.ContinueOnError)
flags.SetOutput(stderr)
pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
+ pprofdir := flags.String("pprof-dir", "", "write Go profile data to `directory` periodically")
runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
projectUUID := flags.String("project", "", "project `UUID` for output data")
priority := flags.Int("priority", 500, "container request priority")
+ preemptible := flags.Bool("preemptible", true, "request preemptible instance")
outputDir := flags.String("output-dir", "./out", "output `directory`")
tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)")
err = flags.Parse(args)
log.Println(http.ListenAndServe(*pprof, nil))
}()
}
+ if *pprofdir != "" {
+ go writeProfilesPeriodically(*pprofdir)
+ }
if !*runlocal {
runner := arvadosContainerRunner{
Name: "lightning slice",
Client: arvados.NewClientFromEnv(),
ProjectUUID: *projectUUID,
- RAM: 200000000000,
- VCPUs: 32,
+ RAM: 500000000000,
+ VCPUs: 64,
Priority: *priority,
- KeepCache: 50,
+ KeepCache: 2,
APIAccess: true,
+ Preemptible: *preemptible,
}
for i := range inputDirs {
err = runner.TranslatePaths(&inputDirs[i])
func Slice(tagsPerFile int, dstdir string, srcdirs []string) error {
var infiles []string
for _, srcdir := range srcdirs {
- files, err := allGobFiles(srcdir)
+ files, err := allFiles(srcdir, matchGobFile)
if err != nil {
return err
}
}
// dirNamespace[dir] is a unique number in [0,len(dirNamespace)), used
// below to namespace variant numbers from different dirs.
- dirNamespace := map[string]int{}
+ dirNamespace := map[string]tileVariantID{}
for _, path := range infiles {
dir, _ := filepath.Split(path)
if _, ok := dirNamespace[dir]; !ok {
- dirNamespace[dir] = len(dirNamespace)
+ dirNamespace[dir] = tileVariantID(len(dirNamespace))
}
}
- namespaces := len(dirNamespace)
+ namespaces := tileVariantID(len(dirNamespace))
var (
tagset [][]byte
)
throttle := throttle{Max: runtime.GOMAXPROCS(0)}
- for _, path := range infiles {
- path := path
- throttle.Acquire()
- go func() {
- defer throttle.Release()
- f, err := open(path)
+ for _, infile := range infiles {
+ infile := infile
+ throttle.Go(func() error {
+ f, err := open(infile)
if err != nil {
- throttle.Report(err)
- return
+ return err
}
defer f.Close()
- dir, _ := filepath.Split(path)
+ dir, _ := filepath.Split(infile)
namespace := dirNamespace[dir]
- log.Printf("reading %s (namespace %d)", path, namespace)
- err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error {
+ log.Printf("reading %s (namespace %d)", infile, namespace)
+ return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
if err := throttle.Err(); err != nil {
return err
}
}
atomic.AddInt64(&countTileVariants, int64(len(ent.TileVariants)))
for _, tv := range ent.TileVariants {
- tv.Variant = tileVariantID(int(tv.Variant)*namespaces + namespace)
- err := encs[int(tv.Tag)/tagsPerFile].Encode(LibraryEntry{
+ tv.Variant = tv.Variant*namespaces + namespace
+ fileno := 0
+ if !tv.Ref {
+ fileno = int(tv.Tag) / tagsPerFile
+ }
+ err := encs[fileno].Encode(LibraryEntry{
TileVariants: []TileVariant{tv},
})
if err != nil {
for _, cg := range ent.CompactGenomes {
for i, v := range cg.Variants {
if v > 0 {
- cg.Variants[i] = tileVariantID(int(v)*namespaces + namespace)
+ cg.Variants[i] = v*namespaces + namespace
}
}
for i, enc := range encs {
for _, cs := range ent.CompactSequences {
for _, tseq := range cs.TileSequences {
for i, libref := range tseq {
- tseq[i].Variant = tileVariantID(int(libref.Variant)*namespaces + namespace)
+ tseq[i].Variant = libref.Variant*namespaces + namespace
}
}
}
}
return nil
})
- throttle.Report(err)
- }()
+ })
}
throttle.Wait()
if throttle.Err() != nil {