Write numpy array.
authorTom Clegg <tom@tomclegg.ca>
Thu, 16 Jan 2020 00:16:14 +0000 (19:16 -0500)
committerTom Clegg <tom@tomclegg.ca>
Thu, 16 Jan 2020 00:16:14 +0000 (19:16 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

go.mod
go.sum
gvcf2numpy.go

diff --git a/go.mod b/go.mod
index 37e886665e78b9b1a88e5f57df1d3d57a70573c2..7e35d3e8bcb8a06b640868f580b5716aca3c87e6 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.13
 
 require (
        git.arvados.org/arvados.git v0.0.0-20200107160329-7db3857d78a0
+       github.com/kshedden/gonpy v0.0.0-20190510000443-66c21fac4672
        golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2
        gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405
 )
diff --git a/go.sum b/go.sum
index ad97e7060bde6e6225abac3438a5e7550bb21245..50e32f9887e0b14d9c7880d72725796528d11806 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -69,6 +69,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
 github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
+github.com/kshedden/gonpy v0.0.0-20190510000443-66c21fac4672 h1:LQLnybCU54zB8Gj8c1DPeZEheIAn3eZ8Cc9fYqM4ac8=
+github.com/kshedden/gonpy v0.0.0-20190510000443-66c21fac4672/go.mod h1:+uEXxXG0RlfBPqG1tq5QN/F2jRlcuY0dExSONLpEwcA=
 github.com/lib/pq v0.0.0-20171126050459-83612a56d3dd/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
 github.com/marstr/guid v1.1.1-0.20170427235115-8bdf7d1a087c/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho=
 github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
index 392bd54d2cef8bcd6fef9b6d743bc7cd61341769..55f25361bc4b6c7e7772a41d24ca0e321bd3aa72 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "bufio"
        "compress/gzip"
        "flag"
        "fmt"
@@ -13,13 +14,14 @@ import (
        "sort"
        "strings"
        "sync"
+
+       "github.com/kshedden/gonpy"
 )
 
 type gvcf2numpy struct {
        tagLibraryFile string
        refFile        string
        output         io.Writer
-       outputMtx      sync.Mutex
 }
 
 func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
@@ -78,7 +80,11 @@ func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, s
        log.Printf("tag library %s load done", cmd.tagLibraryFile)
 
        tilelib := tileLibrary{taglib: &taglib}
-       err = cmd.tileGVCFs(&tilelib, infiles)
+       tseqs, err := cmd.tileGVCFs(&tilelib, infiles)
+       if err != nil {
+               return 1
+       }
+       err = cmd.printVariants(tseqs)
        if err != nil {
                return 1
        }
@@ -122,20 +128,22 @@ func listVCFFiles(paths []string) (files []string, err error) {
        return
 }
 
-func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) error {
+func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) ([]tileSeq, error) {
        limit := make(chan bool, runtime.NumCPU())
        errs := make(chan error)
+       tseqs := make([]tileSeq, len(infiles)*2)
        var wg sync.WaitGroup
-       for _, infile := range infiles {
+       for i, infile := range infiles {
                for phase := 0; phase < 2; phase++ {
                        wg.Add(1)
-                       go func(infile string, phase int) {
+                       go func(i int, infile string, phase int) {
                                defer wg.Done()
                                limit <- true
                                defer func() { <-limit }()
                                log.Printf("%s phase %d starting", infile, phase+1)
                                defer log.Printf("%s phase %d done", infile, phase+1)
-                               tseq, err := cmd.tileGVCF(tilelib, infile, phase)
+                               var err error
+                               tseqs[i*2+phase], err = cmd.tileGVCF(tilelib, infile, phase)
                                if err != nil {
                                        select {
                                        case errs <- err:
@@ -143,8 +151,7 @@ func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) error {
                                        }
                                        return
                                }
-                               cmd.printVariants(fmt.Sprintf("%s phase %d", infile, phase+1), tseq)
-                       }(infile, phase)
+                       }(i, infile, phase)
                }
        }
        go func() {
@@ -152,39 +159,48 @@ func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) error {
                close(errs)
        }()
        if err := <-errs; err != nil {
-               return err
+               return nil, err
        }
-       return nil
+       return tseqs, nil
 }
 
-func (cmd *gvcf2numpy) printVariants(label string, tseq map[string][]tileLibRef) {
+func (cmd *gvcf2numpy) printVariants(tseqs []tileSeq) error {
        maxtag := tagID(-1)
-       for _, path := range tseq {
-               for _, tvar := range path {
-                       if maxtag < tvar.tag {
-                               maxtag = tvar.tag
+       for _, tseq := range tseqs {
+               for _, path := range tseq {
+                       for _, tvar := range path {
+                               if maxtag < tvar.tag {
+                                       maxtag = tvar.tag
+                               }
                        }
                }
        }
-       variant := make([]tileVariantID, maxtag+1)
-       for _, path := range tseq {
-               for _, tvar := range path {
-                       variant[tvar.tag] = tvar.variant
+       out := make([]int32, len(tseqs)*int(maxtag+1))
+       for i := 0; i < len(tseqs)/2; i++ {
+               for phase := 0; phase < 2; phase++ {
+                       for _, path := range tseqs[i*2+phase] {
+                               for _, tvar := range path {
+                                       out[2*int(tvar.tag)+phase] = int32(tvar.variant)
+                               }
+                       }
                }
        }
-
-       {
-               excerpt := variant
-               if len(excerpt) > 100 {
-                       excerpt = excerpt[:100]
-               }
-               log.Printf("%q %v\n", label, excerpt)
+       w := bufio.NewWriter(cmd.output)
+       npw, err := gonpy.NewWriter(nopCloser{w})
+       if err != nil {
+               return err
        }
-       cmd.outputMtx.Lock()
-       defer cmd.outputMtx.Unlock()
-       fmt.Fprintf(cmd.output, "%q %v\n", label, variant)
+       npw.Shape = []int{len(tseqs) / 2, 2 * (int(maxtag) + 1)}
+       npw.WriteInt32(out)
+       return w.Flush()
+}
+
+type nopCloser struct {
+       io.Writer
 }
 
+func (nopCloser) Close() error { return nil }
+
 func (cmd *gvcf2numpy) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) {
        args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase + 1), infile}
        indexsuffix := ".tbi"