From: Tom Clegg Date: Thu, 16 Jan 2020 00:16:14 +0000 (-0500) Subject: Write numpy array. X-Git-Url: https://git.arvados.org/lightning.git/commitdiff_plain/ae8d2ff39c95cbf033d9cc24e64b77b5b80463b0 Write numpy array. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/go.mod b/go.mod index 37e886665e..7e35d3e8bc 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.13 require ( git.arvados.org/arvados.git v0.0.0-20200107160329-7db3857d78a0 + github.com/kshedden/gonpy v0.0.0-20190510000443-66c21fac4672 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405 ) diff --git a/go.sum b/go.sum index ad97e7060b..50e32f9887 100644 --- a/go.sum +++ b/go.sum @@ -69,6 +69,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kshedden/gonpy v0.0.0-20190510000443-66c21fac4672 h1:LQLnybCU54zB8Gj8c1DPeZEheIAn3eZ8Cc9fYqM4ac8= +github.com/kshedden/gonpy v0.0.0-20190510000443-66c21fac4672/go.mod h1:+uEXxXG0RlfBPqG1tq5QN/F2jRlcuY0dExSONLpEwcA= github.com/lib/pq v0.0.0-20171126050459-83612a56d3dd/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/marstr/guid v1.1.1-0.20170427235115-8bdf7d1a087c/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= diff --git a/gvcf2numpy.go b/gvcf2numpy.go index 392bd54d2c..55f25361bc 100644 --- a/gvcf2numpy.go +++ b/gvcf2numpy.go @@ -1,6 +1,7 @@ package main import ( + "bufio" "compress/gzip" "flag" "fmt" @@ -13,13 +14,14 @@ import ( "sort" "strings" "sync" + + "github.com/kshedden/gonpy" ) type gvcf2numpy struct { tagLibraryFile string refFile string output io.Writer - outputMtx sync.Mutex } func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { @@ -78,7 +80,11 @@ func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, s log.Printf("tag library %s load done", cmd.tagLibraryFile) tilelib := tileLibrary{taglib: &taglib} - err = cmd.tileGVCFs(&tilelib, infiles) + tseqs, err := cmd.tileGVCFs(&tilelib, infiles) + if err != nil { + return 1 + } + err = cmd.printVariants(tseqs) if err != nil { return 1 } @@ -122,20 +128,22 @@ func listVCFFiles(paths []string) (files []string, err error) { return } -func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) error { +func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) ([]tileSeq, error) { limit := make(chan bool, runtime.NumCPU()) errs := make(chan error) + tseqs := make([]tileSeq, len(infiles)*2) var wg sync.WaitGroup - for _, infile := range infiles { + for i, infile := range infiles { for phase := 0; phase < 2; phase++ { wg.Add(1) - go func(infile string, phase int) { + go func(i int, infile string, phase int) { defer wg.Done() limit <- true defer func() { <-limit }() log.Printf("%s phase %d starting", infile, phase+1) defer log.Printf("%s phase %d done", infile, phase+1) - tseq, err := cmd.tileGVCF(tilelib, infile, phase) + var err error + tseqs[i*2+phase], err = cmd.tileGVCF(tilelib, infile, phase) if err != nil { select { case errs <- err: @@ -143,8 +151,7 @@ func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) error { } return } - cmd.printVariants(fmt.Sprintf("%s phase %d", infile, phase+1), tseq) - }(infile, phase) + }(i, infile, phase) } } go func() { @@ -152,39 +159,48 @@ func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) error { close(errs) }() if err := <-errs; err != nil { - return err + return nil, err } - return nil + return tseqs, nil } -func (cmd *gvcf2numpy) printVariants(label string, tseq map[string][]tileLibRef) { +func (cmd *gvcf2numpy) printVariants(tseqs []tileSeq) error { maxtag := tagID(-1) - for _, path := range tseq { - for _, tvar := range path { - if maxtag < tvar.tag { - maxtag = tvar.tag + for _, tseq := range tseqs { + for _, path := range tseq { + for _, tvar := range path { + if maxtag < tvar.tag { + maxtag = tvar.tag + } } } } - variant := make([]tileVariantID, maxtag+1) - for _, path := range tseq { - for _, tvar := range path { - variant[tvar.tag] = tvar.variant + out := make([]int32, len(tseqs)*int(maxtag+1)) + for i := 0; i < len(tseqs)/2; i++ { + for phase := 0; phase < 2; phase++ { + for _, path := range tseqs[i*2+phase] { + for _, tvar := range path { + out[2*int(tvar.tag)+phase] = int32(tvar.variant) + } + } } } - - { - excerpt := variant - if len(excerpt) > 100 { - excerpt = excerpt[:100] - } - log.Printf("%q %v\n", label, excerpt) + w := bufio.NewWriter(cmd.output) + npw, err := gonpy.NewWriter(nopCloser{w}) + if err != nil { + return err } - cmd.outputMtx.Lock() - defer cmd.outputMtx.Unlock() - fmt.Fprintf(cmd.output, "%q %v\n", label, variant) + npw.Shape = []int{len(tseqs) / 2, 2 * (int(maxtag) + 1)} + npw.WriteInt32(out) + return w.Flush() +} + +type nopCloser struct { + io.Writer } +func (nopCloser) Close() error { return nil } + func (cmd *gvcf2numpy) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) { args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase + 1), infile} indexsuffix := ".tbi"