Split gvcf2numpy command into import and export-numpy.
authorTom Clegg <tom@tomclegg.ca>
Tue, 4 Feb 2020 21:28:51 +0000 (16:28 -0500)
committerTom Clegg <tom@tomclegg.ca>
Tue, 4 Feb 2020 21:28:51 +0000 (16:28 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

cmd.go
exportnumpy.go [new file with mode: 0644]
exportnumpy_test.go [moved from gvcf2numpy_test.go with 62% similarity]
gob.go [new file with mode: 0644]
import.go [moved from gvcf2numpy.go with 79% similarity]

diff --git a/cmd.go b/cmd.go
index 713e1d815dd9bcd24065b9c16fdca3b84209ffd4..1e86dc701010164ade05e32765762004ceeff42c 100644 (file)
--- a/cmd.go
+++ b/cmd.go
@@ -16,7 +16,8 @@ var (
                "-version":  cmd.Version,
                "--version": cmd.Version,
 
-               "gvcf2numpy":         &gvcf2numpy{},
+               "import":             &importer{},
+               "export-numpy":       &exportNumpy{},
                "build-docker-image": &buildDockerImage{},
        })
 )
diff --git a/exportnumpy.go b/exportnumpy.go
new file mode 100644 (file)
index 0000000..d11b743
--- /dev/null
@@ -0,0 +1,79 @@
+package main
+
+import (
+       "bufio"
+       "flag"
+       "fmt"
+       "io"
+       "log"
+       "net/http"
+       _ "net/http/pprof"
+
+       "github.com/kshedden/gonpy"
+)
+
+type exportNumpy struct {
+       output io.Writer
+}
+
+func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       var err error
+       defer func() {
+               if err != nil {
+                       fmt.Fprintf(stderr, "%s\n", err)
+               }
+       }()
+       flags := flag.NewFlagSet("", flag.ContinueOnError)
+       flags.SetOutput(stderr)
+       pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
+       err = flags.Parse(args)
+       if err == flag.ErrHelp {
+               err = nil
+               return 0
+       } else if err != nil {
+               return 2
+       }
+       cmd.output = stdout
+
+       if *pprof != "" {
+               go func() {
+                       log.Println(http.ListenAndServe(*pprof, nil))
+               }()
+       }
+
+       cgs, err := ReadCompactGenomes(stdin)
+       if err != nil {
+               return 1
+       }
+       cols := 0
+       for _, cg := range cgs {
+               if cols < len(cg.Variants) {
+                       cols = len(cg.Variants)
+               }
+       }
+       rows := len(cgs)
+       out := make([]uint16, rows*cols)
+       for row, cg := range cgs {
+               for i, v := range cg.Variants {
+                       out[row*cols+i] = uint16(v)
+               }
+       }
+       w := bufio.NewWriter(cmd.output)
+       npw, err := gonpy.NewWriter(nopCloser{w})
+       if err != nil {
+               return 1
+       }
+       npw.Shape = []int{rows, cols}
+       npw.WriteUint16(out)
+       err = w.Flush()
+       if err != nil {
+               return 1
+       }
+       return 0
+}
+
+type nopCloser struct {
+       io.Writer
+}
+
+func (nopCloser) Close() error { return nil }
similarity index 62%
rename from gvcf2numpy_test.go
rename to exportnumpy_test.go
index 21e08f9ac5cd58d0451d09a9e6cc588acf3d1226..4bef7bd1e88ec7147d7b8382b997f83404b5f760 100644 (file)
@@ -8,16 +8,18 @@ import (
        "gopkg.in/check.v1"
 )
 
-type gvcf2numpySuite struct{}
+type exportSuite struct{}
 
-var _ = check.Suite(&gvcf2numpySuite{})
+var _ = check.Suite(&exportSuite{})
 
-func (s *gvcf2numpySuite) TestFastaToNumpy(c *check.C) {
-       var stdout bytes.Buffer
-       var cmd gvcf2numpy
-       exited := cmd.RunCommand("gvcf2numpy", []string{"-tag-library", "testdata/tags", "-ref", "testdata/ref", "testdata/a.1.fasta"}, &bytes.Buffer{}, &stdout, os.Stderr)
+func (s *exportSuite) TestFastaToNumpy(c *check.C) {
+       var buffer bytes.Buffer
+       exited := (&importer{}).RunCommand("import", []string{"-tag-library", "testdata/tags", "-ref", "testdata/ref", "testdata/a.1.fasta"}, &bytes.Buffer{}, &buffer, os.Stderr)
        c.Check(exited, check.Equals, 0)
-       npy, err := gonpy.NewReader(&stdout)
+       var output bytes.Buffer
+       exited = (&exportNumpy{}).RunCommand("export-numpy", nil, &buffer, &output, os.Stderr)
+       c.Check(exited, check.Equals, 0)
+       npy, err := gonpy.NewReader(&output)
        c.Assert(err, check.IsNil)
        variants, err := npy.GetUint16()
        c.Assert(err, check.IsNil)
diff --git a/gob.go b/gob.go
new file mode 100644 (file)
index 0000000..d7ac24e
--- /dev/null
+++ b/gob.go
@@ -0,0 +1,39 @@
+package main
+
+import (
+       "encoding/gob"
+       "io"
+       _ "net/http/pprof"
+
+       "golang.org/x/crypto/blake2b"
+)
+
+type CompactGenome struct {
+       Name     string
+       Variants []tileVariantID
+}
+
+type LibraryEntry struct {
+       TagSet         [][]byte
+       CompactGenomes []CompactGenome
+       TileVariants   []struct {
+               Tag      tagID
+               Blake2b  [blake2b.Size]byte
+               Sequence []byte
+       }
+}
+
+func ReadCompactGenomes(rdr io.Reader) ([]CompactGenome, error) {
+       dec := gob.NewDecoder(rdr)
+       var ret []CompactGenome
+       for {
+               var ent LibraryEntry
+               err := dec.Decode(&ent)
+               if err == io.EOF {
+                       return ret, nil
+               } else if err != nil {
+                       return nil, err
+               }
+               ret = append(ret, ent.CompactGenomes...)
+       }
+}
similarity index 79%
rename from gvcf2numpy.go
rename to import.go
index 156e77091222c408e4bace9b75fc8b2fdf910736..2f9888ce390427c0b426756f936d6c1df338c180 100644 (file)
+++ b/import.go
@@ -3,6 +3,7 @@ package main
 import (
        "bufio"
        "compress/gzip"
+       "encoding/gob"
        "flag"
        "fmt"
        "io"
@@ -19,17 +20,15 @@ import (
        "sync"
        "sync/atomic"
        "time"
-
-       "github.com/kshedden/gonpy"
 )
 
-type gvcf2numpy struct {
+type importer struct {
        tagLibraryFile string
        refFile        string
-       output         io.Writer
+       encoder        *gob.Encoder
 }
 
-func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
        var err error
        defer func() {
                if err != nil {
@@ -54,7 +53,6 @@ func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, s
                flags.Usage()
                return 2
        }
-       cmd.output = stdout
 
        if *pprof != "" {
                go func() {
@@ -76,18 +74,20 @@ func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        log.Printf("tilelib.Len() == %d", tilelib.Len())
                }
        }()
-       variants, err := cmd.tileGVCFs(tilelib, infiles)
+       w := bufio.NewWriter(stdout)
+       cmd.encoder = gob.NewEncoder(w)
+       err = cmd.tileGVCFs(tilelib, infiles)
        if err != nil {
                return 1
        }
-       err = cmd.printVariants(variants)
+       err = w.Flush()
        if err != nil {
                return 1
        }
        return 0
 }
 
-func (cmd *gvcf2numpy) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, error) {
+func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, error) {
        var input io.ReadCloser
        input, err := os.Open(infile)
        if err != nil {
@@ -104,7 +104,7 @@ func (cmd *gvcf2numpy) tileFasta(tilelib *tileLibrary, infile string) (tileSeq,
        return tilelib.TileFasta(infile, input)
 }
 
-func (cmd *gvcf2numpy) loadTileLibrary() (*tileLibrary, error) {
+func (cmd *importer) loadTileLibrary() (*tileLibrary, error) {
        log.Printf("tag library %s load starting", cmd.tagLibraryFile)
        f, err := os.Open(cmd.tagLibraryFile)
        if err != nil {
@@ -174,49 +174,81 @@ func listInputFiles(paths []string) (files []string, err error) {
        return
 }
 
-func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) ([][]tileVariantID, error) {
+func (cmd *importer) tileGVCFs(tilelib *tileLibrary, infiles []string) error {
        starttime := time.Now()
        errs := make(chan error, 1)
-       variants := make([][]tileVariantID, len(infiles)*2)
        todo := make(chan func() error, len(infiles)*2)
-       var wg sync.WaitGroup
-       for i, infile := range infiles {
-               i, infile := i, infile
+       var encodeJobs sync.WaitGroup
+       for _, infile := range infiles {
+               infile := infile
+               var phases sync.WaitGroup
+               phases.Add(2)
+               variants := make([][]tileVariantID, 2)
                if strings.HasSuffix(infile, ".1.fasta") || strings.HasSuffix(infile, ".1.fasta.gz") {
                        todo <- func() error {
+                               defer phases.Done()
                                log.Printf("%s starting", infile)
                                defer log.Printf("%s done", infile)
                                tseqs, err := cmd.tileFasta(tilelib, infile)
-                               variants[i*2] = tseqs.Variants()
+                               variants[0] = tseqs.Variants()
                                return err
                        }
                        infile2 := regexp.MustCompile(`\.1\.fasta(\.gz)?$`).ReplaceAllString(infile, `.2.fasta$1`)
                        todo <- func() error {
+                               defer phases.Done()
                                log.Printf("%s starting", infile2)
                                defer log.Printf("%s done", infile2)
                                tseqs, err := cmd.tileFasta(tilelib, infile2)
-                               variants[i*2+1] = tseqs.Variants()
+                               variants[1] = tseqs.Variants()
                                return err
                        }
                } else {
                        for phase := 0; phase < 2; phase++ {
                                phase := phase
                                todo <- func() error {
+                                       defer phases.Done()
                                        log.Printf("%s phase %d starting", infile, phase+1)
                                        defer log.Printf("%s phase %d done", infile, phase+1)
                                        tseqs, err := cmd.tileGVCF(tilelib, infile, phase)
-                                       variants[i*2+phase] = tseqs.Variants()
+                                       variants[phase] = tseqs.Variants()
                                        return err
                                }
                        }
                }
+               encodeJobs.Add(1)
+               go func() {
+                       defer encodeJobs.Done()
+                       phases.Wait()
+                       if len(errs) > 0 {
+                               return
+                       }
+                       ntags := len(variants[0])
+                       if ntags < len(variants[1]) {
+                               ntags = len(variants[1])
+                       }
+                       flat := make([]tileVariantID, ntags*2)
+                       for i := 0; i < ntags; i++ {
+                               flat[i*2] = variants[0][i]
+                               flat[i*2+1] = variants[1][i]
+                       }
+                       err := cmd.encoder.Encode(LibraryEntry{
+                               CompactGenomes: []CompactGenome{{Name: infile, Variants: flat}},
+                       })
+                       if err != nil {
+                               select {
+                               case errs <- err:
+                               default:
+                               }
+                       }
+               }()
        }
        go close(todo)
+       var tileJobs sync.WaitGroup
        running := int64(runtime.NumCPU())
        for i := 0; i < runtime.NumCPU(); i++ {
-               wg.Add(1)
+               tileJobs.Add(1)
                go func() {
-                       defer wg.Done()
+                       defer tileJobs.Done()
                        defer atomic.AddInt64(&running, -1)
                        for fn := range todo {
                                if len(errs) > 0 {
@@ -236,45 +268,13 @@ func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) ([][]ti
                        }
                }()
        }
-       wg.Wait()
+       tileJobs.Wait()
+       encodeJobs.Wait()
        go close(errs)
-       return variants, <-errs
-}
-
-func (cmd *gvcf2numpy) printVariants(variants [][]tileVariantID) error {
-       maxlen := 0
-       for _, v := range variants {
-               if maxlen < len(v) {
-                       maxlen = len(v)
-               }
-       }
-       rows := len(variants) / 2
-       cols := maxlen * 2
-       out := make([]uint16, rows*cols)
-       for row := 0; row < len(variants)/2; row++ {
-               for phase := 0; phase < 2; phase++ {
-                       for tag, variant := range variants[row*2+phase] {
-                               out[row*cols+2*int(tag)+phase] = uint16(variant)
-                       }
-               }
-       }
-       w := bufio.NewWriter(cmd.output)
-       npw, err := gonpy.NewWriter(nopCloser{w})
-       if err != nil {
-               return err
-       }
-       npw.Shape = []int{rows, cols}
-       npw.WriteUint16(out)
-       return w.Flush()
+       return <-errs
 }
 
-type nopCloser struct {
-       io.Writer
-}
-
-func (nopCloser) Close() error { return nil }
-
-func (cmd *gvcf2numpy) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) {
+func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) {
        args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase + 1), infile}
        indexsuffix := ".tbi"
        if _, err := os.Stat(infile + ".csi"); err == nil {