--- /dev/null
+package main
+
+import (
+ "bufio"
+ "flag"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+ _ "net/http/pprof"
+
+ "github.com/kshedden/gonpy"
+)
+
+type exportNumpy struct {
+ output io.Writer
+}
+
+func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ var err error
+ defer func() {
+ if err != nil {
+ fmt.Fprintf(stderr, "%s\n", err)
+ }
+ }()
+ flags := flag.NewFlagSet("", flag.ContinueOnError)
+ flags.SetOutput(stderr)
+ pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
+ err = flags.Parse(args)
+ if err == flag.ErrHelp {
+ err = nil
+ return 0
+ } else if err != nil {
+ return 2
+ }
+ cmd.output = stdout
+
+ if *pprof != "" {
+ go func() {
+ log.Println(http.ListenAndServe(*pprof, nil))
+ }()
+ }
+
+ cgs, err := ReadCompactGenomes(stdin)
+ if err != nil {
+ return 1
+ }
+ cols := 0
+ for _, cg := range cgs {
+ if cols < len(cg.Variants) {
+ cols = len(cg.Variants)
+ }
+ }
+ rows := len(cgs)
+ out := make([]uint16, rows*cols)
+ for row, cg := range cgs {
+ for i, v := range cg.Variants {
+ out[row*cols+i] = uint16(v)
+ }
+ }
+ w := bufio.NewWriter(cmd.output)
+ npw, err := gonpy.NewWriter(nopCloser{w})
+ if err != nil {
+ return 1
+ }
+ npw.Shape = []int{rows, cols}
+ npw.WriteUint16(out)
+ err = w.Flush()
+ if err != nil {
+ return 1
+ }
+ return 0
+}
+
+type nopCloser struct {
+ io.Writer
+}
+
+func (nopCloser) Close() error { return nil }
"gopkg.in/check.v1"
)
-type gvcf2numpySuite struct{}
+type exportSuite struct{}
-var _ = check.Suite(&gvcf2numpySuite{})
+var _ = check.Suite(&exportSuite{})
-func (s *gvcf2numpySuite) TestFastaToNumpy(c *check.C) {
- var stdout bytes.Buffer
- var cmd gvcf2numpy
- exited := cmd.RunCommand("gvcf2numpy", []string{"-tag-library", "testdata/tags", "-ref", "testdata/ref", "testdata/a.1.fasta"}, &bytes.Buffer{}, &stdout, os.Stderr)
+func (s *exportSuite) TestFastaToNumpy(c *check.C) {
+ var buffer bytes.Buffer
+ exited := (&importer{}).RunCommand("import", []string{"-tag-library", "testdata/tags", "-ref", "testdata/ref", "testdata/a.1.fasta"}, &bytes.Buffer{}, &buffer, os.Stderr)
c.Check(exited, check.Equals, 0)
- npy, err := gonpy.NewReader(&stdout)
+ var output bytes.Buffer
+ exited = (&exportNumpy{}).RunCommand("export-numpy", nil, &buffer, &output, os.Stderr)
+ c.Check(exited, check.Equals, 0)
+ npy, err := gonpy.NewReader(&output)
c.Assert(err, check.IsNil)
variants, err := npy.GetUint16()
c.Assert(err, check.IsNil)
import (
"bufio"
"compress/gzip"
+ "encoding/gob"
"flag"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
-
- "github.com/kshedden/gonpy"
)
-type gvcf2numpy struct {
+type importer struct {
tagLibraryFile string
refFile string
- output io.Writer
+ encoder *gob.Encoder
}
-func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
var err error
defer func() {
if err != nil {
flags.Usage()
return 2
}
- cmd.output = stdout
if *pprof != "" {
go func() {
log.Printf("tilelib.Len() == %d", tilelib.Len())
}
}()
- variants, err := cmd.tileGVCFs(tilelib, infiles)
+ w := bufio.NewWriter(stdout)
+ cmd.encoder = gob.NewEncoder(w)
+ err = cmd.tileGVCFs(tilelib, infiles)
if err != nil {
return 1
}
- err = cmd.printVariants(variants)
+ err = w.Flush()
if err != nil {
return 1
}
return 0
}
-func (cmd *gvcf2numpy) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, error) {
+func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, error) {
var input io.ReadCloser
input, err := os.Open(infile)
if err != nil {
return tilelib.TileFasta(infile, input)
}
-func (cmd *gvcf2numpy) loadTileLibrary() (*tileLibrary, error) {
+func (cmd *importer) loadTileLibrary() (*tileLibrary, error) {
log.Printf("tag library %s load starting", cmd.tagLibraryFile)
f, err := os.Open(cmd.tagLibraryFile)
if err != nil {
return
}
-func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) ([][]tileVariantID, error) {
+func (cmd *importer) tileGVCFs(tilelib *tileLibrary, infiles []string) error {
starttime := time.Now()
errs := make(chan error, 1)
- variants := make([][]tileVariantID, len(infiles)*2)
todo := make(chan func() error, len(infiles)*2)
- var wg sync.WaitGroup
- for i, infile := range infiles {
- i, infile := i, infile
+ var encodeJobs sync.WaitGroup
+ for _, infile := range infiles {
+ infile := infile
+ var phases sync.WaitGroup
+ phases.Add(2)
+ variants := make([][]tileVariantID, 2)
if strings.HasSuffix(infile, ".1.fasta") || strings.HasSuffix(infile, ".1.fasta.gz") {
todo <- func() error {
+ defer phases.Done()
log.Printf("%s starting", infile)
defer log.Printf("%s done", infile)
tseqs, err := cmd.tileFasta(tilelib, infile)
- variants[i*2] = tseqs.Variants()
+ variants[0] = tseqs.Variants()
return err
}
infile2 := regexp.MustCompile(`\.1\.fasta(\.gz)?$`).ReplaceAllString(infile, `.2.fasta$1`)
todo <- func() error {
+ defer phases.Done()
log.Printf("%s starting", infile2)
defer log.Printf("%s done", infile2)
tseqs, err := cmd.tileFasta(tilelib, infile2)
- variants[i*2+1] = tseqs.Variants()
+ variants[1] = tseqs.Variants()
return err
}
} else {
for phase := 0; phase < 2; phase++ {
phase := phase
todo <- func() error {
+ defer phases.Done()
log.Printf("%s phase %d starting", infile, phase+1)
defer log.Printf("%s phase %d done", infile, phase+1)
tseqs, err := cmd.tileGVCF(tilelib, infile, phase)
- variants[i*2+phase] = tseqs.Variants()
+ variants[phase] = tseqs.Variants()
return err
}
}
}
+ encodeJobs.Add(1)
+ go func() {
+ defer encodeJobs.Done()
+ phases.Wait()
+ if len(errs) > 0 {
+ return
+ }
+ ntags := len(variants[0])
+ if ntags < len(variants[1]) {
+ ntags = len(variants[1])
+ }
+ flat := make([]tileVariantID, ntags*2)
+ for i := 0; i < ntags; i++ {
+ flat[i*2] = variants[0][i]
+ flat[i*2+1] = variants[1][i]
+ }
+ err := cmd.encoder.Encode(LibraryEntry{
+ CompactGenomes: []CompactGenome{{Name: infile, Variants: flat}},
+ })
+ if err != nil {
+ select {
+ case errs <- err:
+ default:
+ }
+ }
+ }()
}
go close(todo)
+ var tileJobs sync.WaitGroup
running := int64(runtime.NumCPU())
for i := 0; i < runtime.NumCPU(); i++ {
- wg.Add(1)
+ tileJobs.Add(1)
go func() {
- defer wg.Done()
+ defer tileJobs.Done()
defer atomic.AddInt64(&running, -1)
for fn := range todo {
if len(errs) > 0 {
}
}()
}
- wg.Wait()
+ tileJobs.Wait()
+ encodeJobs.Wait()
go close(errs)
- return variants, <-errs
-}
-
-func (cmd *gvcf2numpy) printVariants(variants [][]tileVariantID) error {
- maxlen := 0
- for _, v := range variants {
- if maxlen < len(v) {
- maxlen = len(v)
- }
- }
- rows := len(variants) / 2
- cols := maxlen * 2
- out := make([]uint16, rows*cols)
- for row := 0; row < len(variants)/2; row++ {
- for phase := 0; phase < 2; phase++ {
- for tag, variant := range variants[row*2+phase] {
- out[row*cols+2*int(tag)+phase] = uint16(variant)
- }
- }
- }
- w := bufio.NewWriter(cmd.output)
- npw, err := gonpy.NewWriter(nopCloser{w})
- if err != nil {
- return err
- }
- npw.Shape = []int{rows, cols}
- npw.WriteUint16(out)
- return w.Flush()
+ return <-errs
}
-type nopCloser struct {
- io.Writer
-}
-
-func (nopCloser) Close() error { return nil }
-
-func (cmd *gvcf2numpy) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) {
+func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) {
args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase + 1), infile}
indexsuffix := ".tbi"
if _, err := os.Stat(infile + ".csi"); err == nil {