Concurrent-batches mode for vcf2fasta and import.
author Tom Clegg <tom@tomclegg.ca>
Tue, 24 Nov 2020 20:10:29 +0000 (15:10 -0500)
committer Tom Clegg <tom@tomclegg.ca>
Tue, 24 Nov 2020 20:10:29 +0000 (15:10 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

arvados.go
import.go
vcf2fasta.go

index e1404d4646c90bf8682c90566d29a41762d78800..be11a2333d917552eea53af3956e0d2c325a9524 100644 (file)
--- a/arvados.go
+++ b/arvados.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "context"
        "encoding/json"
        "errors"
        "fmt"
@@ -190,6 +191,10 @@ type arvadosContainerRunner struct {
 }
 
 func (runner *arvadosContainerRunner) Run() (string, error) {
+       return runner.RunContext(context.Background())
+}
+
+func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
        if runner.ProjectUUID == "" {
                return "", errors.New("cannot run arvados container: ProjectUUID not provided")
        }
@@ -293,8 +298,19 @@ func (runner *arvadosContainerRunner) Run() (string, error) {
        }
 
        var reCrunchstat = regexp.MustCompile(`mem .* rss`)
+waitctr:
        for cr.State != arvados.ContainerRequestStateFinal {
                select {
+               case <-ctx.Done():
+                       err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
+                               "container_request": map[string]interface{}{
+                                       "priority": 0,
+                               },
+                       })
+                       if err != nil {
+                               log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
+                       }
+                       break waitctr
                case <-ticker.C:
                        refreshCR()
                case msg := <-logch:
@@ -322,6 +338,10 @@ func (runner *arvadosContainerRunner) Run() (string, error) {
        }
        fmt.Fprint(os.Stderr, neednewline)
 
+       if err := ctx.Err(); err != nil {
+               return "", err
+       }
+
        var c arvados.Container
        err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
        if err != nil {
@@ -362,7 +382,11 @@ func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
        return nil
 }
 
+var mtxMakeCommandCollection sync.Mutex
+
 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
+       mtxMakeCommandCollection.Lock()
+       defer mtxMakeCommandCollection.Unlock()
        exe, err := ioutil.ReadFile("/proc/self/exe")
        if err != nil {
                return "", err
index 0d2d7379d184174a54766a9a4b4306adb14fb582..cea24f2785bc0c76568a018dee30ff4d9c285e55 100644 (file)
--- a/import.go
+++ b/import.go
@@ -3,6 +3,7 @@ package main
 import (
        "bufio"
        "compress/gzip"
+       "context"
        "encoding/gob"
        "encoding/json"
        "errors"
@@ -36,6 +37,8 @@ type importer struct {
        refFile             string
        outputFile          string
        projectUUID         string
+       loglevel            string
+       priority            int
        runLocal            bool
        skipOOO             bool
        outputTiles         bool
@@ -43,6 +46,7 @@ type importer struct {
        outputStats         string
        matchChromosome     *regexp.Regexp
        encoder             *gob.Encoder
+       batchArgs
 }
 
 func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
@@ -63,10 +67,11 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
        flags.BoolVar(&cmd.outputTiles, "output-tiles", false, "include tile variant sequences in output file")
        flags.BoolVar(&cmd.saveIncompleteTiles, "save-incomplete-tiles", false, "treat tiles with no-calls as regular tiles")
        flags.StringVar(&cmd.outputStats, "output-stats", "", "output stats to `file` (json)")
+       cmd.batchArgs.Flags(flags)
        matchChromosome := flags.String("match-chromosome", "^(chr)?([0-9]+|X|Y|MT?)$", "import chromosomes that match the given `regexp`")
-       priority := flags.Int("priority", 500, "container request priority")
+       flags.IntVar(&cmd.priority, "priority", 500, "container request priority")
        pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
-       loglevel := flags.String("loglevel", "info", "logging threshold (trace, debug, info, warn, error, fatal, or panic)")
+       flags.StringVar(&cmd.loglevel, "loglevel", "info", "logging threshold (trace, debug, info, warn, error, fatal, or panic)")
        err = flags.Parse(args)
        if err == flag.ErrHelp {
                err = nil
@@ -87,7 +92,7 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
                }()
        }
 
-       lvl, err := log.ParseLevel(*loglevel)
+       lvl, err := log.ParseLevel(cmd.loglevel)
        if err != nil {
                return 2
        }
@@ -99,52 +104,10 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
        }
 
        if !cmd.runLocal {
-               runner := arvadosContainerRunner{
-                       Name:        "lightning import",
-                       Client:      arvados.NewClientFromEnv(),
-                       ProjectUUID: cmd.projectUUID,
-                       RAM:         80000000000,
-                       VCPUs:       32,
-                       Priority:    *priority,
-               }
-               err = runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile)
-               if err != nil {
-                       return 1
-               }
-               inputs := flags.Args()
-               for i := range inputs {
-                       err = runner.TranslatePaths(&inputs[i])
-                       if err != nil {
-                               return 1
-                       }
-               }
-               if cmd.outputFile == "-" {
-                       cmd.outputFile = "/mnt/output/library.gob.gz"
-               } else {
-                       // Not yet implemented, but this should write
-                       // the collection to an existing collection,
-                       // possibly even an in-place update.
-                       err = errors.New("cannot specify output file in container mode: not implemented")
-                       return 1
-               }
-               runner.Args = append([]string{"import",
-                       "-local=true",
-                       "-loglevel=" + *loglevel,
-                       fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO),
-                       fmt.Sprintf("-output-tiles=%v", cmd.outputTiles),
-                       fmt.Sprintf("-save-incomplete-tiles=%v", cmd.saveIncompleteTiles),
-                       "-match-chromosome", cmd.matchChromosome.String(),
-                       "-output-stats", "/mnt/output/stats.json",
-                       "-tag-library", cmd.tagLibraryFile,
-                       "-ref", cmd.refFile,
-                       "-o", cmd.outputFile,
-               }, inputs...)
-               var output string
-               output, err = runner.Run()
+               err = cmd.runBatches(stdout, flags.Args())
                if err != nil {
                        return 1
                }
-               fmt.Fprintln(stdout, output+"/library.gob.gz")
                return 0
        }
 
@@ -152,6 +115,7 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
        if err != nil {
                return 1
        }
+       infiles = cmd.batchArgs.Slice(infiles)
 
        taglib, err := cmd.loadTagLibrary()
        if err != nil {
@@ -208,6 +172,65 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
        return 0
 }
 
+func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error {
+       if cmd.outputFile != "-" {
+               // Not yet implemented, but this should write
+               // the collection to an existing collection,
+               // possibly even an in-place update.
+               return errors.New("cannot specify output file in container mode: not implemented")
+       }
+       client := arvados.NewClientFromEnv()
+       runner := arvadosContainerRunner{
+               Name:        "lightning import",
+               Client:      client,
+               ProjectUUID: cmd.projectUUID,
+               RAM:         80000000000,
+               VCPUs:       32,
+               Priority:    cmd.priority,
+       }
+       err := runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile)
+       if err != nil {
+               return err
+       }
+       for i := range inputs {
+               err = runner.TranslatePaths(&inputs[i])
+               if err != nil {
+                       return err
+               }
+       }
+
+       outputs, err := cmd.batchArgs.RunBatches(context.Background(), func(ctx context.Context, batch int) (string, error) {
+               runner := runner
+               if cmd.batches > 1 {
+                       runner.Name += fmt.Sprintf(" (batch %d of %d)", batch, cmd.batches)
+               }
+               runner.Args = []string{"import",
+                       "-local=true",
+                       "-loglevel=" + cmd.loglevel,
+                       fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO),
+                       fmt.Sprintf("-output-tiles=%v", cmd.outputTiles),
+                       fmt.Sprintf("-save-incomplete-tiles=%v", cmd.saveIncompleteTiles),
+                       "-match-chromosome", cmd.matchChromosome.String(),
+                       "-output-stats", "/mnt/output/stats.json",
+                       "-tag-library", cmd.tagLibraryFile,
+                       "-ref", cmd.refFile,
+                       "-o", "/mnt/output/library.gob.gz",
+               }
+               runner.Args = append(runner.Args, cmd.batchArgs.Args(batch)...)
+               runner.Args = append(runner.Args, inputs...)
+               return runner.RunContext(ctx)
+       })
+       if err != nil {
+               return err
+       }
+       var outfiles []string
+       for _, o := range outputs {
+               outfiles = append(outfiles, o+"/library.gob.gz")
+       }
+       fmt.Fprintln(stdout, strings.Join(outfiles, " "))
+       return nil
+}
+
 func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []importStats, error) {
        var input io.ReadCloser
        input, err := os.Open(infile)
index 4044cfdeee5412ba1099d258359fea9eee792f86..98b285e0ed1dafb4dee7236d1af338a4f082937d 100644 (file)
--- a/vcf2fasta.go
+++ b/vcf2fasta.go
@@ -37,6 +37,7 @@ type vcf2fasta struct {
        outputDir         string
        runLocal          bool
        vcpus             int
+       batchArgs
 
        stderr io.Writer
 }
@@ -59,6 +60,7 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
        flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
        flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
        flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
+       cmd.batchArgs.Flags(flags)
        priority := flags.Int("priority", 500, "container request priority")
        pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
        err = flags.Parse(args)
@@ -100,13 +102,14 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
                        if err != nil {
                                return 1
                        }
-                       if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 {
+                       if cmd.vcpus = len(cmd.batchArgs.Slice(infiles)) * 2; cmd.vcpus > 32 {
                                cmd.vcpus = 32
                        }
                }
+               client := arvados.NewClientFromEnv()
                runner := arvadosContainerRunner{
                        Name:        "lightning vcf2fasta",
-                       Client:      arvados.NewClientFromEnv(),
+                       Client:      client,
                        ProjectUUID: cmd.projectUUID,
                        RAM:         2<<30 + int64(cmd.vcpus)<<28,
                        VCPUs:       cmd.vcpus,
@@ -118,9 +121,6 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
                                },
                        },
                }
-               if cmd.mask {
-                       runner.RAM += int64(cmd.vcpus) << 31
-               }
                err = runner.TranslatePaths(&cmd.refFile, &cmd.genomeFile)
                if err != nil {
                        return 1
@@ -132,19 +132,29 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
                                return 1
                        }
                }
-               runner.Args = append([]string{"vcf2fasta",
-                       "-local=true",
-                       "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask),
-                       "-genome", cmd.genomeFile,
-                       "-gvcf-regions.py", "/gvcf_regions.py",
-                       "-gvcf-type", cmd.gvcfType,
-                       "-output-dir", "/mnt/output"}, inputs...)
-               var output string
-               output, err = runner.Run()
+               var outputs []string
+               outputs, err = cmd.batchArgs.RunBatches(context.Background(), func(ctx context.Context, batch int) (string, error) {
+                       runner := runner
+                       if cmd.mask {
+                               runner.RAM += int64(cmd.vcpus) << 31
+                       }
+                       runner.Args = []string{"vcf2fasta",
+                               "-local=true",
+                               "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask),
+                               "-genome", cmd.genomeFile,
+                               "-gvcf-regions.py", "/gvcf_regions.py",
+                               "-gvcf-type", cmd.gvcfType,
+                               "-output-dir", "/mnt/output",
+                       }
+                       runner.Args = append(runner.Args, cmd.batchArgs.Args(batch)...)
+                       runner.Args = append(runner.Args, inputs...)
+                       log.Printf("batch %d: %v", batch, runner.Args)
+                       return runner.RunContext(ctx)
+               })
                if err != nil {
                        return 1
                }
-               fmt.Fprintln(stdout, output)
+               fmt.Fprintln(stdout, strings.Join(outputs, " "))
                return 0
        }
 
@@ -152,6 +162,7 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
        if err != nil {
                return 1
        }
+       infiles = cmd.batchArgs.Slice(infiles)
 
        type job struct {
                vcffile string