Throttle export goroutines to a fixed limit of 8 (TODO: size the limit to NCPUs and also bound unflushed bufs).
[lightning.git] / export.go
index a37ec725bf80f4318eae2d676803391b93797414..4cd977b6800d1316557b0a40a2c65afb54709e86 100644 (file)
--- a/export.go
+++ b/export.go
@@ -1,4 +1,4 @@
-package main
+package lightning
 
 import (
        "bufio"
@@ -132,9 +132,9 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
        var mtx sync.Mutex
        var cgs []CompactGenome
        tilelib := tileLibrary{
-               includeNoCalls: true,
+               retainNoCalls: true,
        }
-       err = tilelib.LoadGob(context.Background(), input, func(cg CompactGenome) {
+       err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), func(cg CompactGenome) {
                if *pick != "" && *pick != cg.Name {
                        return
                }
@@ -188,7 +188,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
                bedbufw = bufio.NewWriter(bedout)
        }
 
-       err = cmd.export(bufw, bedout, input, tilelib.taglib.keylen, refseq, cgs)
+       err = cmd.export(bufw, bedout, input, strings.HasSuffix(*inputFilename, ".gz"), tilelib.taglib.keylen, refseq, cgs)
        if err != nil {
                return 1
        }
@@ -217,7 +217,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
        return 0
 }
 
-func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, taglen int, refseq map[string][]tileLibRef, cgs []CompactGenome) error {
+func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, gz bool, taglen int, refseq map[string][]tileLibRef, cgs []CompactGenome) error {
        need := map[tileLibRef]bool{}
        var seqnames []string
        for seqname, librefs := range refseq {
@@ -242,7 +242,7 @@ func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, taglen int,
 
        log.Infof("export: loading %d tile variants", len(need))
        tileVariant := map[tileLibRef]TileVariant{}
-       err := DecodeLibrary(librdr, func(ent *LibraryEntry) error {
+       err := DecodeLibrary(librdr, gz, func(ent *LibraryEntry) error {
                for _, tv := range ent.TileVariants {
                        libref := tileLibRef{Tag: tv.Tag, Variant: tv.Variant}
                        if need[libref] {
@@ -272,7 +272,7 @@ func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, taglen int,
        }
 
        log.Infof("assembling %d sequences concurrently", len(seqnames))
-       var wg sync.WaitGroup
+       throttle := throttle{Max: 8}
        outbuf := make([]bytes.Buffer, len(seqnames))
        bedbuf := make([]bytes.Buffer, len(seqnames))
        for seqidx, seqname := range seqnames {
@@ -282,35 +282,34 @@ func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, taglen int,
                if bedout == nil {
                        bedbuf = nil
                }
-               // TODO: limit number of goroutines and unflushed bufs to ncpus
-               wg.Add(1)
+               throttle.Acquire()
                go func() {
-                       defer wg.Done()
+                       defer throttle.Release()
                        cmd.exportSeq(outbuf, bedbuf, taglen, seqname, refseq[seqname], tileVariant, cgs)
                        log.Infof("assembled %q to outbuf %d bedbuf %d", seqname, outbuf.Len(), bedbuf.Len())
                }()
        }
-       wg.Wait()
+       throttle.Wait()
 
-       wg.Add(1)
+       throttle.Acquire()
        go func() {
-               defer wg.Done()
+               defer throttle.Release()
                for i, seqname := range seqnames {
                        log.Infof("writing outbuf %s", seqname)
                        io.Copy(out, &outbuf[i])
                }
        }()
        if bedout != nil {
-               wg.Add(1)
+               throttle.Acquire()
                go func() {
-                       defer wg.Done()
+                       defer throttle.Release()
                        for i, seqname := range seqnames {
                                log.Infof("writing bedbuf %s", seqname)
                                io.Copy(bedout, &bedbuf[i])
                        }
                }()
        }
-       wg.Wait()
+       throttle.Wait()
        return nil
 }
 
@@ -336,6 +335,12 @@ func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string,
                                        continue
                                }
                                genometile := tileVariant[tileLibRef{Tag: libref.Tag, Variant: variant}]
+                               if len(genometile.Sequence) == 0 {
+                                       // Hash is known but sequence
+                                       // is not, e.g., retainNoCalls
+                                       // was false during import
+                                       continue
+                               }
                                refSequence := reftile.Sequence
                                // If needed, extend the reference
                                // sequence up to the tag at the end
@@ -399,8 +404,13 @@ func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string,
                                thickstart = 0
                        }
                        thickend := refpos
+
                        // coverage score, 0 to 1000
-                       score := 1000 * tagcoverage / len(cgs) / 2
+                       score := 1000
+                       if len(cgs) > 0 {
+                               score = 1000 * tagcoverage / len(cgs) / 2
+                       }
+
                        fmt.Fprintf(bedw, "%s %d %d %d %d . %d %d\n",
                                seqname, tilestart, tileend,
                                libref.Tag,