From c5be98ec79b8bd3dbd8d129daa6648b0f8ac5b61 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 29 Dec 2021 19:19:26 -0500 Subject: [PATCH] Fix blocking on gob encode. refs #18438 Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- slice_test.go | 2 ++ slicenumpy.go | 37 +++++++++++++++++++++++++------------ 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/slice_test.go b/slice_test.go index 15dc220a74..5b3479d2fb 100644 --- a/slice_test.go +++ b/slice_test.go @@ -134,6 +134,7 @@ func (s *sliceSuite) TestImportAndSlice(c *check.C) { "-regions=" + tmpdir + "/chr1-12-100.bed", "-input-dir=" + slicedir, "-output-dir=" + npydir, + "-chunked-hgvs-matrix=true", }, nil, os.Stderr, os.Stderr) c.Check(exited, check.Equals, 0) out, _ := exec.Command("find", npydir, "-ls").CombinedOutput() @@ -182,6 +183,7 @@ func (s *sliceSuite) TestImportAndSlice(c *check.C) { "-input-dir=" + slicedir, "-output-dir=" + npydir, "-merge-output=true", + "-single-hgvs-matrix=true", }, nil, os.Stderr, os.Stderr) c.Check(exited, check.Equals, 0) out, _ := exec.Command("find", npydir, "-ls").CombinedOutput() diff --git a/slicenumpy.go b/slicenumpy.go index 670836a421..4e66938eec 100644 --- a/slicenumpy.go +++ b/slicenumpy.go @@ -249,9 +249,10 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s log.Printf("after applying mask, len(reftile) == %d", len(reftile)) } + type hgvsColSet map[hgvs.Variant][2][]int8 + encodeHGVS := throttle{Max: len(refseq)} + encodeHGVSTodo := map[string]chan hgvsColSet{} tmpHGVSCols := map[string]*os.File{} - bufHGVSCols := map[string]*bufio.Writer{} - encodeHGVSCols := map[string]*gob.Encoder{} if *hgvsChunked { for seqname := range refseq { var f *os.File @@ -263,8 +264,20 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s bufw := bufio.NewWriterSize(f, 1<<24) enc := gob.NewEncoder(bufw) tmpHGVSCols[seqname] = f - bufHGVSCols[seqname] = bufw - encodeHGVSCols[seqname] = enc + todo := make(chan hgvsColSet, 128) + encodeHGVSTodo[seqname] = todo + encodeHGVS.Go(func() error { + for colset := range todo { + err := enc.Encode(colset) + if err != nil { + encodeHGVS.Report(err) + for range todo { + } + return err + } + } + return bufw.Flush() + }) } } @@ -457,7 +470,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s // =ref or a different variant in that // position, or (-1) is lacking // coverage / couldn't be diffed. - hgvsCol := map[hgvs.Variant][2][]int8{} + hgvsCol := hgvsColSet{} for _, diffs := range variantDiffs { for _, diff := range diffs { if _, ok := hgvsCol[diff]; ok { @@ -492,7 +505,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s } } } - encodeHGVSCols[rt.seqname].Encode(hgvsCol) + encodeHGVSTodo[rt.seqname] <- hgvsCol } outcol++ } @@ -554,11 +567,11 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s if *hgvsChunked { log.Info("flushing hgvsCols temp files") for seqname := range refseq { - err = bufHGVSCols[seqname].Flush() - if err != nil { - return 1 - } - bufHGVSCols[seqname] = nil // free buffer memory + close(encodeHGVSTodo[seqname]) + } + err = encodeHGVS.Wait() + if err != nil { + return 1 } for seqname := range refseq { log.Infof("%s: reading hgvsCols from temp file", seqname) @@ -567,7 +580,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s if err != nil { return 1 } - var hgvsCols map[hgvs.Variant][2][]int8 + var hgvsCols hgvsColSet dec := gob.NewDecoder(bufio.NewReaderSize(f, 1<<24)) for err == nil { err = dec.Decode(&hgvsCols) -- 2.30.2