Bring memory back down.
[lightning.git] / export.go
index 0ab28825fc4147c4bb01ab3262673001067248e7..44030671490dc334f1269c3238add048b39da876 100644 (file)
--- a/export.go
+++ b/export.go
@@ -12,6 +12,7 @@ import (
        _ "net/http/pprof"
        "os"
        "path"
+       "runtime"
        "sort"
        "strings"
        "sync"
@@ -40,6 +41,7 @@ var (
 
 type exporter struct {
        outputFormat outputFormat
+       maxTileSize  int // set by -max-tile-size; exportSeq skips genome tiles whose sequence is longer than this
 }
 
 func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
@@ -52,6 +54,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
        flags := flag.NewFlagSet("", flag.ContinueOnError)
        flags.SetOutput(stderr)
        pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
+       pprofdir := flags.String("pprof-dir", "", "write Go profile data to `directory` periodically")
        runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
        projectUUID := flags.String("project", "", "project `UUID` for output data")
        priority := flags.Int("priority", 500, "container request priority")
@@ -61,6 +64,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
        outputFormatStr := flags.String("output-format", "hgvs", "output `format`: hgvs or vcf")
        outputBed := flags.String("output-bed", "", "also output bed `file`")
        labelsFilename := flags.String("output-labels", "", "also output genome labels csv `file`")
+       flags.IntVar(&cmd.maxTileSize, "max-tile-size", 50000, "don't try to make annotations for tiles bigger than given `size`")
        err = flags.Parse(args)
        if err == flag.ErrHelp {
                err = nil
@@ -81,6 +85,9 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
                        log.Println(http.ListenAndServe(*pprof, nil))
                }()
        }
+       if *pprofdir != "" {
+               go writeProfilesPeriodically(*pprofdir)
+       }
 
        if !*runlocal {
                if *outputFilename != "-" {
@@ -91,7 +98,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
                        Name:        "lightning export",
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: *projectUUID,
-                       RAM:         1600000000000,
+                       RAM:         700000000000,
                        VCPUs:       64,
                        Priority:    *priority,
                }
@@ -107,10 +114,13 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
                        *outputBed = "/mnt/output/" + *outputBed
                }
                runner.Args = []string{"export", "-local=true",
+                       "-pprof", ":6000",
+                       "-pprof-dir", "/mnt/output",
                        "-ref", *refname,
                        "-output-format", *outputFormatStr,
                        "-output-bed", *outputBed,
                        "-output-labels", "/mnt/output/labels.csv",
+                       "-max-tile-size", fmt.Sprintf("%d", cmd.maxTileSize),
                        "-i", *inputFilename,
                        "-o", "/mnt/output/export.csv",
                }
@@ -303,68 +313,56 @@ func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, gz bool, ti
                return fmt.Errorf("%d needed tiles are missing from library", len(missing))
        }
 
-       if false {
-               // low memory mode
-               for _, seqname := range seqnames {
-                       log.Infof("assembling %q", seqname)
-                       cmd.exportSeq(out, bedout, tilelib.taglib.keylen, seqname, refseq[seqname], tileVariant, cgs)
-                       log.Infof("assembled %q", seqname)
-               }
-               return nil
-       }
+       outw := make([]io.WriteCloser, len(seqnames))
+       bedw := make([]io.WriteCloser, len(seqnames))
 
-       outbuf := make([]bytes.Buffer, len(seqnames))
-       bedbuf := make([]bytes.Buffer, len(seqnames))
-       ready := make([]chan struct{}, len(seqnames))
-       for i := range ready {
-               ready[i] = make(chan struct{})
-       }
-
-       var output sync.WaitGroup
-       output.Add(1)
-       go func() {
-               defer output.Done()
+       var merges sync.WaitGroup
+       merge := func(dst io.Writer, src []io.WriteCloser, label string) {
+               var mtx sync.Mutex
                for i, seqname := range seqnames {
-                       <-ready[i]
-                       log.Infof("writing outbuf %s", seqname)
-                       io.Copy(out, &outbuf[i])
-                       log.Infof("writing outbuf %s done", seqname)
-                       outbuf[i] = bytes.Buffer{}
-               }
-       }()
-       output.Add(1)
-       go func() {
-               defer output.Done()
-               if bedout != nil {
-                       for i, seqname := range seqnames {
-                               <-ready[i]
-                               log.Infof("writing bedbuf %s", seqname)
-                               io.Copy(bedout, &bedbuf[i])
-                               log.Infof("writing bedbuf %s done", seqname)
-                               bedbuf[i] = bytes.Buffer{}
-                       }
+                       pr, pw := io.Pipe()
+                       src[i] = pw
+                       merges.Add(1)
+                       seqname := seqname
+                       go func() {
+                               defer merges.Done()
+                               log.Infof("writing %s %s", seqname, label)
+                               scanner := bufio.NewScanner(pr)
+                               for scanner.Scan() {
+                                       mtx.Lock()
+                                       dst.Write(scanner.Bytes())
+                                       dst.Write([]byte{'\n'})
+                                       mtx.Unlock()
+                               }
+                               log.Infof("writing %s %s done", seqname, label)
+                       }()
                }
-       }()
+       }
+       merge(out, outw, "output")
+       if bedout != nil {
+               merge(bedout, bedw, "bed")
+       }
 
-       throttle := throttle{Max: 8}
+       throttle := throttle{Max: runtime.NumCPU()}
        log.Infof("assembling %d sequences in %d goroutines", len(seqnames), throttle.Max)
        for seqidx, seqname := range seqnames {
                seqidx, seqname := seqidx, seqname
-               outbuf := &outbuf[seqidx]
-               bedbuf := &bedbuf[seqidx]
-               if bedout == nil {
-                       bedbuf = nil
-               }
+               outw := outw[seqidx]
+               bedw := bedw[seqidx]
                throttle.Acquire()
                go func() {
                        defer throttle.Release()
-                       defer close(ready[seqidx])
-                       cmd.exportSeq(outbuf, bedbuf, tilelib.taglib.keylen, seqname, refseq[seqname], tileVariant, cgs)
-                       log.Infof("assembled %q to outbuf %d bedbuf %d", seqname, outbuf.Len(), bedbuf.Len())
+                       if bedw != nil {
+                               defer bedw.Close()
+                       }
+                       defer outw.Close()
+                       outwb := bufio.NewWriter(outw)
+                       defer outwb.Flush()
+                       cmd.exportSeq(outwb, bedw, tilelib.taglib.keylen, seqname, refseq[seqname], tileVariant, cgs)
                }()
        }
 
-       output.Wait()
+       merges.Wait()
        return nil
 }
 
@@ -396,12 +394,15 @@ func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string,
                                        // was false during import
                                        continue
                                }
+                               if len(genometile.Sequence) > cmd.maxTileSize {
+                                       continue
+                               }
                                refSequence := reftile.Sequence
                                // If needed, extend the reference
                                // sequence up to the tag at the end
                                // of the genometile sequence.
                                refstepend := refstep + 1
-                               for refstepend < len(reftiles) && len(refSequence) >= taglen && !bytes.EqualFold(refSequence[len(refSequence)-taglen:], genometile.Sequence[len(genometile.Sequence)-taglen:]) {
+                               for refstepend < len(reftiles) && len(refSequence) >= taglen && !bytes.EqualFold(refSequence[len(refSequence)-taglen:], genometile.Sequence[len(genometile.Sequence)-taglen:]) && len(refSequence) <= cmd.maxTileSize {
                                        if &refSequence[0] == &reftile.Sequence[0] {
                                                refSequence = append([]byte(nil), refSequence...)
                                        }