limit concurrency
author Tom Clegg <tom@tomclegg.ca>
Tue, 14 Jan 2020 17:20:33 +0000 (12:20 -0500)
committer Tom Clegg <tom@tomclegg.ca>
Tue, 14 Jan 2020 17:20:33 +0000 (12:20 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

gvcf2numpy.go
tilelib.go

index 80345dc3702cef3525c2968d0197a842e9540532..890614d1891ad2c5e42f1b6b920a8cdd40df064d 100644 (file)
@@ -99,9 +99,7 @@ func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) error {
                                        }
                                        return
                                }
-                               for chr, path := range tseq {
-                                       cmd.printVariants(fmt.Sprintf("%s chr %s phase %d", infile, chr, phase+1), path)
-                               }
+                               cmd.printVariants(fmt.Sprintf("%s phase %d", infile, phase+1), tseq)
                        }(infile, phase)
                }
        }
@@ -115,16 +113,20 @@ func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) error {
        return nil
 }
 
-func (cmd *gvcf2numpy) printVariants(label string, path []tileLibRef) {
+func (cmd *gvcf2numpy) printVariants(label string, tseq map[string][]tileLibRef) {
        maxtag := tagID(-1)
-       for _, tvar := range path {
-               if maxtag < tvar.tag {
-                       maxtag = tvar.tag
+       for _, path := range tseq {
+               for _, tvar := range path {
+                       if maxtag < tvar.tag {
+                               maxtag = tvar.tag
+                       }
                }
        }
        variant := make([]tileVariantID, maxtag+1)
-       for _, tvar := range path {
-               variant[tvar.tag] = tvar.variant
+       for _, path := range tseq {
+               for _, tvar := range path {
+                       variant[tvar.tag] = tvar.variant
+               }
        }
 
        {
index ed67c3c9fa88596ab973a347695fca88c04e451d..894364488eea93cf583c9609af78062b0590032c 100644 (file)
@@ -29,49 +29,50 @@ type tileLibrary struct {
 
 func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, error) {
        ret := tileSeq{}
-       var wg sync.WaitGroup
-       flush := func(seqlabel string, fasta []byte) {
-               defer wg.Done()
-               var path []tileLibRef
-               if len(fasta) == 0 {
-                       return
+       type jobT struct {
+               label string
+               fasta []byte
+       }
+       todo := make(chan jobT)
+       scanner := bufio.NewScanner(rdr)
+       go func() {
+               defer close(todo)
+               var fasta []byte
+               var seqlabel string
+               for scanner.Scan() {
+                       buf := scanner.Bytes()
+                       if len(buf) == 0 || buf[0] == '>' {
+                               todo <- jobT{seqlabel, fasta}
+                               seqlabel, fasta = string(buf[1:]), nil
+                               log.Printf("%s %s reading fasta", filelabel, seqlabel)
+                       } else {
+                               fasta = append(fasta, bytes.ToLower(buf)...)
+                       }
+               }
+               todo <- jobT{seqlabel, fasta}
+       }()
+       for job := range todo {
+               if len(job.fasta) == 0 {
+                       continue
                }
+               log.Printf("%s %s tiling", filelabel, job.label)
+               var path []tileLibRef
                tilestart := -1        // position in fasta of tile that ends here
                tiletagid := tagID(-1) // tag id starting tile that ends here
-               tilelib.taglib.FindAll(fasta, func(id tagID, pos int) {
+               tilelib.taglib.FindAll(job.fasta, func(id tagID, pos int) {
                        if tilestart >= 0 {
-                               path = append(path, tilelib.getRef(tiletagid, fasta[tilestart:pos]))
+                               path = append(path, tilelib.getRef(tiletagid, job.fasta[tilestart:pos]))
                        }
                        tilestart = pos
                        tiletagid = id
                })
                if tiletagid >= 0 {
-                       path = append(path, tilelib.getRef(tiletagid, fasta[tilestart:]))
+                       path = append(path, tilelib.getRef(tiletagid, job.fasta[tilestart:]))
                }
-               ret[seqlabel] = path
-               log.Printf("%s %s tiled with path len %d", filelabel, seqlabel, len(path))
-       }
-       var fasta []byte
-       var seqlabel string
-       scanner := bufio.NewScanner(rdr)
-       for scanner.Scan() {
-               buf := scanner.Bytes()
-               if len(buf) == 0 || buf[0] == '>' {
-                       wg.Add(1)
-                       go flush(seqlabel, fasta)
-                       fasta = nil
-                       seqlabel = string(buf[1:])
-               } else {
-                       fasta = append(fasta, bytes.ToLower(buf)...)
-               }
-       }
-       if err := scanner.Err(); err != nil {
-               return nil, err
+               ret[job.label] = path
+               log.Printf("%s %s tiled with path len %d", filelabel, job.label, len(path))
        }
-       wg.Add(1)
-       go flush(seqlabel, fasta)
-       wg.Wait()
-       return ret, nil
+       return ret, scanner.Err()
 }
 
 // Return a tileLibRef for a tile with the given tag and sequence,