Use buffered writer to avoid overwhelming arv-mount.
[lightning.git] / import.go
index f6ca80fed6e9aa5b461f1c7f31d42563de8b1862..cea24f2785bc0c76568a018dee30ff4d9c285e55 100644 (file)
--- a/import.go
+++ b/import.go
@@ -3,6 +3,7 @@ package main
 import (
        "bufio"
        "compress/gzip"
+       "context"
        "encoding/gob"
        "encoding/json"
        "errors"
@@ -36,12 +37,16 @@ type importer struct {
        refFile             string
        outputFile          string
        projectUUID         string
+       loglevel            string
+       priority            int
        runLocal            bool
        skipOOO             bool
        outputTiles         bool
        saveIncompleteTiles bool
        outputStats         string
+       matchChromosome     *regexp.Regexp
        encoder             *gob.Encoder
+       batchArgs
 }
 
 func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
@@ -62,9 +67,11 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
        flags.BoolVar(&cmd.outputTiles, "output-tiles", false, "include tile variant sequences in output file")
        flags.BoolVar(&cmd.saveIncompleteTiles, "save-incomplete-tiles", false, "treat tiles with no-calls as regular tiles")
        flags.StringVar(&cmd.outputStats, "output-stats", "", "output stats to `file` (json)")
-       priority := flags.Int("priority", 500, "container request priority")
+       cmd.batchArgs.Flags(flags)
+       matchChromosome := flags.String("match-chromosome", "^(chr)?([0-9]+|X|Y|MT?)$", "import chromosomes that match the given `regexp`")
+       flags.IntVar(&cmd.priority, "priority", 500, "container request priority")
        pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
-       loglevel := flags.String("loglevel", "info", "logging threshold (trace, debug, info, warn, error, fatal, or panic)")
+       flags.StringVar(&cmd.loglevel, "loglevel", "info", "logging threshold (trace, debug, info, warn, error, fatal, or panic)")
        err = flags.Parse(args)
        if err == flag.ErrHelp {
                err = nil
@@ -85,58 +92,22 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
                }()
        }
 
-       lvl, err := log.ParseLevel(*loglevel)
+       lvl, err := log.ParseLevel(cmd.loglevel)
        if err != nil {
                return 2
        }
        log.SetLevel(lvl)
 
+       cmd.matchChromosome, err = regexp.Compile(*matchChromosome)
+       if err != nil {
+               return 1
+       }
+
        if !cmd.runLocal {
-               runner := arvadosContainerRunner{
-                       Name:        "lightning import",
-                       Client:      arvados.NewClientFromEnv(),
-                       ProjectUUID: cmd.projectUUID,
-                       RAM:         80000000000,
-                       VCPUs:       32,
-                       Priority:    *priority,
-               }
-               err = runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile)
-               if err != nil {
-                       return 1
-               }
-               inputs := flags.Args()
-               for i := range inputs {
-                       err = runner.TranslatePaths(&inputs[i])
-                       if err != nil {
-                               return 1
-                       }
-               }
-               if cmd.outputFile == "-" {
-                       cmd.outputFile = "/mnt/output/library.gob"
-               } else {
-                       // Not yet implemented, but this should write
-                       // the collection to an existing collection,
-                       // possibly even an in-place update.
-                       err = errors.New("cannot specify output file in container mode: not implemented")
-                       return 1
-               }
-               runner.Args = append([]string{"import",
-                       "-local=true",
-                       "-loglevel=" + *loglevel,
-                       fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO),
-                       fmt.Sprintf("-output-tiles=%v", cmd.outputTiles),
-                       fmt.Sprintf("-save-incomplete-tiles=%v", cmd.saveIncompleteTiles),
-                       "-output-stats", "/mnt/output/stats.json",
-                       "-tag-library", cmd.tagLibraryFile,
-                       "-ref", cmd.refFile,
-                       "-o", cmd.outputFile,
-               }, inputs...)
-               var output string
-               output, err = runner.Run()
+               err = cmd.runBatches(stdout, flags.Args())
                if err != nil {
                        return 1
                }
-               fmt.Fprintln(stdout, output+"/library.gob")
                return 0
        }
 
@@ -144,23 +115,29 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
        if err != nil {
                return 1
        }
+       infiles = cmd.batchArgs.Slice(infiles)
 
        taglib, err := cmd.loadTagLibrary()
        if err != nil {
                return 1
        }
 
-       var output io.WriteCloser
+       var outw, outf io.WriteCloser
        if cmd.outputFile == "-" {
-               output = nopCloser{stdout}
+               outw = nopCloser{stdout}
        } else {
-               output, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777)
+               outf, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777)
                if err != nil {
                        return 1
                }
-               defer output.Close()
+               defer outf.Close()
+               if strings.HasSuffix(cmd.outputFile, ".gz") {
+                       outw = gzip.NewWriter(outf)
+               } else {
+                       outw = outf
+               }
        }
-       bufw := bufio.NewWriter(output)
+       bufw := bufio.NewWriter(outw)
        cmd.encoder = gob.NewEncoder(bufw)
 
        tilelib := &tileLibrary{taglib: taglib, retainNoCalls: cmd.saveIncompleteTiles, skipOOO: cmd.skipOOO}
@@ -182,13 +159,78 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
        if err != nil {
                return 1
        }
-       err = output.Close()
+       err = outw.Close()
        if err != nil {
                return 1
        }
+       if outf != nil && outf != outw {
+               err = outf.Close()
+               if err != nil {
+                       return 1
+               }
+       }
        return 0
 }
 
+func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error { // run the import via arvadosContainerRunner; print output path(s) on stdout
+       if cmd.outputFile != "-" {
+               // Only "-" is accepted here: each container is told to
+               // write /mnt/output/library.gob.gz. Writing to an existing
+               // collection (even in place) is not yet implemented.
+               return errors.New("cannot specify output file in container mode: not implemented")
+       }
+       client := arvados.NewClientFromEnv()
+       runner := arvadosContainerRunner{
+               Name:        "lightning import",
+               Client:      client,
+               ProjectUUID: cmd.projectUUID,
+               RAM:         80000000000,
+               VCPUs:       32,
+               Priority:    cmd.priority,
+       }
+       err := runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile) // NOTE(review): presumably rewrites these paths in place for use inside the container — confirm
+       if err != nil {
+               return err
+       }
+       for i := range inputs {
+               err = runner.TranslatePaths(&inputs[i])
+               if err != nil {
+                       return err
+               }
+       }
+
+       outputs, err := cmd.batchArgs.RunBatches(context.Background(), func(ctx context.Context, batch int) (string, error) {
+               runner := runner // per-batch copy, so the Name/Args changes below stay local to this call
+               if cmd.batches > 1 {
+                       runner.Name += fmt.Sprintf(" (batch %d of %d)", batch, cmd.batches)
+               }
+               runner.Args = []string{"import",
+                       "-local=true",
+                       "-loglevel=" + cmd.loglevel,
+                       fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO),
+                       fmt.Sprintf("-output-tiles=%v", cmd.outputTiles),
+                       fmt.Sprintf("-save-incomplete-tiles=%v", cmd.saveIncompleteTiles),
+                       "-match-chromosome", cmd.matchChromosome.String(),
+                       "-output-stats", "/mnt/output/stats.json",
+                       "-tag-library", cmd.tagLibraryFile,
+                       "-ref", cmd.refFile,
+                       "-o", "/mnt/output/library.gob.gz",
+               }
+               runner.Args = append(runner.Args, cmd.batchArgs.Args(batch)...) // NOTE(review): presumably the flags telling the child which batch it is — confirm
+               runner.Args = append(runner.Args, inputs...) // every batch receives the full input list
+               return runner.RunContext(ctx)
+       })
+       if err != nil {
+               return err
+       }
+       var outfiles []string
+       for _, o := range outputs {
+               outfiles = append(outfiles, o+"/library.gob.gz") // NOTE(review): assumes each output names a location holding library.gob.gz — confirm
+       }
+       fmt.Fprintln(stdout, strings.Join(outfiles, " ")) // all paths on one line, space-separated
+       return nil
+}
+
 func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []importStats, error) {
        var input io.ReadCloser
        input, err := os.Open(infile)
@@ -203,7 +245,7 @@ func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []
                }
                defer input.Close()
        }
-       return tilelib.TileFasta(infile, input)
+       return tilelib.TileFasta(infile, input, cmd.matchChromosome)
 }
 
 func (cmd *importer) loadTagLibrary() (*tagLibrary, error) {
@@ -469,7 +511,7 @@ func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (t
                return
        }
        defer consensus.Wait()
-       tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout)
+       tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout, cmd.matchChromosome)
        if err != nil {
                return
        }