refs #18438
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>
"-regions=" + tmpdir + "/chr1-12-100.bed",
"-input-dir=" + slicedir,
"-output-dir=" + npydir,
"-regions=" + tmpdir + "/chr1-12-100.bed",
"-input-dir=" + slicedir,
"-output-dir=" + npydir,
+ "-chunked-hgvs-matrix=true",
}, nil, os.Stderr, os.Stderr)
c.Check(exited, check.Equals, 0)
out, _ := exec.Command("find", npydir, "-ls").CombinedOutput()
}, nil, os.Stderr, os.Stderr)
c.Check(exited, check.Equals, 0)
out, _ := exec.Command("find", npydir, "-ls").CombinedOutput()
"-input-dir=" + slicedir,
"-output-dir=" + npydir,
"-merge-output=true",
"-input-dir=" + slicedir,
"-output-dir=" + npydir,
"-merge-output=true",
+ "-single-hgvs-matrix=true",
}, nil, os.Stderr, os.Stderr)
c.Check(exited, check.Equals, 0)
out, _ := exec.Command("find", npydir, "-ls").CombinedOutput()
}, nil, os.Stderr, os.Stderr)
c.Check(exited, check.Equals, 0)
out, _ := exec.Command("find", npydir, "-ls").CombinedOutput()
log.Printf("after applying mask, len(reftile) == %d", len(reftile))
}
log.Printf("after applying mask, len(reftile) == %d", len(reftile))
}
+ type hgvsColSet map[hgvs.Variant][2][]int8
+ encodeHGVS := throttle{Max: len(refseq)}
+ encodeHGVSTodo := map[string]chan hgvsColSet{}
tmpHGVSCols := map[string]*os.File{}
tmpHGVSCols := map[string]*os.File{}
- bufHGVSCols := map[string]*bufio.Writer{}
- encodeHGVSCols := map[string]*gob.Encoder{}
if *hgvsChunked {
for seqname := range refseq {
var f *os.File
if *hgvsChunked {
for seqname := range refseq {
var f *os.File
bufw := bufio.NewWriterSize(f, 1<<24)
enc := gob.NewEncoder(bufw)
tmpHGVSCols[seqname] = f
bufw := bufio.NewWriterSize(f, 1<<24)
enc := gob.NewEncoder(bufw)
tmpHGVSCols[seqname] = f
- bufHGVSCols[seqname] = bufw
- encodeHGVSCols[seqname] = enc
+ todo := make(chan hgvsColSet, 128)
+ encodeHGVSTodo[seqname] = todo
+ encodeHGVS.Go(func() error {
+ for colset := range todo {
+ err := enc.Encode(colset)
+ if err != nil {
+ encodeHGVS.Report(err)
+ for range todo {
+ }
+ return err
+ }
+ }
+ return bufw.Flush()
+ })
// =ref or a different variant in that
// position, or (-1) is lacking
// coverage / couldn't be diffed.
// =ref or a different variant in that
// position, or (-1) is lacking
// coverage / couldn't be diffed.
- hgvsCol := map[hgvs.Variant][2][]int8{}
+ hgvsCol := hgvsColSet{}
for _, diffs := range variantDiffs {
for _, diff := range diffs {
if _, ok := hgvsCol[diff]; ok {
for _, diffs := range variantDiffs {
for _, diff := range diffs {
if _, ok := hgvsCol[diff]; ok {
- encodeHGVSCols[rt.seqname].Encode(hgvsCol)
+ encodeHGVSTodo[rt.seqname] <- hgvsCol
if *hgvsChunked {
log.Info("flushing hgvsCols temp files")
for seqname := range refseq {
if *hgvsChunked {
log.Info("flushing hgvsCols temp files")
for seqname := range refseq {
- err = bufHGVSCols[seqname].Flush()
- if err != nil {
- return 1
- }
- bufHGVSCols[seqname] = nil // free buffer memory
+ close(encodeHGVSTodo[seqname])
+ }
+ err = encodeHGVS.Wait()
+ if err != nil {
+ return 1
}
for seqname := range refseq {
log.Infof("%s: reading hgvsCols from temp file", seqname)
}
for seqname := range refseq {
log.Infof("%s: reading hgvsCols from temp file", seqname)
if err != nil {
return 1
}
if err != nil {
return 1
}
- var hgvsCols map[hgvs.Variant][2][]int8
+ var hgvsCols hgvsColSet
dec := gob.NewDecoder(bufio.NewReaderSize(f, 1<<24))
for err == nil {
err = dec.Decode(&hgvsCols)
dec := gob.NewDecoder(bufio.NewReaderSize(f, 1<<24))
for err == nil {
err = dec.Decode(&hgvsCols)