Fix blocking on gob encode by encoding HGVS column sets in per-sequence background goroutines fed by buffered channels.
author Tom Clegg <tom@curii.com>
Thu, 30 Dec 2021 00:19:26 +0000 (19:19 -0500)
committer Tom Clegg <tom@curii.com>
Thu, 30 Dec 2021 00:19:26 +0000 (19:19 -0500)
refs #18438

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

slice_test.go
slicenumpy.go

index 15dc220a741f585e240fd48c19916082076a989f..5b3479d2fb700a2a3c26dd3166e282959f56c158 100644 (file)
@@ -134,6 +134,7 @@ func (s *sliceSuite) TestImportAndSlice(c *check.C) {
                        "-regions=" + tmpdir + "/chr1-12-100.bed",
                        "-input-dir=" + slicedir,
                        "-output-dir=" + npydir,
+                       "-chunked-hgvs-matrix=true",
                }, nil, os.Stderr, os.Stderr)
                c.Check(exited, check.Equals, 0)
                out, _ := exec.Command("find", npydir, "-ls").CombinedOutput()
@@ -182,6 +183,7 @@ func (s *sliceSuite) TestImportAndSlice(c *check.C) {
                        "-input-dir=" + slicedir,
                        "-output-dir=" + npydir,
                        "-merge-output=true",
+                       "-single-hgvs-matrix=true",
                }, nil, os.Stderr, os.Stderr)
                c.Check(exited, check.Equals, 0)
                out, _ := exec.Command("find", npydir, "-ls").CombinedOutput()
index 670836a4210c3871cbaa49bf3f18b4d6ee6e516f..4e66938eecc76d608fea3b6e7af46614b91b8aeb 100644 (file)
@@ -249,9 +249,10 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                log.Printf("after applying mask, len(reftile) == %d", len(reftile))
        }
 
+       type hgvsColSet map[hgvs.Variant][2][]int8
+       encodeHGVS := throttle{Max: len(refseq)}
+       encodeHGVSTodo := map[string]chan hgvsColSet{}
        tmpHGVSCols := map[string]*os.File{}
-       bufHGVSCols := map[string]*bufio.Writer{}
-       encodeHGVSCols := map[string]*gob.Encoder{}
        if *hgvsChunked {
                for seqname := range refseq {
                        var f *os.File
@@ -263,8 +264,20 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        bufw := bufio.NewWriterSize(f, 1<<24)
                        enc := gob.NewEncoder(bufw)
                        tmpHGVSCols[seqname] = f
-                       bufHGVSCols[seqname] = bufw
-                       encodeHGVSCols[seqname] = enc
+                       todo := make(chan hgvsColSet, 128)
+                       encodeHGVSTodo[seqname] = todo
+                       encodeHGVS.Go(func() error {
+                               for colset := range todo {
+                                       err := enc.Encode(colset)
+                                       if err != nil {
+                                               encodeHGVS.Report(err)
+                                               for range todo {
+                                               }
+                                               return err
+                                       }
+                               }
+                               return bufw.Flush()
+                       })
                }
        }
 
@@ -457,7 +470,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                        // =ref or a different variant in that
                                        // position, or (-1) is lacking
                                        // coverage / couldn't be diffed.
-                                       hgvsCol := map[hgvs.Variant][2][]int8{}
+                                       hgvsCol := hgvsColSet{}
                                        for _, diffs := range variantDiffs {
                                                for _, diff := range diffs {
                                                        if _, ok := hgvsCol[diff]; ok {
@@ -492,7 +505,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                                        }
                                                }
                                        }
-                                       encodeHGVSCols[rt.seqname].Encode(hgvsCol)
+                                       encodeHGVSTodo[rt.seqname] <- hgvsCol
                                }
                                outcol++
                        }
@@ -554,11 +567,11 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        if *hgvsChunked {
                log.Info("flushing hgvsCols temp files")
                for seqname := range refseq {
-                       err = bufHGVSCols[seqname].Flush()
-                       if err != nil {
-                               return 1
-                       }
-                       bufHGVSCols[seqname] = nil // free buffer memory
+                       close(encodeHGVSTodo[seqname])
+               }
+               err = encodeHGVS.Wait()
+               if err != nil {
+                       return 1
                }
                for seqname := range refseq {
                        log.Infof("%s: reading hgvsCols from temp file", seqname)
@@ -567,7 +580,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        if err != nil {
                                return 1
                        }
-                       var hgvsCols map[hgvs.Variant][2][]int8
+                       var hgvsCols hgvsColSet
                        dec := gob.NewDecoder(bufio.NewReaderSize(f, 1<<24))
                        for err == nil {
                                err = dec.Decode(&hgvsCols)