From 6cde9a52e1e146ff559e42659400eefb4cd5808e Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Fri, 24 Sep 2021 14:08:59 -0400 Subject: [PATCH] Tweak cpu/mem usage. refs #17966 Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- slicenumpy.go | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/slicenumpy.go b/slicenumpy.go index a8d217fefa..557f0c5591 100644 --- a/slicenumpy.go +++ b/slicenumpy.go @@ -27,7 +27,8 @@ import ( ) type sliceNumpy struct { - filter filter + filter filter + threads int } func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { @@ -48,6 +49,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s ref := flags.String("ref", "", "reference name (if blank, choose last one that appears in input)") regionsFilename := flags.String("regions", "", "only output columns/annotations that intersect regions in specified bed `file`") expandRegions := flags.Int("expand-regions", 0, "expand specified regions by `N` base pairs on each side`") + flags.IntVar(&cmd.threads, "threads", 16, "number of memory-hungry assembly threads") cmd.filter.Flags(flags) err = flags.Parse(args) if err == flag.ErrHelp { @@ -68,8 +70,8 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s Name: "lightning slice-numpy", Client: arvados.NewClientFromEnv(), ProjectUUID: *projectUUID, - RAM: 250000000000, - VCPUs: 32, + RAM: 240000000000, + VCPUs: 64, Priority: *priority, KeepCache: 2, APIAccess: true, @@ -82,6 +84,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s "-pprof", ":6060", "-input-dir", *inputDir, "-output-dir", "/mnt/output", + "-threads", fmt.Sprintf("%d", cmd.threads), "-regions", *regionsFilename, "-expand-regions", fmt.Sprintf("%d", *expandRegions), } @@ -177,10 +180,10 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s } } log.Info("loading reference tiles from all slices") - throttle1 := throttle{Max: runtime.GOMAXPROCS(0)} + throttleCPU := throttle{Max: runtime.GOMAXPROCS(0)} for _, infile := range infiles { infile := infile - throttle1.Go(func() error { + throttleCPU.Go(func() error { defer log.Infof("%s: done", infile) f, err := open(infile) if err != nil { @@ -197,12 +200,14 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s }) }) } - throttle1.Wait() + if err = throttleCPU.Wait(); err != nil { + return 1 + } log.Info("reconstructing reference sequences") for seqname, cseq := range refseq { seqname, cseq := seqname, cseq - throttle1.Go(func() error { + throttleCPU.Go(func() error { defer log.Printf("... %s done", seqname) pos := 0 for _, libref := range cseq { @@ -213,15 +218,17 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s return nil }) } - throttle1.Wait() + throttleCPU.Wait() log.Info("TODO: determining which tiles intersect given regions") + throttleMem := throttle{Max: cmd.threads} // TODO: estimate using mem and data size + throttleNumpyMem := throttle{Max: cmd.threads/2 + 1} log.Info("generating annotations and numpy matrix for each slice") var done int64 for infileIdx, infile := range infiles { infileIdx, infile := infileIdx, infile - throttle1.Go(func() error { + throttleMem.Go(func() error { seq := make(map[tagID][]TileVariant, 50000) cgs := make(map[string]CompactGenome, len(cgnames)) f, err := open(infile) @@ -257,12 +264,12 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s log.Infof("renumber/dedup variants for tags %d-%d", tagstart, tagend) variantRemap := make([][]tileVariantID, tagend-tagstart) - throttle2 := throttle{Max: runtime.GOMAXPROCS(0)} + throttleCPU := throttle{Max: runtime.GOMAXPROCS(0)} for tag, variants := range seq { tag, variants := tag, variants - throttle2.Acquire() + throttleCPU.Acquire() go func() { - defer throttle2.Release() + defer throttleCPU.Release() count := make(map[[blake2b.Size256]byte]int, len(variants)) for _, cg := range cgs { idx := (tag - tagstart) * 2 @@ -303,7 +310,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s variantRemap[tag-tagstart] = remap }() } - throttle2.Wait() + throttleCPU.Wait() annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx) log.Infof("writing %s", annotationsFilename) @@ -347,6 +354,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s return err } + throttleNumpyMem.Acquire() log.Infof("%s: preparing numpy", infile) rows := len(cgnames) cols := 2 * int(tagend-tagstart) @@ -361,6 +369,8 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s } } } + seq = nil + throttleNumpyMem.Release() fnm := fmt.Sprintf("%s/matrix.%04d.npy", *outputDir, infileIdx) output, err := os.Create(fnm) @@ -392,7 +402,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s return nil }) } - if err = throttle1.Wait(); err != nil { + if err = throttleMem.Wait(); err != nil { return 1 } return 0 -- 2.30.2