Compress export output.
author Tom Clegg <tom@tomclegg.ca>
Tue, 13 Jul 2021 15:25:00 +0000 (11:25 -0400)
committer Tom Clegg <tom@tomclegg.ca>
Tue, 13 Jul 2021 15:25:00 +0000 (11:25 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

export.go

index 67d7d56e6c711047960692eefc829c22ba7638b4..2ecf984ba1e094cff0be1b62ba6df73fa15cd3b5 100644 (file)
--- a/export.go
+++ b/export.go
@@ -21,6 +21,7 @@ import (
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/arvados/lightning/hgvs"
+       "github.com/klauspost/pgzip"
        log "github.com/sirupsen/logrus"
 )
 
@@ -48,6 +49,7 @@ var (
 type exporter struct {
        outputFormat   outputFormat
        outputPerChrom bool
+       compress       bool
        maxTileSize    int
 }
 
@@ -71,6 +73,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
        outputFormatStr := flags.String("output-format", "hgvs", "output `format`: hgvs, pvcf, or vcf")
        outputBed := flags.String("output-bed", "", "also output bed `file`")
        flags.BoolVar(&cmd.outputPerChrom, "output-per-chromosome", true, "output one file per chromosome")
+       flags.BoolVar(&cmd.compress, "z", false, "write gzip-compressed output files")
        labelsFilename := flags.String("output-labels", "", "also output genome labels csv `file`")
        flags.IntVar(&cmd.maxTileSize, "max-tile-size", 50000, "don't try to make annotations for tiles bigger than given `size`")
        err = flags.Parse(args)
@@ -137,6 +140,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
                        "-max-tile-size", fmt.Sprintf("%d", cmd.maxTileSize),
                        "-input-dir", *inputDir,
                        "-output-dir", "/mnt/output",
+                       "-z=" + fmt.Sprintf("%v", cmd.compress),
                }
                var output string
                output, err = runner.Run()
@@ -274,23 +278,41 @@ func (cmd *exporter) export(outdir string, bedout io.Writer, tilelib *tileLibrar
        }
        if cmd.outputPerChrom {
                for i, seqname := range seqnames {
-                       f, err := os.OpenFile(filepath.Join(outdir, strings.Replace(cmd.outputFormat.Filename, ".", "."+seqname+".", 1)), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
+                       fnm := filepath.Join(outdir, strings.Replace(cmd.outputFormat.Filename, ".", "."+seqname+".", 1))
+                       if cmd.compress {
+                               fnm += ".gz"
+                       }
+                       f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
                        if err != nil {
                                return err
                        }
                        defer f.Close()
                        log.Infof("writing %q", f.Name())
-                       cmd.outputFormat.Head(f, cgs)
                        outw[i] = f
+                       if cmd.compress {
+                               z := pgzip.NewWriter(f)
+                               defer z.Close()
+                               outw[i] = z
+                       }
+                       cmd.outputFormat.Head(outw[i], cgs)
                }
        } else {
                fnm := filepath.Join(outdir, cmd.outputFormat.Filename)
-               log.Infof("writing %q", fnm)
-               out, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
+               if cmd.compress {
+                       fnm += ".gz"
+               }
+               f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
                if err != nil {
                        return err
                }
-               defer out.Close()
+               defer f.Close()
+               log.Infof("writing %q", fnm)
+               var out io.Writer = f
+               if cmd.compress {
+                       z := pgzip.NewWriter(out)
+                       defer z.Close()
+                       out = z
+               }
                cmd.outputFormat.Head(out, cgs)
                merge(out, outw, "output")
        }
@@ -310,16 +332,18 @@ func (cmd *exporter) export(outdir string, bedout io.Writer, tilelib *tileLibrar
                        if bedw != nil {
                                defer bedw.Close()
                        }
-                       defer outw.Close()
                        outwb := bufio.NewWriterSize(outw, 8*1024*1024)
-                       defer outwb.Flush()
                        cmd.exportSeq(outwb, bedw, tilelib.taglib.keylen, seqname, refseq[seqname], tilelib, cgs)
+                       err := outwb.Flush()
+                       throttle.Report(err)
+                       err = outw.Close()
+                       throttle.Report(err)
                }()
        }
 
        merges.Wait()
        throttle.Wait()
-       return nil
+       return throttle.Err()
 }
 
 // Align genome tiles to reference tiles, write diffs to outw, and (if