Fix mem usage, improve logging.
author Tom Clegg <tom@tomclegg.ca>
Tue, 14 Sep 2021 20:15:35 +0000 (16:15 -0400)
committer Tom Clegg <tom@tomclegg.ca>
Tue, 14 Sep 2021 20:15:35 +0000 (16:15 -0400)
refs #17996

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

slice.go
slicenumpy.go

index 01ffb1fc18bc818557281845c4fddfa66d7a9669..50f219fb3cb6fba95d2b61817085a83f3ebf070b 100644 (file)
--- a/slice.go
+++ b/slice.go
@@ -60,7 +60,7 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: *projectUUID,
                        RAM:         200000000000,
-                       VCPUs:       4,
+                       VCPUs:       32,
                        Priority:    *priority,
                        KeepCache:   50,
                        APIAccess:   true,
@@ -107,7 +107,7 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error {
                encs       []*gob.Encoder
        )
 
-       throttle := throttle{Max: runtime.NumCPU()}
+       throttle := throttle{Max: runtime.GOMAXPROCS(0)}
        for _, path := range infiles {
                path := path
                throttle.Acquire()
index 55b3c0a140861f72a436f6739f42344f0ed9680a..ba80151f3ac22679f70b3d7a59232df3a7ec2c38 100644 (file)
@@ -16,6 +16,7 @@ import (
        "runtime"
        "sort"
        "strings"
+       "sync/atomic"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/arvados/lightning/hgvs"
@@ -69,7 +70,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        RAM:         250000000000,
                        VCPUs:       32,
                        Priority:    *priority,
-                       KeepCache:   1,
+                       KeepCache:   2,
                        APIAccess:   true,
                }
                err = runner.TranslatePaths(inputDir, regionsFilename)
@@ -174,7 +175,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                }
        }
        log.Info("loading reference tiles from all slices")
-       throttle := throttle{Max: runtime.NumCPU()}
+       throttle := throttle{Max: runtime.GOMAXPROCS(0)}
        for _, infile := range infiles {
                infile := infile
                throttle.Go(func() error {
@@ -215,10 +216,10 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        log.Info("TODO: determining which tiles intersect given regions")
 
        log.Info("generating annotations and numpy matrix for each slice")
+       var done int64
        for infileIdx, infile := range infiles {
                infileIdx, infile := infileIdx, infile
                throttle.Go(func() error {
-                       defer log.Infof("%s: done", infile)
                        seq := map[tagID][][]byte{}
                        cgs := make(map[string]CompactGenome, len(cgnames))
                        f, err := open(infile)
@@ -226,6 +227,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                return err
                        }
                        defer f.Close()
+                       log.Infof("reading %s", infile)
                        err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
                                for _, tv := range ent.TileVariants {
                                        variants := seq[tv.Tag]
@@ -246,12 +248,11 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        tagstart := cgs[cgnames[0]].StartTag
                        tagend := cgs[cgnames[0]].EndTag
 
-                       log.Infof("TODO: %s: filtering", infile)
-                       log.Infof("TODO: %s: tidying", infile)
-                       log.Infof("TODO: %s: lowqual to -1", infile)
+                       // TODO: filters
+                       // TODO: tidy/renumber
 
                        annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx)
-                       log.Infof("%s: writing annotations to %s", infile, annotationsFilename)
+                       log.Infof("writing %s", annotationsFilename)
                        annof, err := os.Create(annotationsFilename)
                        if err != nil {
                                return err
@@ -312,7 +313,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                return err
                        }
                        defer output.Close()
-                       bufw := bufio.NewWriter(output)
+                       bufw := bufio.NewWriterSize(output, 1<<26)
                        npw, err := gonpy.NewWriter(nopCloser{bufw})
                        if err != nil {
                                return err
@@ -332,6 +333,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        if err != nil {
                                return err
                        }
+                       log.Infof("%s: done (%d/%d)", infile, int(atomic.AddInt64(&done, 1)), len(infiles))
                        return nil
                })
        }