X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/291f97cecfaa2dcee2c332c1192b506c551552ee..219c3aa996193dacd8e38f7aa4e3d0d32b84f403:/import.go diff --git a/import.go b/import.go index 0f22467076..e6b86afd81 100644 --- a/import.go +++ b/import.go @@ -8,7 +8,6 @@ import ( "flag" "fmt" "io" - "log" "net/http" _ "net/http/pprof" "os" @@ -23,6 +22,7 @@ import ( "time" "git.arvados.org/arvados.git/sdk/go/arvados" + log "github.com/sirupsen/logrus" ) type importer struct { @@ -45,9 +45,10 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std flags.SetOutput(stderr) flags.StringVar(&cmd.tagLibraryFile, "tag-library", "", "tag library fasta `file`") flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`") - flags.StringVar(&cmd.outputFile, "o", "", "output `file`") - flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for storing intermediate and output data") + flags.StringVar(&cmd.outputFile, "o", "-", "output `file`") + flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for output data") flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)") + priority := flags.Int("priority", 500, "container request priority") pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") err = flags.Parse(args) if err == flag.ErrHelp { @@ -74,6 +75,9 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std Name: "lightning import", Client: arvados.NewClientFromEnv(), ProjectUUID: cmd.projectUUID, + RAM: 30000000000, + VCPUs: 16, + Priority: *priority, } err = runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile) if err != nil { @@ -86,7 +90,7 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std return 1 } } - if cmd.outputFile == "" { + if cmd.outputFile == "-" { cmd.outputFile = "/mnt/output/library.gob" } else { // Not yet implemented, but this should write @@ -96,10 +100,12 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std return 1 } runner.Args = append([]string{"import", "-local=true", "-tag-library", cmd.tagLibraryFile, "-ref", cmd.refFile, "-o", cmd.outputFile}, inputs...) - err = runner.Run() + var output string + output, err = runner.Run() if err != nil { return 1 } + fmt.Fprintln(stdout, output+"/library.gob") return 0 } @@ -113,38 +119,35 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std return 1 } go func() { - for range time.Tick(10 * time.Second) { + for range time.Tick(10 * time.Minute) { log.Printf("tilelib.Len() == %d", tilelib.Len()) } }() - var outfile *os.File - var w *bufio.Writer - if cmd.outputFile == "" { - w = bufio.NewWriter(stdout) + var output io.WriteCloser + if cmd.outputFile == "-" { + output = nopCloser{stdout} } else { - outfile, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777) + output, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777) if err != nil { return 1 } - defer outfile.Close() - w = bufio.NewWriter(outfile) + defer output.Close() } - cmd.encoder = gob.NewEncoder(w) + bufw := bufio.NewWriter(output) + cmd.encoder = gob.NewEncoder(bufw) err = cmd.tileInputs(tilelib, infiles) if err != nil { return 1 } - err = w.Flush() + err = bufw.Flush() if err != nil { return 1 } - if outfile != nil { - err = outfile.Close() - if err != nil { - return 1 - } + err = output.Close() + if err != nil { + return 1 } return 0 } @@ -306,9 +309,10 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { } go close(todo) var tileJobs sync.WaitGroup - running := int64(runtime.NumCPU()) - for i := 0; i < runtime.NumCPU(); i++ { + var running int64 + for i := 0; i < runtime.NumCPU()*9/8+1; i++ { tileJobs.Add(1) + atomic.AddInt64(&running, 1) go func() { defer tileJobs.Done() defer atomic.AddInt64(&running, -1)