Split gvcf2numpy command into import and export-numpy.
[lightning.git] / import.go
similarity index 79%
rename from gvcf2numpy.go
rename to import.go
index 156e77091222c408e4bace9b75fc8b2fdf910736..2f9888ce390427c0b426756f936d6c1df338c180 100644 (file)
+++ b/import.go
@@ -3,6 +3,7 @@ package main
 import (
        "bufio"
        "compress/gzip"
+       "encoding/gob"
        "flag"
        "fmt"
        "io"
@@ -19,17 +20,15 @@ import (
        "sync"
        "sync/atomic"
        "time"
-
-       "github.com/kshedden/gonpy"
 )
 
-type gvcf2numpy struct {
+type importer struct {
        tagLibraryFile string
        refFile        string
-       output         io.Writer
+       encoder        *gob.Encoder
 }
 
-func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
        var err error
        defer func() {
                if err != nil {
@@ -54,7 +53,6 @@ func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, s
                flags.Usage()
                return 2
        }
-       cmd.output = stdout
 
        if *pprof != "" {
                go func() {
@@ -76,18 +74,20 @@ func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        log.Printf("tilelib.Len() == %d", tilelib.Len())
                }
        }()
-       variants, err := cmd.tileGVCFs(tilelib, infiles)
+       w := bufio.NewWriter(stdout)
+       cmd.encoder = gob.NewEncoder(w)
+       err = cmd.tileGVCFs(tilelib, infiles)
        if err != nil {
                return 1
        }
-       err = cmd.printVariants(variants)
+       err = w.Flush()
        if err != nil {
                return 1
        }
        return 0
 }
 
-func (cmd *gvcf2numpy) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, error) {
+func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, error) {
        var input io.ReadCloser
        input, err := os.Open(infile)
        if err != nil {
@@ -104,7 +104,7 @@ func (cmd *gvcf2numpy) tileFasta(tilelib *tileLibrary, infile string) (tileSeq,
        return tilelib.TileFasta(infile, input)
 }
 
-func (cmd *gvcf2numpy) loadTileLibrary() (*tileLibrary, error) {
+func (cmd *importer) loadTileLibrary() (*tileLibrary, error) {
        log.Printf("tag library %s load starting", cmd.tagLibraryFile)
        f, err := os.Open(cmd.tagLibraryFile)
        if err != nil {
@@ -174,49 +174,81 @@ func listInputFiles(paths []string) (files []string, err error) {
        return
 }
 
-func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) ([][]tileVariantID, error) {
+func (cmd *importer) tileGVCFs(tilelib *tileLibrary, infiles []string) error {
        starttime := time.Now()
        errs := make(chan error, 1)
-       variants := make([][]tileVariantID, len(infiles)*2)
        todo := make(chan func() error, len(infiles)*2)
-       var wg sync.WaitGroup
-       for i, infile := range infiles {
-               i, infile := i, infile
+       var encodeJobs sync.WaitGroup
+       for _, infile := range infiles {
+               infile := infile
+               var phases sync.WaitGroup
+               phases.Add(2)
+               variants := make([][]tileVariantID, 2)
                if strings.HasSuffix(infile, ".1.fasta") || strings.HasSuffix(infile, ".1.fasta.gz") {
                        todo <- func() error {
+                               defer phases.Done()
                                log.Printf("%s starting", infile)
                                defer log.Printf("%s done", infile)
                                tseqs, err := cmd.tileFasta(tilelib, infile)
-                               variants[i*2] = tseqs.Variants()
+                               variants[0] = tseqs.Variants()
                                return err
                        }
                        infile2 := regexp.MustCompile(`\.1\.fasta(\.gz)?$`).ReplaceAllString(infile, `.2.fasta$1`)
                        todo <- func() error {
+                               defer phases.Done()
                                log.Printf("%s starting", infile2)
                                defer log.Printf("%s done", infile2)
                                tseqs, err := cmd.tileFasta(tilelib, infile2)
-                               variants[i*2+1] = tseqs.Variants()
+                               variants[1] = tseqs.Variants()
                                return err
                        }
                } else {
                        for phase := 0; phase < 2; phase++ {
                                phase := phase
                                todo <- func() error {
+                                       defer phases.Done()
                                        log.Printf("%s phase %d starting", infile, phase+1)
                                        defer log.Printf("%s phase %d done", infile, phase+1)
                                        tseqs, err := cmd.tileGVCF(tilelib, infile, phase)
-                                       variants[i*2+phase] = tseqs.Variants()
+                                       variants[phase] = tseqs.Variants()
                                        return err
                                }
                        }
                }
+               encodeJobs.Add(1)
+               go func() {
+                       defer encodeJobs.Done()
+                       phases.Wait()
+                       if len(errs) > 0 {
+                               return
+                       }
+                       ntags := len(variants[0])
+                       if ntags < len(variants[1]) {
+                               ntags = len(variants[1])
+                       }
+                       flat := make([]tileVariantID, ntags*2)
+                       for i := 0; i < ntags; i++ {
+                               flat[i*2] = variants[0][i]
+                               flat[i*2+1] = variants[1][i]
+                       }
+                       err := cmd.encoder.Encode(LibraryEntry{
+                               CompactGenomes: []CompactGenome{{Name: infile, Variants: flat}},
+                       })
+                       if err != nil {
+                               select {
+                               case errs <- err:
+                               default:
+                               }
+                       }
+               }()
        }
        go close(todo)
+       var tileJobs sync.WaitGroup
        running := int64(runtime.NumCPU())
        for i := 0; i < runtime.NumCPU(); i++ {
-               wg.Add(1)
+               tileJobs.Add(1)
                go func() {
-                       defer wg.Done()
+                       defer tileJobs.Done()
                        defer atomic.AddInt64(&running, -1)
                        for fn := range todo {
                                if len(errs) > 0 {
@@ -236,45 +268,13 @@ func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) ([][]ti
                        }
                }()
        }
-       wg.Wait()
+       tileJobs.Wait()
+       encodeJobs.Wait()
        go close(errs)
-       return variants, <-errs
-}
-
-func (cmd *gvcf2numpy) printVariants(variants [][]tileVariantID) error {
-       maxlen := 0
-       for _, v := range variants {
-               if maxlen < len(v) {
-                       maxlen = len(v)
-               }
-       }
-       rows := len(variants) / 2
-       cols := maxlen * 2
-       out := make([]uint16, rows*cols)
-       for row := 0; row < len(variants)/2; row++ {
-               for phase := 0; phase < 2; phase++ {
-                       for tag, variant := range variants[row*2+phase] {
-                               out[row*cols+2*int(tag)+phase] = uint16(variant)
-                       }
-               }
-       }
-       w := bufio.NewWriter(cmd.output)
-       npw, err := gonpy.NewWriter(nopCloser{w})
-       if err != nil {
-               return err
-       }
-       npw.Shape = []int{rows, cols}
-       npw.WriteUint16(out)
-       return w.Flush()
+       return <-errs
 }
 
-type nopCloser struct {
-       io.Writer
-}
-
-func (nopCloser) Close() error { return nil }
-
-func (cmd *gvcf2numpy) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) {
+func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) {
        args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase + 1), infile}
        indexsuffix := ".tbi"
        if _, err := os.Stat(infile + ".csi"); err == nil {