From: Tom Clegg Date: Mon, 2 Aug 2021 19:09:51 +0000 (-0400) Subject: Export hgvs one-hot numpy. X-Git-Url: https://git.arvados.org/lightning.git/commitdiff_plain/a70ac669647881534d00e333f23c5d99cf2eb771 Export hgvs one-hot numpy. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/export.go b/export.go index 5e0ecf43e1..5b0910054c 100644 --- a/export.go +++ b/export.go @@ -22,6 +22,8 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/arvados/lightning/hgvs" "github.com/klauspost/pgzip" + "github.com/kshedden/gonpy" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) @@ -30,26 +32,23 @@ type tvVariant struct { librefs map[tileLibRef]bool } -type outputFormat struct { - Filename string - Head func(out io.Writer, cgs []CompactGenome) - Print func(out io.Writer, seqname string, varslice []tvVariant) - PadLeft bool +type outputFormat interface { + Filename() string + PadLeft() bool + Head(out io.Writer, cgs []CompactGenome) error + Print(out io.Writer, seqname string, varslice []tvVariant) error + Finish(outdir string, out io.Writer, seqname string) error } -var ( - outputFormats = map[string]outputFormat{ - "hgvs-onehot": outputFormatHGVSOneHot, - "hgvs": outputFormatHGVS, - "pvcf": outputFormatPVCF, - "vcf": outputFormatVCF, - } - outputFormatHGVS = outputFormat{Filename: "out.tsv", Head: headNone, Print: printHGVS} - outputFormatHGVSOneHot = outputFormat{Filename: "out.tsv", Head: headNone, Print: printHGVSOneHot} - outputFormatPVCF = outputFormat{Filename: "out.vcf", Head: headPVCF, Print: printPVCF, PadLeft: true} - outputFormatVCF = outputFormat{Filename: "out.vcf", Head: headVCF, Print: printVCF, PadLeft: true} - headNone = func(io.Writer, []CompactGenome) {} -) +var outputFormats = map[string]func() outputFormat{ + "hgvs-numpy": func() outputFormat { + return &formatHGVSNumpy{alleles: map[string][][]bool{}, variants: map[string][]hgvs.Variant{}} + }, + "hgvs-onehot": func() outputFormat { return formatHGVSOneHot{} }, + "hgvs": func() outputFormat { return formatHGVS{} }, + "pvcf": func() outputFormat { return formatPVCF{} }, + "vcf": func() outputFormat { return formatVCF{} }, +} type exporter struct { outputFormat outputFormat @@ -97,7 +96,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std err = fmt.Errorf("invalid output format %q", *outputFormatStr) return 2 } else { - cmd.outputFormat = f + cmd.outputFormat = f() } if *pprof != "" { @@ -191,7 +190,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std } defer f.Close() for i, name := range names { - _, err = fmt.Fprintf(f, "%d,%q,%q\n", i, trimFilenameForLabel(name), cmd.outputFormat.Filename) + _, err = fmt.Fprintf(f, "%d,%q,%q\n", i, trimFilenameForLabel(name), cmd.outputFormat.Filename()) if err != nil { err = fmt.Errorf("write %s: %w", *labelsFilename, err) return 1 @@ -283,7 +282,7 @@ func (cmd *exporter) export(outdir string, bedout io.Writer, tilelib *tileLibrar } if cmd.outputPerChrom { for i, seqname := range seqnames { - fnm := filepath.Join(outdir, strings.Replace(cmd.outputFormat.Filename, ".", "."+seqname+".", 1)) + fnm := filepath.Join(outdir, strings.Replace(cmd.outputFormat.Filename(), ".", "."+seqname+".", 1)) if cmd.compress { fnm += ".gz" } @@ -299,10 +298,13 @@ func (cmd *exporter) export(outdir string, bedout io.Writer, tilelib *tileLibrar defer z.Close() outw[i] = z } - cmd.outputFormat.Head(outw[i], cgs) + err = cmd.outputFormat.Head(outw[i], cgs) + if err != nil { + return err + } } } else { - fnm := filepath.Join(outdir, cmd.outputFormat.Filename) + fnm := filepath.Join(outdir, cmd.outputFormat.Filename()) if cmd.compress { fnm += ".gz" } @@ -338,8 +340,13 @@ func (cmd *exporter) export(outdir string, bedout io.Writer, tilelib *tileLibrar defer bedw.Close() } outwb := bufio.NewWriterSize(outw, 8*1024*1024) - cmd.exportSeq(outwb, bedw, tilelib.taglib.keylen, seqname, refseq[seqname], tilelib, cgs) - err := outwb.Flush() + eachVariant(bedw, tilelib.taglib.keylen, seqname, refseq[seqname], tilelib, cgs, cmd.outputFormat.PadLeft(), cmd.maxTileSize, func(varslice []tvVariant) { + err := cmd.outputFormat.Print(outwb, seqname, varslice) + throttle.Report(err) + }) + err := cmd.outputFormat.Finish(outdir, outwb, seqname) + throttle.Report(err) + err = outwb.Flush() throttle.Report(err) err = outw.Close() throttle.Report(err) @@ -351,9 +358,9 @@ func (cmd *exporter) export(outdir string, bedout io.Writer, tilelib *tileLibrar return throttle.Err() } -// Align genome tiles to reference tiles, write diffs to outw, and (if -// bedw is not nil) write tile coverage to bedw. -func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string, reftiles []tileLibRef, tilelib *tileLibrary, cgs []CompactGenome) { +// Align genome tiles to reference tiles, call callback func on each +// variant, and (if bedw is not nil) write tile coverage to bedw. +func eachVariant(bedw io.Writer, taglen int, seqname string, reftiles []tileLibRef, tilelib *tileLibrary, cgs []CompactGenome, padLeft bool, maxTileSize int, callback func(varslice []tvVariant)) { t0 := time.Now() progressbar := time.NewTicker(time.Minute) defer progressbar.Stop() @@ -400,7 +407,7 @@ func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string, // was false during import continue } - if len(genomeseq) > cmd.maxTileSize { + if len(genomeseq) > maxTileSize { continue } refSequence := refseq @@ -409,7 +416,7 @@ func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string, // the tag at the end of the // genomeseq sequence. refstepend := refstep + 1 - for refstepend < len(reftiles) && len(refSequence) >= taglen && !bytes.EqualFold(refSequence[len(refSequence)-taglen:], genomeseq[len(genomeseq)-taglen:]) && len(refSequence) <= cmd.maxTileSize { + for refstepend < len(reftiles) && len(refSequence) >= taglen && !bytes.EqualFold(refSequence[len(refSequence)-taglen:], genomeseq[len(genomeseq)-taglen:]) && len(refSequence) <= maxTileSize { if &refSequence[0] == &refseq[0] { refSequence = append([]byte(nil), refSequence...) } @@ -417,7 +424,7 @@ func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string, refstepend++ } // (TODO: handle no-calls) - if len(refSequence) <= cmd.maxTileSize { + if len(refSequence) <= maxTileSize { refstr := strings.ToUpper(string(refSequence)) genomestr := strings.ToUpper(string(genomeseq)) vars, _ = hgvs.Diff(refstr, genomestr, time.Second) @@ -425,7 +432,7 @@ func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string, diffs[glibref] = vars } for _, v := range vars { - if cmd.outputFormat.PadLeft { + if padLeft { v = v.PadLeft() } v.Position += refpos @@ -471,7 +478,7 @@ func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string, go func() { defer outmtx.Unlock() for _, varslice := range flushvariants { - cmd.outputFormat.Print(outw, seqname, varslice) + callback(varslice) } }() if bedw != nil && len(refseq) > 0 { @@ -517,11 +524,16 @@ func bucketVarsliceByRef(varslice []tvVariant) map[string]map[string]int { return byref } -func headVCF(out io.Writer, cgs []CompactGenome) { - fmt.Fprint(out, "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\n") -} +type formatVCF struct{} -func printVCF(out io.Writer, seqname string, varslice []tvVariant) { +func (formatVCF) Filename() string { return "out.vcf" } +func (formatVCF) PadLeft() bool { return true } +func (formatVCF) Finish(string, io.Writer, string) error { return nil } +func (formatVCF) Head(out io.Writer, cgs []CompactGenome) error { + _, err := fmt.Fprint(out, "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\n") + return err +} +func (formatVCF) Print(out io.Writer, seqname string, varslice []tvVariant) error { for ref, alts := range bucketVarsliceByRef(varslice) { altslice := make([]string, 0, len(alts)) for alt := range alts { @@ -536,20 +548,30 @@ func printVCF(out io.Writer, seqname string, varslice []tvVariant) { } info += strconv.Itoa(alts[a]) } - fmt.Fprintf(out, "%s\t%d\t.\t%s\t%s\t.\t.\t%s\n", seqname, varslice[0].Position, ref, strings.Join(altslice, ","), info) + _, err := fmt.Fprintf(out, "%s\t%d\t.\t%s\t%s\t.\t.\t%s\n", seqname, varslice[0].Position, ref, strings.Join(altslice, ","), info) + if err != nil { + return err + } } + return nil } -func headPVCF(out io.Writer, cgs []CompactGenome) { +type formatPVCF struct{} + +func (formatPVCF) Filename() string { return "out.vcf" } +func (formatPVCF) PadLeft() bool { return true } +func (formatPVCF) Finish(string, io.Writer, string) error { return nil } +func (formatPVCF) Head(out io.Writer, cgs []CompactGenome) error { fmt.Fprintln(out, `##FORMAT=`) fmt.Fprintf(out, "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT") for _, cg := range cgs { fmt.Fprintf(out, "\t%s", cg.Name) } - fmt.Fprintf(out, "\n") + _, err := fmt.Fprintf(out, "\n") + return err } -func printPVCF(out io.Writer, seqname string, varslice []tvVariant) { +func (formatPVCF) Print(out io.Writer, seqname string, varslice []tvVariant) error { for ref, alts := range bucketVarsliceByRef(varslice) { altslice := make([]string, 0, len(alts)) for alt := range alts { @@ -559,7 +581,10 @@ func printPVCF(out io.Writer, seqname string, varslice []tvVariant) { for i, a := range altslice { alts[a] = i + 1 } - fmt.Fprintf(out, "%s\t%d\t.\t%s\t%s\t.\t.\t.\tGT", seqname, varslice[0].Position, ref, strings.Join(altslice, ",")) + _, err := fmt.Fprintf(out, "%s\t%d\t.\t%s\t%s\t.\t.\t.\tGT", seqname, varslice[0].Position, ref, strings.Join(altslice, ",")) + if err != nil { + return err + } for i := 0; i < len(varslice); i += 2 { v1, v2 := varslice[i], varslice[i+1] a1, a2 := alts[v1.New], alts[v2.New] @@ -572,13 +597,26 @@ func printPVCF(out io.Writer, seqname string, varslice []tvVariant) { if v2.Ref != ref { a2 = 0 } - fmt.Fprintf(out, "\t%d/%d", a1, a2) + _, err := fmt.Fprintf(out, "\t%d/%d", a1, a2) + if err != nil { + return err + } + } + _, err = out.Write([]byte{'\n'}) + if err != nil { + return err } - out.Write([]byte{'\n'}) } + return nil } -func printHGVS(out io.Writer, seqname string, varslice []tvVariant) { +type formatHGVS struct{} + +func (formatHGVS) Filename() string { return "out.tsv" } +func (formatHGVS) PadLeft() bool { return false } +func (formatHGVS) Head(out io.Writer, cgs []CompactGenome) error { return nil } +func (formatHGVS) Finish(string, io.Writer, string) error { return nil } +func (formatHGVS) Print(out io.Writer, seqname string, varslice []tvVariant) error { for i := 0; i < len(varslice)/2; i++ { if i > 0 { out.Write([]byte{'\t'}) @@ -586,18 +624,34 @@ func printHGVS(out io.Writer, seqname string, varslice []tvVariant) { var1, var2 := varslice[i*2], varslice[i*2+1] if var1.Variant == var2.Variant { if var1.Ref == var1.New { - out.Write([]byte{'.'}) + _, err := out.Write([]byte{'.'}) + if err != nil { + return err + } } else { - fmt.Fprintf(out, "%s:g.%s", seqname, var1.String()) + _, err := fmt.Fprintf(out, "%s:g.%s", seqname, var1.String()) + if err != nil { + return err + } } } else { - fmt.Fprintf(out, "%s:g.[%s];[%s]", seqname, var1.String(), var2.String()) + _, err := fmt.Fprintf(out, "%s:g.[%s];[%s]", seqname, var1.String(), var2.String()) + if err != nil { + return err + } } } - out.Write([]byte{'\n'}) + _, err := out.Write([]byte{'\n'}) + return err } -func printHGVSOneHot(out io.Writer, seqname string, varslice []tvVariant) { +type formatHGVSOneHot struct{} + +func (formatHGVSOneHot) Filename() string { return "out.tsv" } +func (formatHGVSOneHot) PadLeft() bool { return false } +func (formatHGVSOneHot) Head(out io.Writer, cgs []CompactGenome) error { return nil } +func (formatHGVSOneHot) Finish(string, io.Writer, string) error { return nil } +func (formatHGVSOneHot) Print(out io.Writer, seqname string, varslice []tvVariant) error { vars := map[hgvs.Variant]bool{} for _, v := range varslice { if v.Ref != v.New { @@ -621,6 +675,113 @@ func printHGVSOneHot(out io.Writer, seqname string, varslice []tvVariant) { out.Write([]byte("\t0")) } } - out.Write([]byte{'\n'}) + _, err := out.Write([]byte{'\n'}) + if err != nil { + return err + } + } + return nil +} + +type formatHGVSNumpy struct { + sync.Mutex + variants map[string][]hgvs.Variant // variants[seqname][variantidx] + alleles map[string][][]bool // alleles[seqname][variantidx][genomeidx*2+phase] +} + +func (*formatHGVSNumpy) Filename() string { return "matrix.npy" } +func (*formatHGVSNumpy) PadLeft() bool { return false } +func (*formatHGVSNumpy) Head(out io.Writer, cgs []CompactGenome) error { return nil } +func (f *formatHGVSNumpy) Print(_ io.Writer, seqname string, varslice []tvVariant) error { + // sort variants to ensure output is deterministic + sorted := make([]hgvs.Variant, 0, len(varslice)) + for _, v := range varslice { + sorted = append(sorted, v.Variant) + } + sort.Slice(sorted, func(a, b int) bool { return hgvs.Less(sorted[a], sorted[b]) }) + + f.Lock() + defer f.Unlock() + + seqvariants := f.variants[seqname] + seqalleles := f.alleles[seqname] + + // append a row to seqvariants and seqalleles for each unique + // non-ref variant in varslice. + var previous hgvs.Variant + for _, v := range sorted { + if previous == v || v.Ref == v.New { + continue + } + previous = v + newrow := make([]bool, len(varslice)) + for i, allele := range varslice { + if allele.Variant == v { + newrow[i] = true + } + } + seqalleles = append(seqalleles, newrow) + seqvariants = append(seqvariants, v) } + f.variants[seqname] = seqvariants + f.alleles[seqname] = seqalleles + return nil +} +func (f *formatHGVSNumpy) Finish(outdir string, outw io.Writer, seqname string) error { + // Write seqname's data to a .npy matrix with one row per + // genome and 2 columns per variant. + seqvariants := f.variants[seqname] + seqalleles := f.alleles[seqname] + if len(seqalleles) == 0 { + return nil + } + out := make([]int8, len(seqalleles)*len(seqalleles[0])) + rows := len(seqalleles[0]) / 2 + cols := len(seqalleles) * 2 + // copy seqalleles[varidx][genome*2+phase] to + // out[genome*nvars*2 + varidx*2 + phase] + for varidx, alleles := range seqalleles { + for g := 0; g < len(alleles)/2; g++ { + aa, ab := alleles[g*2], alleles[g*2+1] + if aa && ab { + // hom + out[g*cols+varidx*2] = 1 + } else if aa || ab { + // het + out[g*cols+varidx*2+1] = 1 + } + } + } + bufw := bufio.NewWriter(outw) + npw, err := gonpy.NewWriter(nopCloser{bufw}) + if err != nil { + return err + } + log.WithFields(logrus.Fields{ + "seqname": seqname, + "rows": rows, + "cols": cols, + }).Info("writing numpy") + npw.Shape = []int{rows, cols} + npw.WriteInt8(out) + err = bufw.Flush() + if err != nil { + return err + } + + // Write annotations + csv, err := os.OpenFile(outdir+"/annotations."+seqname+".csv", os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0777) + if err != nil { + return err + } + defer csv.Close() + for i, v := range seqvariants { + fmt.Fprintf(csv, "%d,%q\n", i, seqname+"."+v.String()) + } + err = csv.Close() + if err != nil { + return err + } + + return nil } diff --git a/export_test.go b/export_test.go index 2b7772c382..959bd85d1b 100644 --- a/export_test.go +++ b/export_test.go @@ -6,6 +6,7 @@ import ( "os" "os/exec" + "github.com/kshedden/gonpy" "gopkg.in/check.v1" ) @@ -126,4 +127,61 @@ chr2 315 . C A . . AC=1 chr2 469 . GTGG G . . AC=1 chr2 471 . GG AA . . AC=1 `)) + + outdir := c.MkDir() + exited = (&exporter{}).RunCommand("export", []string{ + "-local=true", + "-input-dir=" + tmpdir, + "-output-dir=" + outdir, + "-output-format=hgvs-numpy", + "-ref=testdata/ref.fasta", + }, &buffer, os.Stderr, os.Stderr) + c.Check(exited, check.Equals, 0) + + f, err := os.Open(outdir + "/matrix.chr1.npy") + c.Assert(err, check.IsNil) + defer f.Close() + npy, err := gonpy.NewReader(f) + c.Assert(err, check.IsNil) + variants, err := npy.GetInt8() + c.Assert(err, check.IsNil) + c.Check(variants, check.HasLen, 6*2*2) // 6 variants * 2 alleles * 2 genomes + c.Check(variants, check.DeepEquals, []int8{ + 1, 0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 1, // input1.1.fasta + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // input2.1.fasta + }) + + f, err = os.Open(outdir + "/matrix.chr2.npy") + c.Assert(err, check.IsNil) + defer f.Close() + npy, err = gonpy.NewReader(f) + c.Assert(err, check.IsNil) + variants, err = npy.GetInt8() + c.Assert(err, check.IsNil) + c.Check(variants, check.HasLen, 7*2*2) // 6 variants * 2 alleles * 2 genomes + c.Check(variants, check.DeepEquals, []int8{ + 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, // input1.1.fasta + 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // input2.1.fasta + }) + + annotations, err := ioutil.ReadFile(outdir + "/annotations.chr1.csv") + c.Check(err, check.IsNil) + c.Logf("%s", string(annotations)) + c.Check(string(annotations), check.Equals, `0,"chr1.1_3delinsGGC" +1,"chr1.41_42delinsAA" +2,"chr1.161A>T" +3,"chr1.178A>T" +4,"chr1.222_224del" +5,"chr1.302_305delinsAAAA" +`) + annotations, err = ioutil.ReadFile(outdir + "/annotations.chr2.csv") + c.Check(err, check.IsNil) + c.Check(string(annotations), check.Equals, `0,"chr2.1_3delinsAAA" +1,"chr2.125_127delinsAAA" +2,"chr2.241_254del" +3,"chr2.258_269delinsAA" +4,"chr2.315C>A" +5,"chr2.470_472del" +6,"chr2.471_472delinsAA" +`) }