From 056c033351153c6e4f20ff70796a1dca14878302 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 14 Sep 2021 16:15:35 -0400 Subject: [PATCH] Fix mem usage, improve logging. refs #17996 Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- slice.go | 4 ++-- slicenumpy.go | 18 ++++++++++-------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/slice.go b/slice.go index 01ffb1fc18..50f219fb3c 100644 --- a/slice.go +++ b/slice.go @@ -60,7 +60,7 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std Client: arvados.NewClientFromEnv(), ProjectUUID: *projectUUID, RAM: 200000000000, - VCPUs: 4, + VCPUs: 32, Priority: *priority, KeepCache: 50, APIAccess: true, @@ -107,7 +107,7 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error { encs []*gob.Encoder ) - throttle := throttle{Max: runtime.NumCPU()} + throttle := throttle{Max: runtime.GOMAXPROCS(0)} for _, path := range infiles { path := path throttle.Acquire() diff --git a/slicenumpy.go b/slicenumpy.go index 55b3c0a140..ba80151f3a 100644 --- a/slicenumpy.go +++ b/slicenumpy.go @@ -16,6 +16,7 @@ import ( "runtime" "sort" "strings" + "sync/atomic" "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/arvados/lightning/hgvs" @@ -69,7 +70,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s RAM: 250000000000, VCPUs: 32, Priority: *priority, - KeepCache: 1, + KeepCache: 2, APIAccess: true, } err = runner.TranslatePaths(inputDir, regionsFilename) @@ -174,7 +175,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s } } log.Info("loading reference tiles from all slices") - throttle := throttle{Max: runtime.NumCPU()} + throttle := throttle{Max: runtime.GOMAXPROCS(0)} for _, infile := range infiles { infile := infile throttle.Go(func() error { @@ -215,10 +216,10 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s log.Info("TODO: determining which tiles intersect given regions") log.Info("generating annotations and numpy matrix for each slice") + var done int64 for infileIdx, infile := range infiles { infileIdx, infile := infileIdx, infile throttle.Go(func() error { - defer log.Infof("%s: done", infile) seq := map[tagID][][]byte{} cgs := make(map[string]CompactGenome, len(cgnames)) f, err := open(infile) @@ -226,6 +227,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s return err } defer f.Close() + log.Infof("reading %s", infile) err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error { for _, tv := range ent.TileVariants { variants := seq[tv.Tag] @@ -246,12 +248,11 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s tagstart := cgs[cgnames[0]].StartTag tagend := cgs[cgnames[0]].EndTag - log.Infof("TODO: %s: filtering", infile) - log.Infof("TODO: %s: tidying", infile) - log.Infof("TODO: %s: lowqual to -1", infile) + // TODO: filters + // TODO: tidy/renumber annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx) - log.Infof("%s: writing annotations to %s", infile, annotationsFilename) + log.Infof("writing %s", annotationsFilename) annof, err := os.Create(annotationsFilename) if err != nil { return err @@ -312,7 +313,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s return err } defer output.Close() - bufw := bufio.NewWriter(output) + bufw := bufio.NewWriterSize(output, 1<<26) npw, err := gonpy.NewWriter(nopCloser{bufw}) if err != nil { return err @@ -332,6 +333,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s if err != nil { return err } + log.Infof("%s: done (%d/%d)", infile, int(atomic.AddInt64(&done, 1)), len(infiles)) return nil }) } -- 2.30.2