From fd2049ccfde932448f570eeda6a1ecc1b663f8e0 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 4 Feb 2020 16:28:51 -0500 Subject: [PATCH] Split gvcf2numpy command into import and export-numpy. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- cmd.go | 3 +- exportnumpy.go | 79 +++++++++++++++ gvcf2numpy_test.go => exportnumpy_test.go | 16 ++-- gob.go | 39 ++++++++ gvcf2numpy.go => import.go | 112 +++++++++++----------- 5 files changed, 185 insertions(+), 64 deletions(-) create mode 100644 exportnumpy.go rename gvcf2numpy_test.go => exportnumpy_test.go (62%) create mode 100644 gob.go rename gvcf2numpy.go => import.go (79%) diff --git a/cmd.go b/cmd.go index 713e1d815d..1e86dc7010 100644 --- a/cmd.go +++ b/cmd.go @@ -16,7 +16,8 @@ var ( "-version": cmd.Version, "--version": cmd.Version, - "gvcf2numpy": &gvcf2numpy{}, + "import": &importer{}, + "export-numpy": &exportNumpy{}, "build-docker-image": &buildDockerImage{}, }) ) diff --git a/exportnumpy.go b/exportnumpy.go new file mode 100644 index 0000000000..d11b7431d5 --- /dev/null +++ b/exportnumpy.go @@ -0,0 +1,79 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "io" + "log" + "net/http" + _ "net/http/pprof" + + "github.com/kshedden/gonpy" +) + +type exportNumpy struct { + output io.Writer +} + +func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { + var err error + defer func() { + if err != nil { + fmt.Fprintf(stderr, "%s\n", err) + } + }() + flags := flag.NewFlagSet("", flag.ContinueOnError) + flags.SetOutput(stderr) + pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") + err = flags.Parse(args) + if err == flag.ErrHelp { + err = nil + return 0 + } else if err != nil { + return 2 + } + cmd.output = stdout + + if *pprof != "" { + go func() { + log.Println(http.ListenAndServe(*pprof, nil)) + }() + } + + cgs, err := ReadCompactGenomes(stdin) + if err != nil { + return 1 + } + cols := 0 + for _, cg := range cgs { + if cols < len(cg.Variants) { + cols = len(cg.Variants) + } + } + rows := len(cgs) + out := make([]uint16, rows*cols) + for row, cg := range cgs { + for i, v := range cg.Variants { + out[row*cols+i] = uint16(v) + } + } + w := bufio.NewWriter(cmd.output) + npw, err := gonpy.NewWriter(nopCloser{w}) + if err != nil { + return 1 + } + npw.Shape = []int{rows, cols} + npw.WriteUint16(out) + err = w.Flush() + if err != nil { + return 1 + } + return 0 +} + +type nopCloser struct { + io.Writer +} + +func (nopCloser) Close() error { return nil } diff --git a/gvcf2numpy_test.go b/exportnumpy_test.go similarity index 62% rename from gvcf2numpy_test.go rename to exportnumpy_test.go index 21e08f9ac5..4bef7bd1e8 100644 --- a/gvcf2numpy_test.go +++ b/exportnumpy_test.go @@ -8,16 +8,18 @@ import ( "gopkg.in/check.v1" ) -type gvcf2numpySuite struct{} +type exportSuite struct{} -var _ = check.Suite(&gvcf2numpySuite{}) +var _ = check.Suite(&exportSuite{}) -func (s *gvcf2numpySuite) TestFastaToNumpy(c *check.C) { - var stdout bytes.Buffer - var cmd gvcf2numpy - exited := cmd.RunCommand("gvcf2numpy", []string{"-tag-library", "testdata/tags", "-ref", "testdata/ref", "testdata/a.1.fasta"}, &bytes.Buffer{}, &stdout, os.Stderr) +func (s *exportSuite) TestFastaToNumpy(c *check.C) { + var buffer bytes.Buffer + exited := (&importer{}).RunCommand("import", []string{"-tag-library", "testdata/tags", "-ref", "testdata/ref", "testdata/a.1.fasta"}, &bytes.Buffer{}, &buffer, os.Stderr) c.Check(exited, check.Equals, 0) - npy, err := gonpy.NewReader(&stdout) + var output bytes.Buffer + exited = (&exportNumpy{}).RunCommand("export-numpy", nil, &buffer, &output, os.Stderr) + c.Check(exited, check.Equals, 0) + npy, err := gonpy.NewReader(&output) c.Assert(err, check.IsNil) variants, err := npy.GetUint16() c.Assert(err, check.IsNil) diff --git a/gob.go b/gob.go new file mode 100644 index 0000000000..d7ac24e48f --- /dev/null +++ b/gob.go @@ -0,0 +1,39 @@ +package main + +import ( + "encoding/gob" + "io" + _ "net/http/pprof" + + "golang.org/x/crypto/blake2b" +) + +type CompactGenome struct { + Name string + Variants []tileVariantID +} + +type LibraryEntry struct { + TagSet [][]byte + CompactGenomes []CompactGenome + TileVariants []struct { + Tag tagID + Blake2b [blake2b.Size]byte + Sequence []byte + } +} + +func ReadCompactGenomes(rdr io.Reader) ([]CompactGenome, error) { + dec := gob.NewDecoder(rdr) + var ret []CompactGenome + for { + var ent LibraryEntry + err := dec.Decode(&ent) + if err == io.EOF { + return ret, nil + } else if err != nil { + return nil, err + } + ret = append(ret, ent.CompactGenomes...) + } +} diff --git a/gvcf2numpy.go b/import.go similarity index 79% rename from gvcf2numpy.go rename to import.go index 156e770912..2f9888ce39 100644 --- a/gvcf2numpy.go +++ b/import.go @@ -3,6 +3,7 @@ package main import ( "bufio" "compress/gzip" + "encoding/gob" "flag" "fmt" "io" @@ -19,17 +20,15 @@ import ( "sync" "sync/atomic" "time" - - "github.com/kshedden/gonpy" ) -type gvcf2numpy struct { +type importer struct { tagLibraryFile string refFile string - output io.Writer + encoder *gob.Encoder } -func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { +func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { var err error defer func() { if err != nil { @@ -54,7 +53,6 @@ func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, s flags.Usage() return 2 } - cmd.output = stdout if *pprof != "" { go func() { @@ -76,18 +74,20 @@ func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, s log.Printf("tilelib.Len() == %d", tilelib.Len()) } }() - variants, err := cmd.tileGVCFs(tilelib, infiles) + w := bufio.NewWriter(stdout) + cmd.encoder = gob.NewEncoder(w) + err = cmd.tileGVCFs(tilelib, infiles) if err != nil { return 1 } - err = cmd.printVariants(variants) + err = w.Flush() if err != nil { return 1 } return 0 } -func (cmd *gvcf2numpy) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, error) { +func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, error) { var input io.ReadCloser input, err := os.Open(infile) if err != nil { @@ -104,7 +104,7 @@ func (cmd *gvcf2numpy) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, return tilelib.TileFasta(infile, input) } -func (cmd *gvcf2numpy) loadTileLibrary() (*tileLibrary, error) { +func (cmd *importer) loadTileLibrary() (*tileLibrary, error) { log.Printf("tag library %s load starting", cmd.tagLibraryFile) f, err := os.Open(cmd.tagLibraryFile) if err != nil { @@ -174,49 +174,81 @@ func listInputFiles(paths []string) (files []string, err error) { return } -func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) ([][]tileVariantID, error) { +func (cmd *importer) tileGVCFs(tilelib *tileLibrary, infiles []string) error { starttime := time.Now() errs := make(chan error, 1) - variants := make([][]tileVariantID, len(infiles)*2) todo := make(chan func() error, len(infiles)*2) - var wg sync.WaitGroup - for i, infile := range infiles { - i, infile := i, infile + var encodeJobs sync.WaitGroup + for _, infile := range infiles { + infile := infile + var phases sync.WaitGroup + phases.Add(2) + variants := make([][]tileVariantID, 2) if strings.HasSuffix(infile, ".1.fasta") || strings.HasSuffix(infile, ".1.fasta.gz") { todo <- func() error { + defer phases.Done() log.Printf("%s starting", infile) defer log.Printf("%s done", infile) tseqs, err := cmd.tileFasta(tilelib, infile) - variants[i*2] = tseqs.Variants() + variants[0] = tseqs.Variants() return err } infile2 := regexp.MustCompile(`\.1\.fasta(\.gz)?$`).ReplaceAllString(infile, `.2.fasta$1`) todo <- func() error { + defer phases.Done() log.Printf("%s starting", infile2) defer log.Printf("%s done", infile2) tseqs, err := cmd.tileFasta(tilelib, infile2) - variants[i*2+1] = tseqs.Variants() + variants[1] = tseqs.Variants() return err } } else { for phase := 0; phase < 2; phase++ { phase := phase todo <- func() error { + defer phases.Done() log.Printf("%s phase %d starting", infile, phase+1) defer log.Printf("%s phase %d done", infile, phase+1) tseqs, err := cmd.tileGVCF(tilelib, infile, phase) - variants[i*2+phase] = tseqs.Variants() + variants[phase] = tseqs.Variants() return err } } } + encodeJobs.Add(1) + go func() { + defer encodeJobs.Done() + phases.Wait() + if len(errs) > 0 { + return + } + ntags := len(variants[0]) + if ntags < len(variants[1]) { + ntags = len(variants[1]) + } + flat := make([]tileVariantID, ntags*2) + for i := 0; i < ntags; i++ { + flat[i*2] = variants[0][i] + flat[i*2+1] = variants[1][i] + } + err := cmd.encoder.Encode(LibraryEntry{ + CompactGenomes: []CompactGenome{{Name: infile, Variants: flat}}, + }) + if err != nil { + select { + case errs <- err: + default: + } + } + }() } go close(todo) + var tileJobs sync.WaitGroup running := int64(runtime.NumCPU()) for i := 0; i < runtime.NumCPU(); i++ { - wg.Add(1) + tileJobs.Add(1) go func() { - defer wg.Done() + defer tileJobs.Done() defer atomic.AddInt64(&running, -1) for fn := range todo { if len(errs) > 0 { @@ -236,45 +268,13 @@ func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) ([][]ti } }() } - wg.Wait() + tileJobs.Wait() + encodeJobs.Wait() go close(errs) - return variants, <-errs -} - -func (cmd *gvcf2numpy) printVariants(variants [][]tileVariantID) error { - maxlen := 0 - for _, v := range variants { - if maxlen < len(v) { - maxlen = len(v) - } - } - rows := len(variants) / 2 - cols := maxlen * 2 - out := make([]uint16, rows*cols) - for row := 0; row < len(variants)/2; row++ { - for phase := 0; phase < 2; phase++ { - for tag, variant := range variants[row*2+phase] { - out[row*cols+2*int(tag)+phase] = uint16(variant) - } - } - } - w := bufio.NewWriter(cmd.output) - npw, err := gonpy.NewWriter(nopCloser{w}) - if err != nil { - return err - } - npw.Shape = []int{rows, cols} - npw.WriteUint16(out) - return w.Flush() + return <-errs } -type nopCloser struct { - io.Writer -} - -func (nopCloser) Close() error { return nil } - -func (cmd *gvcf2numpy) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) { +func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) { args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase + 1), infile} indexsuffix := ".tbi" if _, err := os.Stat(infile + ".csi"); err == nil { -- 2.30.2