Improve import and vcf2fasta performance.
[lightning.git] / import.go
index cea24f2785bc0c76568a018dee30ff4d9c285e55..f4e0cd0008c218cf23553e12be9d174031e06d2c 100644 (file)
--- a/import.go
+++ b/import.go
@@ -10,6 +10,7 @@ import (
        "flag"
        "fmt"
        "io"
+       "io/ioutil"
        "net/http"
        _ "net/http/pprof"
        "os"
@@ -23,13 +24,8 @@ import (
        "sync/atomic"
        "time"
 
-       "git.arvados.org/arvados.git/sdk/go/arvados"
-       "github.com/lucasb-eyer/go-colorful"
+       "github.com/klauspost/pgzip"
        log "github.com/sirupsen/logrus"
-       "gonum.org/v1/plot"
-       "gonum.org/v1/plot/plotter"
-       "gonum.org/v1/plot/vg"
-       "gonum.org/v1/plot/vg/draw"
 )
 
 type importer struct {
@@ -132,12 +128,12 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
                }
                defer outf.Close()
                if strings.HasSuffix(cmd.outputFile, ".gz") {
-                       outw = gzip.NewWriter(outf)
+                       outw = pgzip.NewWriter(outf)
                } else {
                        outw = outf
                }
        }
-       bufw := bufio.NewWriter(outw)
+       bufw := bufio.NewWriterSize(outw, 64*1024*1024)
        cmd.encoder = gob.NewEncoder(bufw)
 
        tilelib := &tileLibrary{taglib: taglib, retainNoCalls: cmd.saveIncompleteTiles, skipOOO: cmd.skipOOO}
@@ -179,14 +175,15 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error {
                // possibly even an in-place update.
                return errors.New("cannot specify output file in container mode: not implemented")
        }
-       client := arvados.NewClientFromEnv()
        runner := arvadosContainerRunner{
                Name:        "lightning import",
-               Client:      client,
+               Client:      arvadosClientFromEnv,
                ProjectUUID: cmd.projectUUID,
-               RAM:         80000000000,
-               VCPUs:       32,
+               APIAccess:   true,
+               RAM:         300000000000,
+               VCPUs:       64,
                Priority:    cmd.priority,
+               KeepCache:   1,
        }
        err := runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile)
        if err != nil {
@@ -207,6 +204,7 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error {
                runner.Args = []string{"import",
                        "-local=true",
                        "-loglevel=" + cmd.loglevel,
+                       "-pprof=:6061",
                        fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO),
                        fmt.Sprintf("-output-tiles=%v", cmd.outputTiles),
                        fmt.Sprintf("-save-incomplete-tiles=%v", cmd.saveIncompleteTiles),
@@ -233,13 +231,14 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error {
 
 func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []importStats, error) {
        var input io.ReadCloser
-       input, err := os.Open(infile)
+       input, err := open(infile)
        if err != nil {
                return nil, nil, err
        }
        defer input.Close()
+       input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024))
        if strings.HasSuffix(infile, ".gz") {
-               input, err = gzip.NewReader(input)
+               input, err = pgzip.NewReader(input)
                if err != nil {
                        return nil, nil, err
                }
@@ -250,14 +249,14 @@ func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []
 
 func (cmd *importer) loadTagLibrary() (*tagLibrary, error) {
        log.Printf("tag library %s load starting", cmd.tagLibraryFile)
-       f, err := os.Open(cmd.tagLibraryFile)
+       f, err := open(cmd.tagLibraryFile)
        if err != nil {
                return nil, err
        }
        defer f.Close()
-       var rdr io.ReadCloser = f
+       rdr := ioutil.NopCloser(bufio.NewReaderSize(f, 64*1024*1024))
        if strings.HasSuffix(cmd.tagLibraryFile, ".gz") {
-               rdr, err = gzip.NewReader(f)
+               rdr, err = gzip.NewReader(rdr)
                if err != nil {
                        return nil, fmt.Errorf("%s: gzip: %s", cmd.tagLibraryFile, err)
                }
@@ -426,7 +425,7 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error {
        go close(todo)
        var tileJobs sync.WaitGroup
        var running int64
-       for i := 0; i < runtime.NumCPU()*9/8+1; i++ {
+       for i := 0; i < runtime.GOMAXPROCS(-1)*2; i++ {
                tileJobs.Add(1)
                atomic.AddInt64(&running, 1)
                go func() {
@@ -544,73 +543,3 @@ func flatten(variants [][]tileVariantID) []tileVariantID {
        }
        return flat
 }
-
-type importstatsplot struct{}
-
-func (cmd *importstatsplot) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
-       err := cmd.Plot(stdin, stdout)
-       if err != nil {
-               log.Errorf("%s", err)
-               return 1
-       }
-       return 0
-}
-
-func (cmd *importstatsplot) Plot(stdin io.Reader, stdout io.Writer) error {
-       var stats []importStats
-       err := json.NewDecoder(stdin).Decode(&stats)
-       if err != nil {
-               return err
-       }
-
-       p, err := plot.New()
-       if err != nil {
-               return err
-       }
-       p.Title.Text = "coverage preserved by import (excl X<0.65)"
-       p.X.Label.Text = "input base calls ÷ sequence length"
-       p.Y.Label.Text = "output base calls ÷ input base calls"
-       p.Add(plotter.NewGrid())
-
-       data := map[string]plotter.XYs{}
-       for _, stat := range stats {
-               data[stat.InputLabel] = append(data[stat.InputLabel], plotter.XY{
-                       X: float64(stat.InputCoverage) / float64(stat.InputLength),
-                       Y: float64(stat.TileCoverage) / float64(stat.InputCoverage),
-               })
-       }
-
-       labels := []string{}
-       for label := range data {
-               labels = append(labels, label)
-       }
-       sort.Strings(labels)
-       palette, err := colorful.SoftPalette(len(labels))
-       if err != nil {
-               return err
-       }
-       nextInPalette := 0
-       for idx, label := range labels {
-               s, err := plotter.NewScatter(data[label])
-               if err != nil {
-                       return err
-               }
-               s.GlyphStyle.Color = palette[idx]
-               s.GlyphStyle.Radius = vg.Millimeter / 2
-               s.GlyphStyle.Shape = draw.CrossGlyph{}
-               nextInPalette += 7
-               p.Add(s)
-               if false {
-                       p.Legend.Add(label, s)
-               }
-       }
-       p.X.Min = 0.65
-       p.X.Max = 1
-
-       w, err := p.WriterTo(8*vg.Inch, 6*vg.Inch, "svg")
-       if err != nil {
-               return err
-       }
-       _, err = w.WriteTo(stdout)
-       return err
-}