Avoid buffering entire output in memory.
author Tom Clegg <tom@tomclegg.ca>
Tue, 11 May 2021 05:41:09 +0000 (01:41 -0400)
committer Tom Clegg <tom@tomclegg.ca>
Tue, 11 May 2021 05:41:09 +0000 (01:41 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

export.go

index 4dbef1910b661fab5f7a7ac36d36104c8e2f0135..00c2360dd46db5385e68da9c571f7ba182168121 100644 (file)
--- a/export.go
+++ b/export.go
@@ -14,6 +14,7 @@ import (
        "path"
        "sort"
        "strings"
+       "sync"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -90,7 +91,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
                        Name:        "lightning export",
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: *projectUUID,
-                       RAM:         700000000000,
+                       RAM:         750000000000,
                        VCPUs:       32,
                        Priority:    *priority,
                }
@@ -302,45 +303,56 @@ func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, gz bool, ti
                return fmt.Errorf("%d needed tiles are missing from library", len(missing))
        }
 
-       log.Infof("assembling %d sequences concurrently", len(seqnames))
-       throttle := throttle{Max: 8}
        outbuf := make([]bytes.Buffer, len(seqnames))
        bedbuf := make([]bytes.Buffer, len(seqnames))
-       for seqidx, seqname := range seqnames {
-               seqname := seqname
-               outbuf := &outbuf[seqidx]
-               bedbuf := &bedbuf[seqidx]
-               if bedout == nil {
-                       bedbuf = nil
-               }
-               throttle.Acquire()
-               go func() {
-                       defer throttle.Release()
-                       cmd.exportSeq(outbuf, bedbuf, tilelib.taglib.keylen, seqname, refseq[seqname], tileVariant, cgs)
-                       log.Infof("assembled %q to outbuf %d bedbuf %d", seqname, outbuf.Len(), bedbuf.Len())
-               }()
+       ready := make([]chan struct{}, len(seqnames))
+       for i := range ready {
+               ready[i] = make(chan struct{})
        }
-       throttle.Wait()
 
-       throttle.Acquire()
+       var output sync.WaitGroup
+       output.Add(1)
        go func() {
-               defer throttle.Release()
+               defer output.Done()
                for i, seqname := range seqnames {
+                       <-ready[i]
                        log.Infof("writing outbuf %s", seqname)
                        io.Copy(out, &outbuf[i])
+                       log.Infof("writing outbuf %s done", seqname)
                }
        }()
-       if bedout != nil {
-               throttle.Acquire()
-               go func() {
-                       defer throttle.Release()
+       output.Add(1)
+       go func() {
+               defer output.Done()
+               if bedout != nil {
                        for i, seqname := range seqnames {
+                               <-ready[i]
                                log.Infof("writing bedbuf %s", seqname)
                                io.Copy(bedout, &bedbuf[i])
+                               log.Infof("writing bedbuf %s done", seqname)
                        }
+               }
+       }()
+
+       throttle := throttle{Max: 4}
+       log.Infof("assembling %d sequences in %d goroutines", len(seqnames), throttle.Max)
+       for seqidx, seqname := range seqnames {
+               seqidx, seqname := seqidx, seqname
+               outbuf := &outbuf[seqidx]
+               bedbuf := &bedbuf[seqidx]
+               if bedout == nil {
+                       bedbuf = nil
+               }
+               throttle.Acquire()
+               go func() {
+                       defer throttle.Release()
+                       defer close(ready[seqidx])
+                       cmd.exportSeq(outbuf, bedbuf, tilelib.taglib.keylen, seqname, refseq[seqname], tileVariant, cgs)
+                       log.Infof("assembled %q to outbuf %d bedbuf %d", seqname, outbuf.Len(), bedbuf.Len())
                }()
        }
-       throttle.Wait()
+
+       output.Wait()
        return nil
 }