From 03dd36bef9bc9a5d59a5f6397bbd4ca3bdf2a58d Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 14 Jan 2020 12:20:33 -0500 Subject: [PATCH] limit concurrency Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- gvcf2numpy.go | 20 +++++++++------- tilelib.go | 65 ++++++++++++++++++++++++++------------------------- 2 files changed, 44 insertions(+), 41 deletions(-) diff --git a/gvcf2numpy.go b/gvcf2numpy.go index 80345dc370..890614d189 100644 --- a/gvcf2numpy.go +++ b/gvcf2numpy.go @@ -99,9 +99,7 @@ func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) error { } return } - for chr, path := range tseq { - cmd.printVariants(fmt.Sprintf("%s chr %s phase %d", infile, chr, phase+1), path) - } + cmd.printVariants(fmt.Sprintf("%s phase %d", infile, phase+1), tseq) }(infile, phase) } } @@ -115,16 +113,20 @@ func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) error { return nil } -func (cmd *gvcf2numpy) printVariants(label string, path []tileLibRef) { +func (cmd *gvcf2numpy) printVariants(label string, tseq map[string][]tileLibRef) { maxtag := tagID(-1) - for _, tvar := range path { - if maxtag < tvar.tag { - maxtag = tvar.tag + for _, path := range tseq { + for _, tvar := range path { + if maxtag < tvar.tag { + maxtag = tvar.tag + } } } variant := make([]tileVariantID, maxtag+1) - for _, tvar := range path { - variant[tvar.tag] = tvar.variant + for _, path := range tseq { + for _, tvar := range path { + variant[tvar.tag] = tvar.variant + } } { diff --git a/tilelib.go b/tilelib.go index ed67c3c9fa..894364488e 100644 --- a/tilelib.go +++ b/tilelib.go @@ -29,49 +29,50 @@ type tileLibrary struct { func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, error) { ret := tileSeq{} - var wg sync.WaitGroup - flush := func(seqlabel string, fasta []byte) { - defer wg.Done() - var path []tileLibRef - if len(fasta) == 0 { - return + type jobT struct { + label string + fasta []byte + } + todo := make(chan jobT) + scanner := bufio.NewScanner(rdr) + go func() { + defer close(todo) + var fasta []byte + var seqlabel string + for scanner.Scan() { + buf := scanner.Bytes() + if len(buf) == 0 || buf[0] == '>' { + todo <- jobT{seqlabel, fasta} + seqlabel, fasta = string(buf[1:]), nil + log.Printf("%s %s reading fasta", filelabel, seqlabel) + } else { + fasta = append(fasta, bytes.ToLower(buf)...) + } + } + todo <- jobT{seqlabel, fasta} + }() + for job := range todo { + if len(job.fasta) == 0 { + continue } + log.Printf("%s %s tiling", filelabel, job.label) + var path []tileLibRef tilestart := -1 // position in fasta of tile that ends here tiletagid := tagID(-1) // tag id starting tile that ends here - tilelib.taglib.FindAll(fasta, func(id tagID, pos int) { + tilelib.taglib.FindAll(job.fasta, func(id tagID, pos int) { if tilestart >= 0 { - path = append(path, tilelib.getRef(tiletagid, fasta[tilestart:pos])) + path = append(path, tilelib.getRef(tiletagid, job.fasta[tilestart:pos])) } tilestart = pos tiletagid = id }) if tiletagid >= 0 { - path = append(path, tilelib.getRef(tiletagid, fasta[tilestart:])) + path = append(path, tilelib.getRef(tiletagid, job.fasta[tilestart:])) } - ret[seqlabel] = path - log.Printf("%s %s tiled with path len %d", filelabel, seqlabel, len(path)) - } - var fasta []byte - var seqlabel string - scanner := bufio.NewScanner(rdr) - for scanner.Scan() { - buf := scanner.Bytes() - if len(buf) == 0 || buf[0] == '>' { - wg.Add(1) - go flush(seqlabel, fasta) - fasta = nil - seqlabel = string(buf[1:]) - } else { - fasta = append(fasta, bytes.ToLower(buf)...) - } - } - if err := scanner.Err(); err != nil { - return nil, err + ret[job.label] = path + log.Printf("%s %s tiled with path len %d", filelabel, job.label, len(path)) } - wg.Add(1) - go flush(seqlabel, fasta) - wg.Wait() - return ret, nil + return ret, scanner.Err() } // Return a tileLibRef for a tile with the given tag and sequence, -- 2.39.5