Run "filter" and "export" in arvados containers.
author Tom Clegg <tom@tomclegg.ca>
Tue, 3 Mar 2020 16:41:23 +0000 (11:41 -0500)
committer Tom Clegg <tom@tomclegg.ca>
Tue, 3 Mar 2020 16:41:23 +0000 (11:41 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

arvados.go
exportnumpy.go
exportnumpy_test.go
filter.go
import.go

index f121542117f3e7b64e856a4a371c0f3a26246388..07e7372078363337eff28e001f20ae92babeccfe 100644 (file)
@@ -18,6 +18,8 @@ type arvadosContainerRunner struct {
        Client      *arvados.Client
        Name        string
        ProjectUUID string
+       VCPUs       int
+       RAM         int64
        Args        []string
        Mounts      map[string]string
 }
@@ -53,11 +55,10 @@ func (runner *arvadosContainerRunner) Run() error {
                        "uuid": uuid,
                }
        }
-       cpus := 16
        rc := arvados.RuntimeConstraints{
-               VCPUs:        cpus,
-               RAM:          64000000000,
-               KeepCacheRAM: (1 << 26) * 2 * int64(cpus),
+               VCPUs:        runner.VCPUs,
+               RAM:          runner.RAM,
+               KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
        }
        var cr arvados.ContainerRequest
        err = runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
@@ -113,8 +114,8 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
                Limit: 1,
                Count: "none",
                Filters: []arvados.Filter{
-                       {"name", "=", cname},
-                       {"owner_uuid", "=", runner.ProjectUUID},
+                       {Attr: "name", Operator: "=", Operand: cname},
+                       {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
                },
        })
        if err != nil {
index d11b7431d5a6bc73dc4711035dfdda9bf992d6fb..e8854ace8c55f88a8cadf2f2e0b67682808b33c5 100644 (file)
@@ -2,13 +2,16 @@ package main
 
 import (
        "bufio"
+       "errors"
        "flag"
        "fmt"
        "io"
        "log"
        "net/http"
        _ "net/http/pprof"
+       "os"
 
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/kshedden/gonpy"
 )
 
@@ -26,6 +29,10 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
        flags := flag.NewFlagSet("", flag.ContinueOnError)
        flags.SetOutput(stderr)
        pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
+       runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
+       projectUUID := flags.String("project", "", "project `UUID` for output data")
+       inputFilename := flags.String("i", "", "input `file`")
+       outputFilename := flags.String("o", "", "output `file`")
        err = flags.Parse(args)
        if err == flag.ErrHelp {
                err = nil
@@ -41,6 +48,30 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
                }()
        }
 
+       if !*runlocal {
+               if *outputFilename != "" {
+                       err = errors.New("cannot specify output file in container mode: not implemented")
+                       return 1
+               }
+               runner := arvadosContainerRunner{
+                       Name:        "lightning export-numpy",
+                       Client:      arvados.NewClientFromEnv(),
+                       ProjectUUID: *projectUUID,
+                       RAM:         64000000000,
+                       VCPUs:       2,
+               }
+               err = runner.TranslatePaths(inputFilename)
+               if err != nil {
+                       return 1
+               }
+               runner.Args = []string{"export-numpy", "-local=true", "-i", *inputFilename, "-o", "/mnt/output/library.npy"}
+               err = runner.Run()
+               if err != nil {
+                       return 1
+               }
+               return 0
+       }
+
        cgs, err := ReadCompactGenomes(stdin)
        if err != nil {
                return 1
@@ -58,14 +89,29 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
                        out[row*cols+i] = uint16(v)
                }
        }
-       w := bufio.NewWriter(cmd.output)
-       npw, err := gonpy.NewWriter(nopCloser{w})
+
+       var output io.WriteCloser
+       if *outputFilename == "" {
+               output = nopCloser{cmd.output}
+       } else {
+               output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
+               if err != nil {
+                       return 1
+               }
+               defer output.Close()
+       }
+       bufw := bufio.NewWriter(output)
+       npw, err := gonpy.NewWriter(nopCloser{bufw})
        if err != nil {
                return 1
        }
        npw.Shape = []int{rows, cols}
        npw.WriteUint16(out)
-       err = w.Flush()
+       err = bufw.Flush()
+       if err != nil {
+               return 1
+       }
+       err = output.Close()
        if err != nil {
                return 1
        }
index 91921a8502bd4c6fb16ed395749b8e6733d3cb1b..8051e279a91f52d2ece97a9cb4838f80b357d440 100644 (file)
@@ -17,7 +17,7 @@ func (s *exportSuite) TestFastaToNumpy(c *check.C) {
        exited := (&importer{}).RunCommand("import", []string{"-local=true", "-tag-library", "testdata/tags", "-ref", "testdata/ref", "testdata/a.1.fasta"}, &bytes.Buffer{}, &buffer, os.Stderr)
        c.Assert(exited, check.Equals, 0)
        var output bytes.Buffer
-       exited = (&exportNumpy{}).RunCommand("export-numpy", nil, &buffer, &output, os.Stderr)
+       exited = (&exportNumpy{}).RunCommand("export-numpy", []string{"-local=true"}, &buffer, &output, os.Stderr)
        c.Check(exited, check.Equals, 0)
        npy, err := gonpy.NewReader(&output)
        c.Assert(err, check.IsNil)
index a2bf12d7296cbd994ce560ec3a988d310ef2bb13..87d94ef62c9b0817af27e729aa6018ef3384ca20 100644 (file)
--- a/filter.go
+++ b/filter.go
@@ -3,12 +3,17 @@ package main
 import (
        "bufio"
        "encoding/gob"
+       "errors"
        "flag"
        "fmt"
        "io"
+       "io/ioutil"
        "log"
        "net/http"
        _ "net/http/pprof"
+       "os"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
 )
 
 type filterer struct {
@@ -25,6 +30,10 @@ func (cmd *filterer) RunCommand(prog string, args []string, stdin io.Reader, std
        flags := flag.NewFlagSet("", flag.ContinueOnError)
        flags.SetOutput(stderr)
        pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
+       runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
+       projectUUID := flags.String("project", "", "project `UUID` for output data")
+       inputFilename := flags.String("i", "", "input `file`")
+       outputFilename := flags.String("o", "", "output `file`")
        maxvariants := flags.Int("max-variants", -1, "drop tiles with more than `N` variants")
        mincoverage := flags.Float64("min-coverage", 1, "drop tiles with coverage less than `P` across all haplotypes (0 < P ≤ 1)")
        maxtag := flags.Int("max-tag", -1, "drop tiles with tag ID > `N`")
@@ -43,8 +52,52 @@ func (cmd *filterer) RunCommand(prog string, args []string, stdin io.Reader, std
                }()
        }
 
+       if !*runlocal {
+               if *outputFilename != "" {
+                       err = errors.New("cannot specify output file in container mode: not implemented")
+                       return 1
+               }
+               runner := arvadosContainerRunner{
+                       Name:        "lightning filter",
+                       Client:      arvados.NewClientFromEnv(),
+                       ProjectUUID: *projectUUID,
+                       RAM:         64000000000,
+                       VCPUs:       2,
+               }
+               err = runner.TranslatePaths(inputFilename)
+               if err != nil {
+                       return 1
+               }
+               runner.Args = []string{"filter", "-local=true",
+                       "-i", *inputFilename,
+                       "-o", "/mnt/output/library.gob",
+                       "-max-variants", fmt.Sprintf("%d", *maxvariants),
+                       "-min-coverage", fmt.Sprintf("%f", *mincoverage),
+                       "-max-tag", fmt.Sprintf("%d", *maxtag),
+               }
+               err = runner.Run()
+               if err != nil {
+                       return 1
+               }
+               return 0
+       }
+
+       var infile io.ReadCloser
+       if *inputFilename == "" {
+               infile = ioutil.NopCloser(stdin)
+       } else {
+               infile, err = os.Open(*inputFilename)
+               if err != nil {
+                       return 1
+               }
+               defer infile.Close()
+       }
        log.Print("reading")
-       cgs, err := ReadCompactGenomes(stdin)
+       cgs, err := ReadCompactGenomes(infile)
+       if err != nil {
+               return 1
+       }
+       err = infile.Close()
        if err != nil {
                return 1
        }
@@ -105,7 +158,17 @@ func (cmd *filterer) RunCommand(prog string, args []string, stdin io.Reader, std
 
        log.Print("filtering done")
 
-       w := bufio.NewWriter(cmd.output)
+       var outfile io.WriteCloser
+       if *outputFilename == "" {
+               outfile = nopCloser{cmd.output}
+       } else {
+               outfile, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
+               if err != nil {
+                       return 1
+               }
+               defer outfile.Close()
+       }
+       w := bufio.NewWriter(outfile)
        enc := gob.NewEncoder(w)
        log.Print("writing")
        err = enc.Encode(LibraryEntry{
@@ -119,5 +182,9 @@ func (cmd *filterer) RunCommand(prog string, args []string, stdin io.Reader, std
        if err != nil {
                return 1
        }
+       err = outfile.Close()
+       if err != nil {
+               return 1
+       }
        return 0
 }
index 0f224670767698fd609b49f62426a0dbaae898df..14b3667ed7fec34df0c75a4ef5504b4dc356d838 100644 (file)
--- a/import.go
+++ b/import.go
@@ -46,7 +46,7 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
        flags.StringVar(&cmd.tagLibraryFile, "tag-library", "", "tag library fasta `file`")
        flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
        flags.StringVar(&cmd.outputFile, "o", "", "output `file`")
-       flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for storing intermediate and output data")
+       flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for output data")
        flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
        pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
        err = flags.Parse(args)
@@ -74,6 +74,8 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
                        Name:        "lightning import",
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: cmd.projectUUID,
+                       RAM:         30000000000,
+                       VCPUs:       16,
                }
                err = runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile)
                if err != nil {
@@ -118,33 +120,30 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
                }
        }()
 
-       var outfile *os.File
-       var w *bufio.Writer
+       var output io.WriteCloser
        if cmd.outputFile == "" {
-               w = bufio.NewWriter(stdout)
+               output = nopCloser{stdout}
        } else {
-               outfile, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777)
+               output, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777)
                if err != nil {
                        return 1
                }
-               defer outfile.Close()
-               w = bufio.NewWriter(outfile)
+               defer output.Close()
        }
-       cmd.encoder = gob.NewEncoder(w)
+       bufw := bufio.NewWriter(output)
+       cmd.encoder = gob.NewEncoder(bufw)
 
        err = cmd.tileInputs(tilelib, infiles)
        if err != nil {
                return 1
        }
-       err = w.Flush()
+       err = bufw.Flush()
        if err != nil {
                return 1
        }
-       if outfile != nil {
-               err = outfile.Close()
-               if err != nil {
-                       return 1
-               }
+       err = output.Close()
+       if err != nil {
+               return 1
        }
        return 0
 }