X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/1c140a306fb88fc24d0e6b5742e7afd127b2a000..ff57cd8ae83f3ea464772d484fdbb26b7e79f4db:/export.go diff --git a/export.go b/export.go index 67d7d56e6c..38fbeacb52 100644 --- a/export.go +++ b/export.go @@ -21,13 +21,19 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/arvados/lightning/hgvs" + "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" ) +type tvVariant struct { + hgvs.Variant + librefs map[tileLibRef]bool +} + type outputFormat struct { Filename string Head func(out io.Writer, cgs []CompactGenome) - Print func(out io.Writer, seqname string, varslice []hgvs.Variant) + Print func(out io.Writer, seqname string, varslice []tvVariant) PadLeft bool } @@ -48,6 +54,7 @@ var ( type exporter struct { outputFormat outputFormat outputPerChrom bool + compress bool maxTileSize int } @@ -71,6 +78,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std outputFormatStr := flags.String("output-format", "hgvs", "output `format`: hgvs, pvcf, or vcf") outputBed := flags.String("output-bed", "", "also output bed `file`") flags.BoolVar(&cmd.outputPerChrom, "output-per-chromosome", true, "output one file per chromosome") + flags.BoolVar(&cmd.compress, "z", false, "write gzip-compressed output files") labelsFilename := flags.String("output-labels", "", "also output genome labels csv `file`") flags.IntVar(&cmd.maxTileSize, "max-tile-size", 50000, "don't try to make annotations for tiles bigger than given `size`") err = flags.Parse(args) @@ -137,6 +145,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std "-max-tile-size", fmt.Sprintf("%d", cmd.maxTileSize), "-input-dir", *inputDir, "-output-dir", "/mnt/output", + "-z=" + fmt.Sprintf("%v", cmd.compress), } var output string output, err = runner.Run() @@ -274,23 +283,41 @@ func (cmd *exporter) export(outdir string, bedout io.Writer, tilelib *tileLibrar } if cmd.outputPerChrom { for i, seqname := range seqnames { - f, err := os.OpenFile(filepath.Join(outdir, strings.Replace(cmd.outputFormat.Filename, ".", "."+seqname+".", 1)), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) + fnm := filepath.Join(outdir, strings.Replace(cmd.outputFormat.Filename, ".", "."+seqname+".", 1)) + if cmd.compress { + fnm += ".gz" + } + f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) if err != nil { return err } defer f.Close() log.Infof("writing %q", f.Name()) - cmd.outputFormat.Head(f, cgs) outw[i] = f + if cmd.compress { + z := pgzip.NewWriter(f) + defer z.Close() + outw[i] = z + } + cmd.outputFormat.Head(outw[i], cgs) } } else { fnm := filepath.Join(outdir, cmd.outputFormat.Filename) - log.Infof("writing %q", fnm) - out, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) + if cmd.compress { + fnm += ".gz" + } + f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) if err != nil { return err } - defer out.Close() + defer f.Close() + log.Infof("writing %q", fnm) + var out io.Writer = f + if cmd.compress { + z := pgzip.NewWriter(out) + defer z.Close() + out = z + } cmd.outputFormat.Head(out, cgs) merge(out, outw, "output") } @@ -310,16 +337,18 @@ func (cmd *exporter) export(outdir string, bedout io.Writer, tilelib *tileLibrar if bedw != nil { defer bedw.Close() } - defer outw.Close() outwb := bufio.NewWriterSize(outw, 8*1024*1024) - defer outwb.Flush() cmd.exportSeq(outwb, bedw, tilelib.taglib.keylen, seqname, refseq[seqname], tilelib, cgs) + err := outwb.Flush() + throttle.Report(err) + err = outw.Close() + throttle.Report(err) }() } merges.Wait() throttle.Wait() - return nil + return throttle.Err() } // Align genome tiles to reference tiles, write diffs to outw, and (if @@ -331,7 +360,7 @@ func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string, var outmtx sync.Mutex defer outmtx.Lock() refpos := 0 - variantAt := map[int][]hgvs.Variant{} // variantAt[chromOffset][genomeIndex*2+phase] + variantAt := map[int][]tvVariant{} // variantAt[chromOffset][genomeIndex*2+phase] for refstep, libref := range reftiles { select { case <-progressbar.C: @@ -388,9 +417,11 @@ func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string, refstepend++ } // (TODO: handle no-calls) - refstr := strings.ToUpper(string(refSequence)) - genomestr := strings.ToUpper(string(genomeseq)) - vars, _ = hgvs.Diff(refstr, genomestr, time.Second) + if len(refSequence) <= cmd.maxTileSize { + refstr := strings.ToUpper(string(refSequence)) + genomestr := strings.ToUpper(string(genomeseq)) + vars, _ = hgvs.Diff(refstr, genomestr, time.Second) + } diffs[glibref] = vars } for _, v := range vars { @@ -400,10 +431,15 @@ func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string, v.Position += refpos varslice := variantAt[v.Position] if varslice == nil { - varslice = make([]hgvs.Variant, len(cgs)*2) + varslice = make([]tvVariant, len(cgs)*2) variantAt[v.Position] = varslice } - varslice[cgidx*2+phase] = v + varslice[cgidx*2+phase].Variant = v + if varslice[cgidx*2+phase].librefs == nil { + varslice[cgidx*2+phase].librefs = map[tileLibRef]bool{glibref: true} + } else { + varslice[cgidx*2+phase].librefs[glibref] = true + } } } } @@ -420,7 +456,7 @@ func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string, } } sort.Slice(flushpos, func(i, j int) bool { return flushpos[i] < flushpos[j] }) - flushvariants := make([][]hgvs.Variant, len(flushpos)) + flushvariants := make([][]tvVariant, len(flushpos)) for i, pos := range flushpos { varslice := variantAt[pos] delete(variantAt, pos) @@ -465,7 +501,7 @@ func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string, } } -func bucketVarsliceByRef(varslice []hgvs.Variant) map[string]map[string]int { +func bucketVarsliceByRef(varslice []tvVariant) map[string]map[string]int { byref := map[string]map[string]int{} for _, v := range varslice { if v.Ref == "" && v.New == "" { @@ -485,7 +521,7 @@ func headVCF(out io.Writer, cgs []CompactGenome) { fmt.Fprint(out, "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\n") } -func printVCF(out io.Writer, seqname string, varslice []hgvs.Variant) { +func printVCF(out io.Writer, seqname string, varslice []tvVariant) { for ref, alts := range bucketVarsliceByRef(varslice) { altslice := make([]string, 0, len(alts)) for alt := range alts { @@ -513,7 +549,7 @@ func headPVCF(out io.Writer, cgs []CompactGenome) { fmt.Fprintf(out, "\n") } -func printPVCF(out io.Writer, seqname string, varslice []hgvs.Variant) { +func printPVCF(out io.Writer, seqname string, varslice []tvVariant) { for ref, alts := range bucketVarsliceByRef(varslice) { altslice := make([]string, 0, len(alts)) for alt := range alts { @@ -542,13 +578,13 @@ func printPVCF(out io.Writer, seqname string, varslice []hgvs.Variant) { } } -func printHGVS(out io.Writer, seqname string, varslice []hgvs.Variant) { +func printHGVS(out io.Writer, seqname string, varslice []tvVariant) { for i := 0; i < len(varslice)/2; i++ { if i > 0 { out.Write([]byte{'\t'}) } var1, var2 := varslice[i*2], varslice[i*2+1] - if var1 == var2 { + if var1.Variant == var2.Variant { if var1.Ref == var1.New { out.Write([]byte{'.'}) } else { @@ -561,11 +597,11 @@ func printHGVS(out io.Writer, seqname string, varslice []hgvs.Variant) { out.Write([]byte{'\n'}) } -func printHGVSOneHot(out io.Writer, seqname string, varslice []hgvs.Variant) { +func printHGVSOneHot(out io.Writer, seqname string, varslice []tvVariant) { vars := map[hgvs.Variant]bool{} for _, v := range varslice { if v.Ref != v.New { - vars[v] = true + vars[v.Variant] = true } } @@ -579,7 +615,7 @@ func printHGVSOneHot(out io.Writer, seqname string, varslice []hgvs.Variant) { for _, v := range sorted { fmt.Fprintf(out, "%s.%s", seqname, v.String()) for i := 0; i < len(varslice); i += 2 { - if varslice[i] == v || varslice[i+1] == v { + if varslice[i].Variant == v || varslice[i+1].Variant == v { out.Write([]byte("\t1")) } else { out.Write([]byte("\t0"))