"path"
"sort"
"strings"
+ "sync"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
Name: "lightning export",
Client: arvados.NewClientFromEnv(),
ProjectUUID: *projectUUID,
- RAM: 700000000000,
+ RAM: 750000000000,
VCPUs: 32,
Priority: *priority,
}
return fmt.Errorf("%d needed tiles are missing from library", len(missing))
}
- log.Infof("assembling %d sequences concurrently", len(seqnames))
- throttle := throttle{Max: 8}
outbuf := make([]bytes.Buffer, len(seqnames))
bedbuf := make([]bytes.Buffer, len(seqnames))
- for seqidx, seqname := range seqnames {
- seqname := seqname
- outbuf := &outbuf[seqidx]
- bedbuf := &bedbuf[seqidx]
- if bedout == nil {
- bedbuf = nil
- }
- throttle.Acquire()
- go func() {
- defer throttle.Release()
- cmd.exportSeq(outbuf, bedbuf, tilelib.taglib.keylen, seqname, refseq[seqname], tileVariant, cgs)
- log.Infof("assembled %q to outbuf %d bedbuf %d", seqname, outbuf.Len(), bedbuf.Len())
- }()
+ ready := make([]chan struct{}, len(seqnames))
+ for i := range ready {
+ ready[i] = make(chan struct{})
}
- throttle.Wait()
- throttle.Acquire()
+ var output sync.WaitGroup
+ output.Add(1)
go func() {
- defer throttle.Release()
+ defer output.Done()
for i, seqname := range seqnames {
+ <-ready[i]
log.Infof("writing outbuf %s", seqname)
io.Copy(out, &outbuf[i])
+ log.Infof("writing outbuf %s done", seqname)
}
}()
- if bedout != nil {
- throttle.Acquire()
- go func() {
- defer throttle.Release()
+ output.Add(1)
+ go func() {
+ defer output.Done()
+ if bedout != nil {
for i, seqname := range seqnames {
+ <-ready[i]
log.Infof("writing bedbuf %s", seqname)
io.Copy(bedout, &bedbuf[i])
+ log.Infof("writing bedbuf %s done", seqname)
}
+ }
+ }()
+
+ throttle := throttle{Max: 4}
+ log.Infof("assembling %d sequences in %d goroutines", len(seqnames), throttle.Max)
+ for seqidx, seqname := range seqnames {
+ seqidx, seqname := seqidx, seqname
+ outbuf := &outbuf[seqidx]
+ bedbuf := &bedbuf[seqidx]
+ if bedout == nil {
+ bedbuf = nil
+ }
+ throttle.Acquire()
+ go func() {
+ defer throttle.Release()
+ defer close(ready[seqidx])
+ cmd.exportSeq(outbuf, bedbuf, tilelib.taglib.keylen, seqname, refseq[seqname], tileVariant, cgs)
+ log.Infof("assembled %q to outbuf %d bedbuf %d", seqname, outbuf.Len(), bedbuf.Len())
}()
}
- throttle.Wait()
+
+ output.Wait()
return nil
}