X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/7053eebc9122d70faea82fec8551635d74f5bec3..335a3e40a54aab203fab7158af53819fef7bb512:/export.go diff --git a/export.go b/export.go index 00c2360dd4..4403067149 100644 --- a/export.go +++ b/export.go @@ -12,6 +12,7 @@ import ( _ "net/http/pprof" "os" "path" + "runtime" "sort" "strings" "sync" @@ -40,6 +41,7 @@ var ( type exporter struct { outputFormat outputFormat + maxTileSize int } func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { @@ -52,6 +54,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std flags := flag.NewFlagSet("", flag.ContinueOnError) flags.SetOutput(stderr) pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") + pprofdir := flags.String("pprof-dir", "", "write Go profile data to `directory` periodically") runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)") projectUUID := flags.String("project", "", "project `UUID` for output data") priority := flags.Int("priority", 500, "container request priority") @@ -61,6 +64,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std outputFormatStr := flags.String("output-format", "hgvs", "output `format`: hgvs or vcf") outputBed := flags.String("output-bed", "", "also output bed `file`") labelsFilename := flags.String("output-labels", "", "also output genome labels csv `file`") + flags.IntVar(&cmd.maxTileSize, "max-tile-size", 50000, "don't try to make annotations for tiles bigger than given `size`") err = flags.Parse(args) if err == flag.ErrHelp { err = nil @@ -81,6 +85,9 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std log.Println(http.ListenAndServe(*pprof, nil)) }() } + if *pprofdir != "" { + go writeProfilesPeriodically(*pprofdir) + } if !*runlocal { if *outputFilename != "-" { @@ -91,8 +98,8 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std Name: "lightning export", Client: arvados.NewClientFromEnv(), ProjectUUID: *projectUUID, - RAM: 750000000000, - VCPUs: 32, + RAM: 700000000000, + VCPUs: 64, Priority: *priority, } err = runner.TranslatePaths(inputFilename) @@ -107,10 +114,13 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std *outputBed = "/mnt/output/" + *outputBed } runner.Args = []string{"export", "-local=true", + "-pprof", ":6000", + "-pprof-dir", "/mnt/output", "-ref", *refname, "-output-format", *outputFormatStr, "-output-bed", *outputBed, "-output-labels", "/mnt/output/labels.csv", + "-max-tile-size", fmt.Sprintf("%d", cmd.maxTileSize), "-i", *inputFilename, "-o", "/mnt/output/export.csv", } @@ -303,56 +313,56 @@ func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, gz bool, ti return fmt.Errorf("%d needed tiles are missing from library", len(missing)) } - outbuf := make([]bytes.Buffer, len(seqnames)) - bedbuf := make([]bytes.Buffer, len(seqnames)) - ready := make([]chan struct{}, len(seqnames)) - for i := range ready { - ready[i] = make(chan struct{}) - } + outw := make([]io.WriteCloser, len(seqnames)) + bedw := make([]io.WriteCloser, len(seqnames)) - var output sync.WaitGroup - output.Add(1) - go func() { - defer output.Done() + var merges sync.WaitGroup + merge := func(dst io.Writer, src []io.WriteCloser, label string) { + var mtx sync.Mutex for i, seqname := range seqnames { - <-ready[i] - log.Infof("writing outbuf %s", seqname) - io.Copy(out, &outbuf[i]) - log.Infof("writing outbuf %s done", seqname) - } - }() - output.Add(1) - go func() { - defer output.Done() - if bedout != nil { - for i, seqname := range seqnames { - <-ready[i] - log.Infof("writing bedbuf %s", seqname) - io.Copy(bedout, &bedbuf[i]) - log.Infof("writing bedbuf %s done", seqname) - } + pr, pw := io.Pipe() + src[i] = pw + merges.Add(1) + seqname := seqname + go func() { + defer merges.Done() + log.Infof("writing %s %s", seqname, label) + scanner := bufio.NewScanner(pr) + for scanner.Scan() { + mtx.Lock() + dst.Write(scanner.Bytes()) + dst.Write([]byte{'\n'}) + mtx.Unlock() + } + log.Infof("writing %s %s done", seqname, label) + }() } - }() + } + merge(out, outw, "output") + if bedout != nil { + merge(bedout, bedw, "bed") + } - throttle := throttle{Max: 4} + throttle := throttle{Max: runtime.NumCPU()} log.Infof("assembling %d sequences in %d goroutines", len(seqnames), throttle.Max) for seqidx, seqname := range seqnames { seqidx, seqname := seqidx, seqname - outbuf := &outbuf[seqidx] - bedbuf := &bedbuf[seqidx] - if bedout == nil { - bedbuf = nil - } + outw := outw[seqidx] + bedw := bedw[seqidx] throttle.Acquire() go func() { defer throttle.Release() - defer close(ready[seqidx]) - cmd.exportSeq(outbuf, bedbuf, tilelib.taglib.keylen, seqname, refseq[seqname], tileVariant, cgs) - log.Infof("assembled %q to outbuf %d bedbuf %d", seqname, outbuf.Len(), bedbuf.Len()) + if bedw != nil { + defer bedw.Close() + } + defer outw.Close() + outwb := bufio.NewWriter(outw) + defer outwb.Flush() + cmd.exportSeq(outwb, bedw, tilelib.taglib.keylen, seqname, refseq[seqname], tileVariant, cgs) }() } - output.Wait() + merges.Wait() return nil } @@ -384,12 +394,15 @@ func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string, // was false during import continue } + if len(genometile.Sequence) > cmd.maxTileSize { + continue + } refSequence := reftile.Sequence // If needed, extend the reference // sequence up to the tag at the end // of the genometile sequence. refstepend := refstep + 1 - for refstepend < len(reftiles) && len(refSequence) >= taglen && !bytes.EqualFold(refSequence[len(refSequence)-taglen:], genometile.Sequence[len(genometile.Sequence)-taglen:]) { + for refstepend < len(reftiles) && len(refSequence) >= taglen && !bytes.EqualFold(refSequence[len(refSequence)-taglen:], genometile.Sequence[len(genometile.Sequence)-taglen:]) && len(refSequence) <= cmd.maxTileSize { if &refSequence[0] == &reftile.Sequence[0] { refSequence = append([]byte(nil), refSequence...) }