Tweak cpu/mem usage.
author: Tom Clegg <tom@tomclegg.ca>
Fri, 24 Sep 2021 18:08:59 +0000 (14:08 -0400)
committer: Tom Clegg <tom@tomclegg.ca>
Sat, 25 Sep 2021 13:47:15 +0000 (09:47 -0400)
refs #17966

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

slicenumpy.go

index a8d217fefa664c3d566a93554f01cf5224ef5367..557f0c55911d694436a29200ef1571aa62a7004e 100644 (file)
@@ -27,7 +27,8 @@ import (
 )
 
 type sliceNumpy struct {
-       filter filter
+       filter  filter
+       threads int
 }
 
 func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
@@ -48,6 +49,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        ref := flags.String("ref", "", "reference name (if blank, choose last one that appears in input)")
        regionsFilename := flags.String("regions", "", "only output columns/annotations that intersect regions in specified bed `file`")
        expandRegions := flags.Int("expand-regions", 0, "expand specified regions by `N` base pairs on each side`")
+       flags.IntVar(&cmd.threads, "threads", 16, "number of memory-hungry assembly threads")
        cmd.filter.Flags(flags)
        err = flags.Parse(args)
        if err == flag.ErrHelp {
@@ -68,8 +70,8 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        Name:        "lightning slice-numpy",
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: *projectUUID,
-                       RAM:         250000000000,
-                       VCPUs:       32,
+                       RAM:         240000000000,
+                       VCPUs:       64,
                        Priority:    *priority,
                        KeepCache:   2,
                        APIAccess:   true,
@@ -82,6 +84,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        "-pprof", ":6060",
                        "-input-dir", *inputDir,
                        "-output-dir", "/mnt/output",
+                       "-threads", fmt.Sprintf("%d", cmd.threads),
                        "-regions", *regionsFilename,
                        "-expand-regions", fmt.Sprintf("%d", *expandRegions),
                }
@@ -177,10 +180,10 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                }
        }
        log.Info("loading reference tiles from all slices")
-       throttle1 := throttle{Max: runtime.GOMAXPROCS(0)}
+       throttleCPU := throttle{Max: runtime.GOMAXPROCS(0)}
        for _, infile := range infiles {
                infile := infile
-               throttle1.Go(func() error {
+               throttleCPU.Go(func() error {
                        defer log.Infof("%s: done", infile)
                        f, err := open(infile)
                        if err != nil {
@@ -197,12 +200,14 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        })
                })
        }
-       throttle1.Wait()
+       if err = throttleCPU.Wait(); err != nil {
+               return 1
+       }
 
        log.Info("reconstructing reference sequences")
        for seqname, cseq := range refseq {
                seqname, cseq := seqname, cseq
-               throttle1.Go(func() error {
+               throttleCPU.Go(func() error {
                        defer log.Printf("... %s done", seqname)
                        pos := 0
                        for _, libref := range cseq {
@@ -213,15 +218,17 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        return nil
                })
        }
-       throttle1.Wait()
+       throttleCPU.Wait()
 
        log.Info("TODO: determining which tiles intersect given regions")
 
+       throttleMem := throttle{Max: cmd.threads} // TODO: estimate using mem and data size
+       throttleNumpyMem := throttle{Max: cmd.threads/2 + 1}
        log.Info("generating annotations and numpy matrix for each slice")
        var done int64
        for infileIdx, infile := range infiles {
                infileIdx, infile := infileIdx, infile
-               throttle1.Go(func() error {
+               throttleMem.Go(func() error {
                        seq := make(map[tagID][]TileVariant, 50000)
                        cgs := make(map[string]CompactGenome, len(cgnames))
                        f, err := open(infile)
@@ -257,12 +264,12 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
 
                        log.Infof("renumber/dedup variants for tags %d-%d", tagstart, tagend)
                        variantRemap := make([][]tileVariantID, tagend-tagstart)
-                       throttle2 := throttle{Max: runtime.GOMAXPROCS(0)}
+                       throttleCPU := throttle{Max: runtime.GOMAXPROCS(0)}
                        for tag, variants := range seq {
                                tag, variants := tag, variants
-                               throttle2.Acquire()
+                               throttleCPU.Acquire()
                                go func() {
-                                       defer throttle2.Release()
+                                       defer throttleCPU.Release()
                                        count := make(map[[blake2b.Size256]byte]int, len(variants))
                                        for _, cg := range cgs {
                                                idx := (tag - tagstart) * 2
@@ -303,7 +310,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                        variantRemap[tag-tagstart] = remap
                                }()
                        }
-                       throttle2.Wait()
+                       throttleCPU.Wait()
 
                        annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx)
                        log.Infof("writing %s", annotationsFilename)
@@ -347,6 +354,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                return err
                        }
 
+                       throttleNumpyMem.Acquire()
                        log.Infof("%s: preparing numpy", infile)
                        rows := len(cgnames)
                        cols := 2 * int(tagend-tagstart)
@@ -361,6 +369,8 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                        }
                                }
                        }
+                       seq = nil
+                       throttleNumpyMem.Release()
 
                        fnm := fmt.Sprintf("%s/matrix.%04d.npy", *outputDir, infileIdx)
                        output, err := os.Create(fnm)
@@ -392,7 +402,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        return nil
                })
        }
-       if err = throttle1.Wait(); err != nil {
+       if err = throttleMem.Wait(); err != nil {
                return 1
        }
        return 0