From 40c17a78f9cdc073f494052a3421d43ba4a9d441 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 3 Mar 2020 11:41:23 -0500 Subject: [PATCH] Run "filter" and "export" in arvados containers. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- arvados.go | 13 +++++---- exportnumpy.go | 52 +++++++++++++++++++++++++++++++-- exportnumpy_test.go | 2 +- filter.go | 71 +++++++++++++++++++++++++++++++++++++++++++-- import.go | 27 +++++++++-------- 5 files changed, 139 insertions(+), 26 deletions(-) diff --git a/arvados.go b/arvados.go index f121542117..07e7372078 100644 --- a/arvados.go +++ b/arvados.go @@ -18,6 +18,8 @@ type arvadosContainerRunner struct { Client *arvados.Client Name string ProjectUUID string + VCPUs int + RAM int64 Args []string Mounts map[string]string } @@ -53,11 +55,10 @@ func (runner *arvadosContainerRunner) Run() error { "uuid": uuid, } } - cpus := 16 rc := arvados.RuntimeConstraints{ - VCPUs: cpus, - RAM: 64000000000, - KeepCacheRAM: (1 << 26) * 2 * int64(cpus), + VCPUs: runner.VCPUs, + RAM: runner.RAM, + KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs), } var cr arvados.ContainerRequest err = runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{ @@ -113,8 +114,8 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { Limit: 1, Count: "none", Filters: []arvados.Filter{ - {"name", "=", cname}, - {"owner_uuid", "=", runner.ProjectUUID}, + {Attr: "name", Operator: "=", Operand: cname}, + {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID}, }, }) if err != nil { diff --git a/exportnumpy.go b/exportnumpy.go index d11b7431d5..e8854ace8c 100644 --- a/exportnumpy.go +++ b/exportnumpy.go @@ -2,13 +2,16 @@ package main import ( "bufio" + "errors" "flag" "fmt" "io" "log" "net/http" _ "net/http/pprof" + "os" + "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/kshedden/gonpy" ) @@ -26,6 +29,10 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, flags := flag.NewFlagSet("", flag.ContinueOnError) flags.SetOutput(stderr) pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") + runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)") + projectUUID := flags.String("project", "", "project `UUID` for output data") + inputFilename := flags.String("i", "", "input `file`") + outputFilename := flags.String("o", "", "output `file`") err = flags.Parse(args) if err == flag.ErrHelp { err = nil @@ -41,6 +48,30 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, }() } + if !*runlocal { + if *outputFilename != "" { + err = errors.New("cannot specify output file in container mode: not implemented") + return 1 + } + runner := arvadosContainerRunner{ + Name: "lightning export-numpy", + Client: arvados.NewClientFromEnv(), + ProjectUUID: *projectUUID, + RAM: 64000000000, + VCPUs: 2, + } + err = runner.TranslatePaths(inputFilename) + if err != nil { + return 1 + } + runner.Args = []string{"export-numpy", "-local=true", "-i", *inputFilename, "-o", "/mnt/output/library.npy"} + err = runner.Run() + if err != nil { + return 1 + } + return 0 + } + cgs, err := ReadCompactGenomes(stdin) if err != nil { return 1 @@ -58,14 +89,29 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, out[row*cols+i] = uint16(v) } } - w := bufio.NewWriter(cmd.output) - npw, err := gonpy.NewWriter(nopCloser{w}) + + var output io.WriteCloser + if *outputFilename == "" { + output = nopCloser{cmd.output} + } else { + output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777) + if err != nil { + return 1 + } + defer output.Close() + } + bufw := bufio.NewWriter(output) + npw, err := gonpy.NewWriter(nopCloser{bufw}) if err != nil { return 1 } npw.Shape = []int{rows, cols} npw.WriteUint16(out) - err = w.Flush() + err = bufw.Flush() + if err != nil { + return 1 + } + err = output.Close() if err != nil { return 1 } diff --git a/exportnumpy_test.go b/exportnumpy_test.go index 91921a8502..8051e279a9 100644 --- a/exportnumpy_test.go +++ b/exportnumpy_test.go @@ -17,7 +17,7 @@ func (s *exportSuite) TestFastaToNumpy(c *check.C) { exited := (&importer{}).RunCommand("import", []string{"-local=true", "-tag-library", "testdata/tags", "-ref", "testdata/ref", "testdata/a.1.fasta"}, &bytes.Buffer{}, &buffer, os.Stderr) c.Assert(exited, check.Equals, 0) var output bytes.Buffer - exited = (&exportNumpy{}).RunCommand("export-numpy", nil, &buffer, &output, os.Stderr) + exited = (&exportNumpy{}).RunCommand("export-numpy", []string{"-local=true"}, &buffer, &output, os.Stderr) c.Check(exited, check.Equals, 0) npy, err := gonpy.NewReader(&output) c.Assert(err, check.IsNil) diff --git a/filter.go b/filter.go index a2bf12d729..87d94ef62c 100644 --- a/filter.go +++ b/filter.go @@ -3,12 +3,17 @@ package main import ( "bufio" "encoding/gob" + "errors" "flag" "fmt" "io" + "io/ioutil" "log" "net/http" _ "net/http/pprof" + "os" + + "git.arvados.org/arvados.git/sdk/go/arvados" ) type filterer struct { @@ -25,6 +30,10 @@ func (cmd *filterer) RunCommand(prog string, args []string, stdin io.Reader, std flags := flag.NewFlagSet("", flag.ContinueOnError) flags.SetOutput(stderr) pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") + runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)") + projectUUID := flags.String("project", "", "project `UUID` for output data") + inputFilename := flags.String("i", "", "input `file`") + outputFilename := flags.String("o", "", "output `file`") maxvariants := flags.Int("max-variants", -1, "drop tiles with more than `N` variants") mincoverage := flags.Float64("min-coverage", 1, "drop tiles with coverage less than `P` across all haplotypes (0 < P ≤ 1)") maxtag := flags.Int("max-tag", -1, "drop tiles with tag ID > `N`") @@ -43,8 +52,52 @@ func (cmd *filterer) RunCommand(prog string, args []string, stdin io.Reader, std }() } + if !*runlocal { + if *outputFilename != "" { + err = errors.New("cannot specify output file in container mode: not implemented") + return 1 + } + runner := arvadosContainerRunner{ + Name: "lightning filter", + Client: arvados.NewClientFromEnv(), + ProjectUUID: *projectUUID, + RAM: 64000000000, + VCPUs: 2, + } + err = runner.TranslatePaths(inputFilename) + if err != nil { + return 1 + } + runner.Args = []string{"filter", "-local=true", + "-i", *inputFilename, + "-o", "/mnt/output/library.gob", + "-max-variants", fmt.Sprintf("%d", *maxvariants), + "-min-coverage", fmt.Sprintf("%f", *mincoverage), + "-max-tag", fmt.Sprintf("%d", *maxtag), + } + err = runner.Run() + if err != nil { + return 1 + } + return 0 + } + + var infile io.ReadCloser + if *inputFilename == "" { + infile = ioutil.NopCloser(stdin) + } else { + infile, err = os.Open(*inputFilename) + if err != nil { + return 1 + } + defer infile.Close() + } log.Print("reading") - cgs, err := ReadCompactGenomes(stdin) + cgs, err := ReadCompactGenomes(infile) + if err != nil { + return 1 + } + err = infile.Close() if err != nil { return 1 } @@ -105,7 +158,17 @@ func (cmd *filterer) RunCommand(prog string, args []string, stdin io.Reader, std log.Print("filtering done") - w := bufio.NewWriter(cmd.output) + var outfile io.WriteCloser + if *outputFilename == "" { + outfile = nopCloser{cmd.output} + } else { + outfile, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777) + if err != nil { + return 1 + } + defer outfile.Close() + } + w := bufio.NewWriter(outfile) enc := gob.NewEncoder(w) log.Print("writing") err = enc.Encode(LibraryEntry{ @@ -119,5 +182,9 @@ func (cmd *filterer) RunCommand(prog string, args []string, stdin io.Reader, std if err != nil { return 1 } + err = outfile.Close() + if err != nil { + return 1 + } return 0 } diff --git a/import.go b/import.go index 0f22467076..14b3667ed7 100644 --- a/import.go +++ b/import.go @@ -46,7 +46,7 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std flags.StringVar(&cmd.tagLibraryFile, "tag-library", "", "tag library fasta `file`") flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`") flags.StringVar(&cmd.outputFile, "o", "", "output `file`") - flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for storing intermediate and output data") + flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for output data") flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)") pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") err = flags.Parse(args) @@ -74,6 +74,8 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std Name: "lightning import", Client: arvados.NewClientFromEnv(), ProjectUUID: cmd.projectUUID, + RAM: 30000000000, + VCPUs: 16, } err = runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile) if err != nil { @@ -118,33 +120,30 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std } }() - var outfile *os.File - var w *bufio.Writer + var output io.WriteCloser if cmd.outputFile == "" { - w = bufio.NewWriter(stdout) + output = nopCloser{stdout} } else { - outfile, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777) + output, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777) if err != nil { return 1 } - defer outfile.Close() - w = bufio.NewWriter(outfile) + defer output.Close() } - cmd.encoder = gob.NewEncoder(w) + bufw := bufio.NewWriter(output) + cmd.encoder = gob.NewEncoder(bufw) err = cmd.tileInputs(tilelib, infiles) if err != nil { return 1 } - err = w.Flush() + err = bufw.Flush() if err != nil { return 1 } - if outfile != nil { - err = outfile.Close() - if err != nil { - return 1 - } + err = output.Close() + if err != nil { + return 1 } return 0 } -- 2.30.2