_ "net/http/pprof"
"os"
"path"
+ "runtime"
"sort"
"strings"
"sync"
type exporter struct {
outputFormat outputFormat
+ maxTileSize int
}
func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
flags := flag.NewFlagSet("", flag.ContinueOnError)
flags.SetOutput(stderr)
pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
+ pprofdir := flags.String("pprof-dir", "", "write Go profile data to `directory` periodically")
runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
projectUUID := flags.String("project", "", "project `UUID` for output data")
priority := flags.Int("priority", 500, "container request priority")
outputFormatStr := flags.String("output-format", "hgvs", "output `format`: hgvs or vcf")
outputBed := flags.String("output-bed", "", "also output bed `file`")
labelsFilename := flags.String("output-labels", "", "also output genome labels csv `file`")
+ flags.IntVar(&cmd.maxTileSize, "max-tile-size", 50000, "don't try to make annotations for tiles bigger than given `size`")
err = flags.Parse(args)
if err == flag.ErrHelp {
err = nil
log.Println(http.ListenAndServe(*pprof, nil))
}()
}
+ if *pprofdir != "" {
+ go writeProfilesPeriodically(*pprofdir)
+ }
if !*runlocal {
if *outputFilename != "-" {
Name: "lightning export",
Client: arvados.NewClientFromEnv(),
ProjectUUID: *projectUUID,
- RAM: 750000000000,
- VCPUs: 32,
+ RAM: 700000000000,
+ VCPUs: 64,
Priority: *priority,
}
err = runner.TranslatePaths(inputFilename)
*outputBed = "/mnt/output/" + *outputBed
}
runner.Args = []string{"export", "-local=true",
+ "-pprof", ":6000",
+ "-pprof-dir", "/mnt/output",
"-ref", *refname,
"-output-format", *outputFormatStr,
"-output-bed", *outputBed,
"-output-labels", "/mnt/output/labels.csv",
+ "-max-tile-size", fmt.Sprintf("%d", cmd.maxTileSize),
"-i", *inputFilename,
"-o", "/mnt/output/export.csv",
}
return fmt.Errorf("%d needed tiles are missing from library", len(missing))
}
- outbuf := make([]bytes.Buffer, len(seqnames))
- bedbuf := make([]bytes.Buffer, len(seqnames))
- ready := make([]chan struct{}, len(seqnames))
- for i := range ready {
- ready[i] = make(chan struct{})
- }
+ outw := make([]io.WriteCloser, len(seqnames))
+ bedw := make([]io.WriteCloser, len(seqnames))
- var output sync.WaitGroup
- output.Add(1)
- go func() {
- defer output.Done()
+ var merges sync.WaitGroup
+ merge := func(dst io.Writer, src []io.WriteCloser, label string) {
+ var mtx sync.Mutex
for i, seqname := range seqnames {
- <-ready[i]
- log.Infof("writing outbuf %s", seqname)
- io.Copy(out, &outbuf[i])
- log.Infof("writing outbuf %s done", seqname)
- }
- }()
- output.Add(1)
- go func() {
- defer output.Done()
- if bedout != nil {
- for i, seqname := range seqnames {
- <-ready[i]
- log.Infof("writing bedbuf %s", seqname)
- io.Copy(bedout, &bedbuf[i])
- log.Infof("writing bedbuf %s done", seqname)
- }
+ pr, pw := io.Pipe()
+ src[i] = pw
+ merges.Add(1)
+ seqname := seqname
+ go func() {
+ defer merges.Done()
+ log.Infof("writing %s %s", seqname, label)
+ scanner := bufio.NewScanner(pr)
+ for scanner.Scan() {
+ mtx.Lock()
+ dst.Write(scanner.Bytes())
+ dst.Write([]byte{'\n'})
+ mtx.Unlock()
+ }
+ log.Infof("writing %s %s done", seqname, label)
+ }()
}
- }()
+ }
+ merge(out, outw, "output")
+ if bedout != nil {
+ merge(bedout, bedw, "bed")
+ }
- throttle := throttle{Max: 4}
+ throttle := throttle{Max: runtime.NumCPU()}
log.Infof("assembling %d sequences in %d goroutines", len(seqnames), throttle.Max)
for seqidx, seqname := range seqnames {
seqidx, seqname := seqidx, seqname
- outbuf := &outbuf[seqidx]
- bedbuf := &bedbuf[seqidx]
- if bedout == nil {
- bedbuf = nil
- }
+ outw := outw[seqidx]
+ bedw := bedw[seqidx]
throttle.Acquire()
go func() {
defer throttle.Release()
- defer close(ready[seqidx])
- cmd.exportSeq(outbuf, bedbuf, tilelib.taglib.keylen, seqname, refseq[seqname], tileVariant, cgs)
- log.Infof("assembled %q to outbuf %d bedbuf %d", seqname, outbuf.Len(), bedbuf.Len())
+ if bedw != nil {
+ defer bedw.Close()
+ }
+ defer outw.Close()
+ outwb := bufio.NewWriter(outw)
+ defer outwb.Flush()
+ cmd.exportSeq(outwb, bedw, tilelib.taglib.keylen, seqname, refseq[seqname], tileVariant, cgs)
}()
}
- output.Wait()
+ merges.Wait()
return nil
}
// was false during import
continue
}
+ if len(genometile.Sequence) > cmd.maxTileSize {
+ continue
+ }
refSequence := reftile.Sequence
// If needed, extend the reference
// sequence up to the tag at the end
// of the genometile sequence.
refstepend := refstep + 1
- for refstepend < len(reftiles) && len(refSequence) >= taglen && !bytes.EqualFold(refSequence[len(refSequence)-taglen:], genometile.Sequence[len(genometile.Sequence)-taglen:]) {
+ for refstepend < len(reftiles) && len(refSequence) >= taglen && !bytes.EqualFold(refSequence[len(refSequence)-taglen:], genometile.Sequence[len(genometile.Sequence)-taglen:]) && len(refSequence) <= cmd.maxTileSize {
if &refSequence[0] == &reftile.Sequence[0] {
refSequence = append([]byte(nil), refSequence...)
}