package main
import (
+ "bufio"
"compress/gzip"
"flag"
"fmt"
"sort"
"strings"
"sync"
+
+ "github.com/kshedden/gonpy"
)
type gvcf2numpy struct {
tagLibraryFile string
refFile string
output io.Writer
- outputMtx sync.Mutex
}
func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
log.Printf("tag library %s load done", cmd.tagLibraryFile)
tilelib := tileLibrary{taglib: &taglib}
- err = cmd.tileGVCFs(&tilelib, infiles)
+ tseqs, err := cmd.tileGVCFs(&tilelib, infiles)
+ if err != nil {
+ return 1
+ }
+ err = cmd.printVariants(tseqs)
if err != nil {
return 1
}
return
}
-func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) error {
+func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) ([]tileSeq, error) {
limit := make(chan bool, runtime.NumCPU())
errs := make(chan error)
+ tseqs := make([]tileSeq, len(infiles)*2)
var wg sync.WaitGroup
- for _, infile := range infiles {
+ for i, infile := range infiles {
for phase := 0; phase < 2; phase++ {
wg.Add(1)
- go func(infile string, phase int) {
+ go func(i int, infile string, phase int) {
defer wg.Done()
limit <- true
defer func() { <-limit }()
log.Printf("%s phase %d starting", infile, phase+1)
defer log.Printf("%s phase %d done", infile, phase+1)
- tseq, err := cmd.tileGVCF(tilelib, infile, phase)
+ var err error
+ tseqs[i*2+phase], err = cmd.tileGVCF(tilelib, infile, phase)
if err != nil {
select {
case errs <- err:
}
return
}
- cmd.printVariants(fmt.Sprintf("%s phase %d", infile, phase+1), tseq)
- }(infile, phase)
+ }(i, infile, phase)
}
}
go func() {
close(errs)
}()
if err := <-errs; err != nil {
- return err
+ return nil, err
}
- return nil
+ return tseqs, nil
}
-func (cmd *gvcf2numpy) printVariants(label string, tseq map[string][]tileLibRef) {
+func (cmd *gvcf2numpy) printVariants(tseqs []tileSeq) error {
maxtag := tagID(-1)
- for _, path := range tseq {
- for _, tvar := range path {
- if maxtag < tvar.tag {
- maxtag = tvar.tag
+ for _, tseq := range tseqs {
+ for _, path := range tseq {
+ for _, tvar := range path {
+ if maxtag < tvar.tag {
+ maxtag = tvar.tag
+ }
}
}
}
- variant := make([]tileVariantID, maxtag+1)
- for _, path := range tseq {
- for _, tvar := range path {
- variant[tvar.tag] = tvar.variant
+ out := make([]int32, len(tseqs)*int(maxtag+1))
+ for i := 0; i < len(tseqs)/2; i++ {
+ for phase := 0; phase < 2; phase++ {
+ for _, path := range tseqs[i*2+phase] {
+ for _, tvar := range path {
+ out[2*int(tvar.tag)+phase] = int32(tvar.variant)
+ }
+ }
}
}
-
- {
- excerpt := variant
- if len(excerpt) > 100 {
- excerpt = excerpt[:100]
- }
- log.Printf("%q %v\n", label, excerpt)
+ w := bufio.NewWriter(cmd.output)
+ npw, err := gonpy.NewWriter(nopCloser{w})
+ if err != nil {
+ return err
}
- cmd.outputMtx.Lock()
- defer cmd.outputMtx.Unlock()
- fmt.Fprintf(cmd.output, "%q %v\n", label, variant)
+ npw.Shape = []int{len(tseqs) / 2, 2 * (int(maxtag) + 1)}
+ npw.WriteInt32(out)
+ return w.Flush()
+}
+
+type nopCloser struct {
+ io.Writer
}
+func (nopCloser) Close() error { return nil }
+
func (cmd *gvcf2numpy) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) {
args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase + 1), infile}
indexsuffix := ".tbi"