Output tile variant arrays.
authorTom Clegg <tom@tomclegg.ca>
Tue, 14 Jan 2020 16:16:41 +0000 (11:16 -0500)
committerTom Clegg <tom@tomclegg.ca>
Tue, 14 Jan 2020 16:20:30 +0000 (11:20 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

gvcf2numpy.go
tilelib.go

index 1cf3a4274bb29a9190e39e6607cf2f8cee490e1b..87212434c497492e407e5a69e226c6ac85e53045 100644 (file)
@@ -15,6 +15,8 @@ import (
 type gvcf2numpy struct {
        tagLibraryFile string
        refFile        string
+       output         io.Writer
+       outputMtx      sync.Mutex
 }
 
 func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
@@ -41,6 +43,9 @@ func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, s
                flags.Usage()
                return 2
        }
+       cmd.output = stdout
+
+       log.Printf("tag library %s load starting", cmd.tagLibraryFile)
        f, err := os.Open(cmd.tagLibraryFile)
        if err != nil {
                return 1
@@ -62,6 +67,8 @@ func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, s
                err = fmt.Errorf("cannot tile: tag library is empty")
                return 1
        }
+       log.Printf("tag library %s load done", cmd.tagLibraryFile)
+
        tilelib := tileLibrary{taglib: &taglib}
        err = cmd.tileGVCFs(&tilelib, flags.Args())
        if err != nil {
@@ -78,6 +85,8 @@ func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) error {
                        wg.Add(1)
                        go func(infile string, phase int) {
                                defer wg.Done()
+                               log.Printf("%s phase %d starting", infile, phase+1)
+                               defer log.Printf("%s phase %d done", infile, phase+1)
                                tseq, err := cmd.tileGVCF(tilelib, infile, phase)
                                if err != nil {
                                        select {
@@ -114,11 +123,16 @@ func (cmd *gvcf2numpy) printVariants(label string, path []tileLibRef) {
                variant[tvar.tag] = tvar.variant
        }
 
-       excerpt := variant
-       if len(excerpt) > 100 {
-               excerpt = excerpt[:100]
+       {
+               excerpt := variant
+               if len(excerpt) > 100 {
+                       excerpt = excerpt[:100]
+               }
+               log.Printf("%q %v\n", label, excerpt)
        }
-       log.Printf("%s: %v...", label, excerpt)
+       cmd.outputMtx.Lock()
+       defer cmd.outputMtx.Unlock()
+       fmt.Fprintf(cmd.output, "%q %v\n", label, variant)
 }
 
 func (cmd *gvcf2numpy) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) {
@@ -142,7 +156,7 @@ func (cmd *gvcf2numpy) tileGVCF(tilelib *tileLibrary, infile string, phase int)
        if err != nil {
                return
        }
-       tileseq, err = tilelib.TileFasta(stdout)
+       tileseq, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout)
        if err != nil {
                return
        }
index a45b077bc764232506c60185a2fdc861f651a1d2..ed67c3c9fa88596ab973a347695fca88c04e451d 100644 (file)
@@ -5,6 +5,7 @@ import (
        "bytes"
        "crypto/md5"
        "io"
+       "log"
        "sync"
 )
 
@@ -21,14 +22,16 @@ type tileLibrary struct {
        taglib  *tagLibrary
        variant [][][md5.Size]byte
        // count [][]int
-       seq map[[md5.Size]byte][]byte
+       // seq map[[md5.Size]byte][]byte
 
        mtx sync.Mutex
 }
 
-func (tilelib *tileLibrary) TileFasta(rdr io.Reader) (tileSeq, error) {
+func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, error) {
        ret := tileSeq{}
-       flush := func(label string, fasta []byte) {
+       var wg sync.WaitGroup
+       flush := func(seqlabel string, fasta []byte) {
+               defer wg.Done()
                var path []tileLibRef
                if len(fasta) == 0 {
                        return
@@ -38,26 +41,26 @@ func (tilelib *tileLibrary) TileFasta(rdr io.Reader) (tileSeq, error) {
                tilelib.taglib.FindAll(fasta, func(id tagID, pos int) {
                        if tilestart >= 0 {
                                path = append(path, tilelib.getRef(tiletagid, fasta[tilestart:pos]))
-                               // log.Printf("%q: tile %d is variant %d of tile %d", label, len(path), path[len(path)-1], id)
                        }
                        tilestart = pos
                        tiletagid = id
                })
                if tiletagid >= 0 {
                        path = append(path, tilelib.getRef(tiletagid, fasta[tilestart:]))
-                       // log.Printf("%q: tile %d is variant %d of tile %d", label, len(path), path[len(path)-1], tiletagid)
                }
-               ret[label] = path
+               ret[seqlabel] = path
+               log.Printf("%s %s tiled with path len %d", filelabel, seqlabel, len(path))
        }
        var fasta []byte
-       var label string
+       var seqlabel string
        scanner := bufio.NewScanner(rdr)
        for scanner.Scan() {
                buf := scanner.Bytes()
                if len(buf) == 0 || buf[0] == '>' {
-                       flush(label, fasta)
+                       wg.Add(1)
+                       go flush(seqlabel, fasta)
                        fasta = nil
-                       label = string(buf[1:])
+                       seqlabel = string(buf[1:])
                } else {
                        fasta = append(fasta, bytes.ToLower(buf)...)
                }
@@ -65,7 +68,9 @@ func (tilelib *tileLibrary) TileFasta(rdr io.Reader) (tileSeq, error) {
        if err := scanner.Err(); err != nil {
                return nil, err
        }
-       flush(label, fasta)
+       wg.Add(1)
+       go flush(seqlabel, fasta)
+       wg.Wait()
        return ret, nil
 }
 
@@ -74,11 +79,11 @@ func (tilelib *tileLibrary) TileFasta(rdr io.Reader) (tileSeq, error) {
 func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
        tilelib.mtx.Lock()
        defer tilelib.mtx.Unlock()
-       if tilelib.seq == nil {
-               tilelib.seq = map[[md5.Size]byte][]byte{}
-       }
-       for len(tilelib.variant) <= int(tag) {
-               tilelib.variant = append(tilelib.variant, nil)
+       // if tilelib.seq == nil {
+       //      tilelib.seq = map[[md5.Size]byte][]byte{}
+       // }
+       if len(tilelib.variant) <= int(tag) {
+               tilelib.variant = append(tilelib.variant, make([][][md5.Size]byte, int(tag)-len(tilelib.variant)+1)...)
        }
        seqhash := md5.Sum(seq)
        for i, varhash := range tilelib.variant[tag] {
@@ -87,6 +92,6 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
                }
        }
        tilelib.variant[tag] = append(tilelib.variant[tag], seqhash)
-       tilelib.seq[seqhash] = append([]byte(nil), seq...)
+       // tilelib.seq[seqhash] = append([]byte(nil), seq...)
        return tileLibRef{tag: tag, variant: tileVariantID(len(tilelib.variant[tag]))}
 }