From: Tom Clegg Date: Fri, 7 May 2021 13:47:18 +0000 (-0400) Subject: Export hgvs-onehot. X-Git-Url: https://git.arvados.org/lightning.git/commitdiff_plain/2ce4195e773941a3f625e5e2d5f0a36aa082ecf2 Export hgvs-onehot. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/export.go b/export.go index 19ff41950e..4dbef1910b 100644 --- a/export.go +++ b/export.go @@ -11,9 +11,9 @@ import ( "net/http" _ "net/http/pprof" "os" + "path" "sort" "strings" - "sync" "time" "git.arvados.org/arvados.git/sdk/go/arvados" @@ -28,11 +28,13 @@ type outputFormat struct { var ( outputFormats = map[string]outputFormat{ - "hgvs": outputFormatHGVS, - "vcf": outputFormatVCF, + "hgvs-onehot": outputFormatHGVSOneHot, + "hgvs": outputFormatHGVS, + "vcf": outputFormatVCF, } - outputFormatHGVS = outputFormat{Print: printHGVS} - outputFormatVCF = outputFormat{Print: printVCF, PadLeft: true} + outputFormatHGVS = outputFormat{Print: printHGVS} + outputFormatHGVSOneHot = outputFormat{Print: printHGVSOneHot} + outputFormatVCF = outputFormat{Print: printVCF, PadLeft: true} ) type exporter struct { @@ -57,7 +59,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std outputFilename := flags.String("o", "-", "output `file`") outputFormatStr := flags.String("output-format", "hgvs", "output `format`: hgvs or vcf") outputBed := flags.String("output-bed", "", "also output bed `file`") - pick := flags.String("pick", "", "`name` of single genome to export") + labelsFilename := flags.String("output-labels", "", "also output genome labels csv `file`") err = flags.Parse(args) if err == flag.ErrHelp { err = nil @@ -103,7 +105,14 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std } *outputBed = "/mnt/output/" + *outputBed } - runner.Args = []string{"export", "-local=true", "-pick", *pick, "-ref", *refname, "-output-format", *outputFormatStr, "-output-bed", *outputBed, "-i", *inputFilename, "-o", "/mnt/output/export.csv"} + runner.Args = []string{"export", "-local=true", + "-ref", *refname, + "-output-format", *outputFormatStr, + "-output-bed", *outputBed, + "-output-labels", "/mnt/output/labels.csv", + "-i", *inputFilename, + "-o", "/mnt/output/export.csv", + } var output string output, err = runner.Run() if err != nil { @@ -113,11 +122,16 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std return 0 } - input, err := os.Open(*inputFilename) + in, err := open(*inputFilename) if err != nil { return 1 } - defer input.Close() + defer in.Close() + input, ok := in.(io.ReadSeeker) + if !ok { + err = fmt.Errorf("%s: %T cannot seek", *inputFilename, in) + return 1 + } // Error out early if seeking doesn't work on the input file. _, err = input.Seek(0, io.SeekEnd) @@ -129,25 +143,15 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std return 1 } - var mtx sync.Mutex var cgs []CompactGenome - tilelib := tileLibrary{ - retainNoCalls: true, + tilelib := &tileLibrary{ + retainNoCalls: true, + compactGenomes: map[string][]tileVariantID{}, } - err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), func(cg CompactGenome) { - if *pick != "" && *pick != cg.Name { - return - } - log.Debugf("export: pick %q", cg.Name) - mtx.Lock() - defer mtx.Unlock() - cgs = append(cgs, cg) - }) + err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil) if err != nil { return 1 } - sort.Slice(cgs, func(i, j int) bool { return cgs[i].Name < cgs[j].Name }) - log.Printf("export: pick %q => %d genomes", *pick, len(cgs)) refseq, ok := tilelib.refseqs[*refname] if !ok { @@ -160,6 +164,33 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std return 1 } + names := cgnames(tilelib) + for _, name := range names { + cgs = append(cgs, CompactGenome{Name: name, Variants: tilelib.compactGenomes[name]}) + } + if *labelsFilename != "" { + log.Infof("writing labels to %s", *labelsFilename) + var f *os.File + f, err = os.OpenFile(*labelsFilename, os.O_CREATE|os.O_WRONLY, 0777) + if err != nil { + return 1 + } + defer f.Close() + _, outBasename := path.Split(*outputFilename) + for i, name := range names { + _, err = fmt.Fprintf(f, "%d,%q,%q\n", i, trimFilenameForLabel(name), outBasename) + if err != nil { + err = fmt.Errorf("write %s: %w", *labelsFilename, err) + return 1 + } + } + err = f.Close() + if err != nil { + err = fmt.Errorf("close %s: %w", *labelsFilename, err) + return 1 + } + } + _, err = input.Seek(0, io.SeekStart) if err != nil { return 1 @@ -188,7 +219,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std bedbufw = bufio.NewWriter(bedout) } - err = cmd.export(bufw, bedout, input, strings.HasSuffix(*inputFilename, ".gz"), tilelib.taglib.keylen, refseq, cgs) + err = cmd.export(bufw, bedout, input, strings.HasSuffix(*inputFilename, ".gz"), tilelib, refseq, cgs) if err != nil { return 1 } @@ -210,14 +241,14 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std return 1 } } - err = input.Close() + err = in.Close() if err != nil { return 1 } return 0 } -func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, gz bool, taglen int, refseq map[string][]tileLibRef, cgs []CompactGenome) error { +func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, gz bool, tilelib *tileLibrary, refseq map[string][]tileLibRef, cgs []CompactGenome) error { need := map[tileLibRef]bool{} var seqnames []string for seqname, librefs := range refseq { @@ -244,7 +275,7 @@ func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, gz bool, ta tileVariant := map[tileLibRef]TileVariant{} err := DecodeLibrary(librdr, gz, func(ent *LibraryEntry) error { for _, tv := range ent.TileVariants { - libref := tileLibRef{Tag: tv.Tag, Variant: tv.Variant} + libref := tilelib.getRef(tv.Tag, tv.Sequence) if need[libref] { tileVariant[libref] = tv } @@ -285,7 +316,7 @@ func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, gz bool, ta throttle.Acquire() go func() { defer throttle.Release() - cmd.exportSeq(outbuf, bedbuf, taglen, seqname, refseq[seqname], tileVariant, cgs) + cmd.exportSeq(outbuf, bedbuf, tilelib.taglib.keylen, seqname, refseq[seqname], tileVariant, cgs) log.Infof("assembled %q to outbuf %d bedbuf %d", seqname, outbuf.Len(), bedbuf.Len()) }() } @@ -360,7 +391,6 @@ func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string, v = v.PadLeft() } v.Position += refpos - log.Debugf("%s seq %s phase %d tag %d tile diff %s\n", cg.Name, seqname, phase, libref.Tag, v.String()) varslice := variantAt[v.Position] if varslice == nil { varslice = make([]hgvs.Variant, len(cgs)*2) @@ -476,3 +506,31 @@ func printHGVS(out io.Writer, seqname string, varslice []hgvs.Variant) { } out.Write([]byte{'\n'}) } + +func printHGVSOneHot(out io.Writer, seqname string, varslice []hgvs.Variant) { + vars := map[hgvs.Variant]bool{} + for _, v := range varslice { + if v.Ref != v.New { + vars[v] = true + } + } + + // sort variants to ensure output is deterministic + sorted := make([]hgvs.Variant, 0, len(vars)) + for v := range vars { + sorted = append(sorted, v) + } + sort.Slice(sorted, func(a, b int) bool { return hgvs.Less(sorted[a], sorted[b]) }) + + for _, v := range sorted { + fmt.Fprintf(out, "%s.%s", seqname, v.String()) + for i := 0; i < len(varslice); i += 2 { + if varslice[i] == v || varslice[i+1] == v { + out.Write([]byte("\t1")) + } else { + out.Write([]byte("\t0")) + } + } + out.Write([]byte{'\n'}) + } +} diff --git a/export_test.go b/export_test.go new file mode 100644 index 0000000000..2a020b9eba --- /dev/null +++ b/export_test.go @@ -0,0 +1,53 @@ +package lightning + +import ( + "bytes" + "io/ioutil" + "os" + + "gopkg.in/check.v1" +) + +type exportSuite struct{} + +var _ = check.Suite(&exportSuite{}) + +func (s *exportSuite) TestFastaToHGVS(c *check.C) { + tmpdir := c.MkDir() + + err := ioutil.WriteFile(tmpdir+"/chr1-12-100.bed", []byte("chr1\t12\t100\ttest.1\n"), 0644) + c.Check(err, check.IsNil) + + var buffer bytes.Buffer + exited := (&importer{}).RunCommand("import", []string{"-local=true", "-tag-library", "testdata/tags", "-output-tiles", "-save-incomplete-tiles", "testdata/pipeline1", "testdata/ref.fasta"}, &bytes.Buffer{}, &buffer, os.Stderr) + c.Assert(exited, check.Equals, 0) + ioutil.WriteFile(tmpdir+"/library.gob", buffer.Bytes(), 0644) + var output bytes.Buffer + exited = (&exporter{}).RunCommand("export", []string{ + "-local=true", + "-i=" + tmpdir + "/library.gob", + "-output-format=hgvs-onehot", + "-output-labels=" + tmpdir + "/labels.csv", + "-ref=testdata/ref.fasta", + }, &buffer, &output, os.Stderr) + c.Check(exited, check.Equals, 0) + c.Check(output.String(), check.Equals, `chr1.1_3delinsGGC 1 0 +chr1.41_42delinsAA 1 0 +chr1.161A>T 1 0 +chr1.178A>T 1 0 +chr1.222_224del 1 0 +chr1.302_305delinsAAAA 1 0 +chr2.1_3delinsAAA 0 1 +chr2.125_127delinsAAA 0 1 +chr2.241_254del 1 0 +chr2.258_269delinsAA 1 0 +chr2.315C>A 1 0 +chr2.470_472del 1 0 +chr2.471_472delinsAA 1 0 +`) + labels, err := ioutil.ReadFile(tmpdir + "/labels.csv") + c.Check(err, check.IsNil) + c.Check(string(labels), check.Equals, `0,"input1","-" +1,"input2","-" +`) +} diff --git a/exportnumpy.go b/exportnumpy.go index 066480d448..fb4df0801d 100644 --- a/exportnumpy.go +++ b/exportnumpy.go @@ -73,7 +73,7 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, Name: "lightning export-numpy", Client: arvados.NewClientFromEnv(), ProjectUUID: *projectUUID, - RAM: 700000000000, + RAM: 750000000000, VCPUs: 32, Priority: *priority, KeepCache: 1, diff --git a/exportnumpy_test.go b/exportnumpy_test.go index ce5fd4063a..5011df01b4 100644 --- a/exportnumpy_test.go +++ b/exportnumpy_test.go @@ -9,11 +9,11 @@ import ( "gopkg.in/check.v1" ) -type exportSuite struct{} +type exportNumpySuite struct{} -var _ = check.Suite(&exportSuite{}) +var _ = check.Suite(&exportNumpySuite{}) -func (s *exportSuite) TestFastaToNumpy(c *check.C) { +func (s *exportNumpySuite) TestFastaToNumpy(c *check.C) { tmpdir := c.MkDir() err := ioutil.WriteFile(tmpdir+"/chr1-12-100.bed", []byte("chr1\t12\t100\ttest.1\n"), 0644) @@ -58,7 +58,7 @@ func sortUints(variants []int16) { } } -func (s *exportSuite) TestOnehot(c *check.C) { +func (s *exportNumpySuite) TestOnehot(c *check.C) { for _, trial := range []struct { incols int in []int16 diff --git a/hgvs/diff.go b/hgvs/diff.go index 5a6b625e6b..e04ccff587 100644 --- a/hgvs/diff.go +++ b/hgvs/diff.go @@ -129,3 +129,13 @@ func cleanup(in []diffmatchpatch.Diff) (out []diffmatchpatch.Diff) { } return } + +func Less(a, b Variant) bool { + if a.Position != b.Position { + return a.Position < b.Position + } else if a.New != b.New { + return a.New < b.New + } else { + return a.Ref < b.Ref + } +} diff --git a/import.go b/import.go index 7a8820620b..d229160a8b 100644 --- a/import.go +++ b/import.go @@ -42,6 +42,7 @@ type importer struct { outputStats string matchChromosome *regexp.Regexp encoder *gob.Encoder + retainAfterEncoding bool // keep imported genomes/refseqs in memory after writing to disk batchArgs } @@ -361,7 +362,6 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { var kept, dropped int variants[1], kept, dropped = tseqs.Variants() log.Printf("%s found %d unique tags plus %d repeats", infile2, kept, dropped) - return err } } else if fastaFilenameRe.MatchString(infile) { @@ -380,6 +380,16 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { totlen += len(tseq) } log.Printf("%s tiled %d seqs, total len %d", infile, len(tseqs), totlen) + + if cmd.retainAfterEncoding { + tilelib.mtx.Lock() + if tilelib.refseqs == nil { + tilelib.refseqs = map[string]map[string][]tileLibRef{} + } + tilelib.refseqs[infile] = tseqs + tilelib.mtx.Unlock() + } + return cmd.encoder.Encode(LibraryEntry{ CompactSequences: []CompactSequence{{Name: infile, TileSequences: tseqs}}, }) @@ -411,8 +421,9 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { if len(errs) > 0 { return } + variants := flatten(variants) err := cmd.encoder.Encode(LibraryEntry{ - CompactGenomes: []CompactGenome{{Name: infile, Variants: flatten(variants)}}, + CompactGenomes: []CompactGenome{{Name: infile, Variants: variants}}, }) if err != nil { select { @@ -420,6 +431,14 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { default: } } + if cmd.retainAfterEncoding { + tilelib.mtx.Lock() + if tilelib.compactGenomes == nil { + tilelib.compactGenomes = make(map[string][]tileVariantID) + } + tilelib.compactGenomes[infile] = variants + tilelib.mtx.Unlock() + } }() } go close(todo) diff --git a/tilelib.go b/tilelib.go index 455a526215..fbdbe6a8e4 100644 --- a/tilelib.go +++ b/tilelib.go @@ -248,6 +248,36 @@ func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, gz bool, return nil } +func (tilelib *tileLibrary) dump(out io.Writer) { + printTV := func(tag int, variant tileVariantID) { + if variant < 1 { + fmt.Fprintf(out, " -") + } else if tag >= len(tilelib.variant) { + fmt.Fprintf(out, " (!tag=%d)", tag) + } else if int(variant) > len(tilelib.variant[tag]) { + fmt.Fprintf(out, " (tag=%d,!variant=%d)", tag, variant) + } else { + fmt.Fprintf(out, " %x", tilelib.variant[tag][variant-1][:8]) + } + } + for refname, refseqs := range tilelib.refseqs { + for seqname, seq := range refseqs { + fmt.Fprintf(out, "ref %s %s", refname, seqname) + for _, libref := range seq { + printTV(int(libref.Tag), libref.Variant) + } + fmt.Fprintf(out, "\n") + } + } + for name, cg := range tilelib.compactGenomes { + fmt.Fprintf(out, "cg %s", name) + for tag, variant := range cg { + printTV(tag/2, variant) + } + fmt.Fprintf(out, "\n") + } +} + type importStats struct { InputFile string InputLabel string