Throttle goroutines and unflushed buffers (fixed limit of 8, not yet NCPUs).
[lightning.git] / export.go
index b6b160ddb79311fe3a5964b25e2ca0efe482abdc..4cd977b6800d1316557b0a40a2c65afb54709e86 100644 (file)
--- a/export.go
+++ b/export.go
@@ -1,4 +1,4 @@
-package main
+package lightning
 
 import (
        "bufio"
@@ -272,7 +272,7 @@ func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, gz bool, ta
        }
 
        log.Infof("assembling %d sequences concurrently", len(seqnames))
-       var wg sync.WaitGroup
+       throttle := throttle{Max: 8}
        outbuf := make([]bytes.Buffer, len(seqnames))
        bedbuf := make([]bytes.Buffer, len(seqnames))
        for seqidx, seqname := range seqnames {
@@ -282,35 +282,34 @@ func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, gz bool, ta
                if bedout == nil {
                        bedbuf = nil
                }
-               // TODO: limit number of goroutines and unflushed bufs to ncpus
-               wg.Add(1)
+               throttle.Acquire()
                go func() {
-                       defer wg.Done()
+                       defer throttle.Release()
                        cmd.exportSeq(outbuf, bedbuf, taglen, seqname, refseq[seqname], tileVariant, cgs)
                        log.Infof("assembled %q to outbuf %d bedbuf %d", seqname, outbuf.Len(), bedbuf.Len())
                }()
        }
-       wg.Wait()
+       throttle.Wait()
 
-       wg.Add(1)
+       throttle.Acquire()
        go func() {
-               defer wg.Done()
+               defer throttle.Release()
                for i, seqname := range seqnames {
                        log.Infof("writing outbuf %s", seqname)
                        io.Copy(out, &outbuf[i])
                }
        }()
        if bedout != nil {
-               wg.Add(1)
+               throttle.Acquire()
                go func() {
-                       defer wg.Done()
+                       defer throttle.Release()
                        for i, seqname := range seqnames {
                                log.Infof("writing bedbuf %s", seqname)
                                io.Copy(bedout, &bedbuf[i])
                        }
                }()
        }
-       wg.Wait()
+       throttle.Wait()
        return nil
 }
 
@@ -405,8 +404,13 @@ func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string,
                                thickstart = 0
                        }
                        thickend := refpos
+
                        // coverage score, 0 to 1000
-                       score := 1000 * tagcoverage / len(cgs) / 2
+                       score := 1000
+                       if len(cgs) > 0 {
+                               score = 1000 * tagcoverage / len(cgs) / 2
+                       }
+
                        fmt.Fprintf(bedw, "%s %d %d %d %d . %d %d\n",
                                seqname, tilestart, tileend,
                                libref.Tag,