X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/8d4af49bcb5f054d23ea0739bd018e407e4bd441..ba5dea916505730ed44af0655c820bd3d767be1e:/import.go diff --git a/import.go b/import.go index e6a4b3a5cb..a396636fcb 100644 --- a/import.go +++ b/import.go @@ -8,7 +8,6 @@ import ( "flag" "fmt" "io" - "log" "net/http" _ "net/http/pprof" "os" @@ -23,6 +22,7 @@ import ( "time" "git.arvados.org/arvados.git/sdk/go/arvados" + log "github.com/sirupsen/logrus" ) type importer struct { @@ -31,6 +31,7 @@ type importer struct { outputFile string projectUUID string runLocal bool + skipOOO bool encoder *gob.Encoder } @@ -48,7 +49,10 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std flags.StringVar(&cmd.outputFile, "o", "-", "output `file`") flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for output data") flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)") + flags.BoolVar(&cmd.skipOOO, "skip-ooo", false, "skip out-of-order tags") + priority := flags.Int("priority", 500, "container request priority") pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") + loglevel := flags.String("loglevel", "info", "logging threshold (trace, debug, info, warn, error, fatal, or panic)") err = flags.Parse(args) if err == flag.ErrHelp { err = nil @@ -69,13 +73,20 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std }() } + lvl, err := log.ParseLevel(*loglevel) + if err != nil { + return 2 + } + log.SetLevel(lvl) + if !cmd.runLocal { runner := arvadosContainerRunner{ Name: "lightning import", Client: arvados.NewClientFromEnv(), ProjectUUID: cmd.projectUUID, - RAM: 30000000000, + RAM: 60000000000, VCPUs: 16, + Priority: *priority, } err = runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile) if err != nil { @@ -97,11 +108,13 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std err = errors.New("cannot specify output file in container mode: not implemented") return 1 } - runner.Args = append([]string{"import", "-local=true", "-tag-library", cmd.tagLibraryFile, "-ref", cmd.refFile, "-o", cmd.outputFile}, inputs...) - err = runner.Run() + runner.Args = append([]string{"import", "-local=true", "-loglevel=" + *loglevel, fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO), "-tag-library", cmd.tagLibraryFile, "-ref", cmd.refFile, "-o", cmd.outputFile}, inputs...) + var output string + output, err = runner.Run() if err != nil { return 1 } + fmt.Fprintln(stdout, output+"/library.gob") return 0 } @@ -115,7 +128,7 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std return 1 } go func() { - for range time.Tick(10 * time.Second) { + for range time.Tick(10 * time.Minute) { log.Printf("tilelib.Len() == %d", tilelib.Len()) } }() @@ -189,7 +202,7 @@ func (cmd *importer) loadTileLibrary() (*tileLibrary, error) { return nil, fmt.Errorf("cannot tile: tag library is empty") } log.Printf("tag library %s load done", cmd.tagLibraryFile) - return &tileLibrary{taglib: &taglib}, nil + return &tileLibrary{taglib: &taglib, skipOOO: cmd.skipOOO}, nil } func listInputFiles(paths []string) (files []string, err error) { @@ -251,7 +264,9 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { log.Printf("%s starting", infile) defer log.Printf("%s done", infile) tseqs, err := cmd.tileFasta(tilelib, infile) - variants[0] = tseqs.Variants() + var kept, dropped int + variants[0], kept, dropped = tseqs.Variants() + log.Printf("%s found %d unique tags plus %d repeats", infile, kept, dropped) return err } infile2 := regexp.MustCompile(`\.1\.fasta(\.gz)?$`).ReplaceAllString(infile, `.2.fasta$1`) @@ -260,7 +275,9 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { log.Printf("%s starting", infile2) defer log.Printf("%s done", infile2) tseqs, err := cmd.tileFasta(tilelib, infile2) - variants[1] = tseqs.Variants() + var kept, dropped int + variants[1], kept, dropped = tseqs.Variants() + log.Printf("%s found %d unique tags plus %d repeats", infile, kept, dropped) return err } } else { @@ -271,7 +288,9 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { log.Printf("%s phase %d starting", infile, phase+1) defer log.Printf("%s phase %d done", infile, phase+1) tseqs, err := cmd.tileGVCF(tilelib, infile, phase) - variants[phase] = tseqs.Variants() + var kept, dropped int + variants[phase], kept, dropped = tseqs.Variants() + log.Printf("%s phase %d found %d unique tags plus %d repeats", infile, phase+1, kept, dropped) return err } } @@ -289,8 +308,11 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { } flat := make([]tileVariantID, ntags*2) for i := 0; i < ntags; i++ { - flat[i*2] = variants[0][i] - flat[i*2+1] = variants[1][i] + for hap := 0; hap < 2; hap++ { + if i < len(variants[hap]) { + flat[i*2+hap] = variants[hap][i] + } + } } err := cmd.encoder.Encode(LibraryEntry{ CompactGenomes: []CompactGenome{{Name: infile, Variants: flat}}, @@ -305,9 +327,10 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { } go close(todo) var tileJobs sync.WaitGroup - running := int64(runtime.NumCPU()) - for i := 0; i < runtime.NumCPU(); i++ { + var running int64 + for i := 0; i < runtime.NumCPU()*9/8+1; i++ { tileJobs.Add(1) + atomic.AddInt64(&running, 1) go func() { defer tileJobs.Done() defer atomic.AddInt64(&running, -1)