From: Tom Clegg Date: Mon, 22 Nov 2021 15:20:52 +0000 (-0500) Subject: Implement -regions and -expand-regions for slice-numpy. X-Git-Url: https://git.arvados.org/lightning.git/commitdiff_plain/d81fc575f556e71b5c552d03c626b43c0744b45f Implement -regions and -expand-regions for slice-numpy. refs #18438 Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/exportnumpy.go b/exportnumpy.go index 39f228a1b0..c5a2b7af4e 100644 --- a/exportnumpy.go +++ b/exportnumpy.go @@ -404,22 +404,20 @@ func cgs2array(tilelib *tileLibrary, names []string, lowqual []map[tileVariantID return } -func chooseTiles(tilelib *tileLibrary, regionsFilename string, expandRegions int) (drop []bool, err error) { - if regionsFilename == "" { - return - } +func makeMask(regionsFilename string, expandRegions int) (*mask, error) { + log.Printf("makeMask: reading %s", regionsFilename) rfile, err := zopen(regionsFilename) if err != nil { - return + return nil, err } defer rfile.Close() regions, err := ioutil.ReadAll(rfile) if err != nil { - return + return nil, err } - log.Print("chooseTiles: building mask") - mask := &mask{} + log.Print("makeMask: building mask") + var mask mask for _, line := range bytes.Split(regions, []byte{'\n'}) { if bytes.HasPrefix(line, []byte{'#'}) { continue @@ -443,14 +441,24 @@ func chooseTiles(tilelib *tileLibrary, regionsFilename string, expandRegions int // GFF/GTF end++ } else { - err = fmt.Errorf("cannot parse input line as BED or GFF/GTF: %q", line) - return + return nil, fmt.Errorf("cannot parse input line as BED or GFF/GTF: %q", line) } } mask.Add(refseqname, start-expandRegions, end+expandRegions) } - log.Print("chooseTiles: mask.Freeze") + log.Print("makeMask: mask.Freeze") mask.Freeze() + return &mask, nil +} + +func chooseTiles(tilelib *tileLibrary, regionsFilename string, expandRegions int) (drop []bool, err error) { + if regionsFilename == "" { + return + } + mask, err := makeMask(regionsFilename, expandRegions) + if err != nil { + return + } tagset := tilelib.taglib.Tags() if len(tagset) == 0 { diff --git a/slice.go b/slice.go index fed3766dfe..d2073ad9c0 100644 --- a/slice.go +++ b/slice.go @@ -66,8 +66,8 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std Name: "lightning slice", Client: arvados.NewClientFromEnv(), ProjectUUID: *projectUUID, - RAM: 300000000000, - VCPUs: 32, + RAM: 500000000000, + VCPUs: 64, Priority: *priority, KeepCache: 2, APIAccess: true, diff --git a/slice_test.go b/slice_test.go index d84d95f30f..7332100634 100644 --- a/slice_test.go +++ b/slice_test.go @@ -82,45 +82,87 @@ func (s *sliceSuite) TestImportAndSlice(c *check.C) { c.Logf("%s", out) c.Log("=== slice-numpy ===") - npydir := c.MkDir() - exited = (&sliceNumpy{}).RunCommand("slice-numpy", []string{ - "-local=true", - "-input-dir=" + slicedir, - "-output-dir=" + npydir, - }, nil, os.Stderr, os.Stderr) - c.Check(exited, check.Equals, 0) - out, _ = exec.Command("find", npydir, "-ls").CombinedOutput() - c.Logf("%s", out) - - f, err := os.Open(npydir + "/matrix.0000.npy") - c.Assert(err, check.IsNil) - defer f.Close() - npy, err := gonpy.NewReader(f) - c.Assert(err, check.IsNil) - c.Check(npy.Shape, check.DeepEquals, []int{4, 4}) - variants, err := npy.GetInt16() - c.Check(variants, check.DeepEquals, []int16{2, 1, 1, 2, -1, -1, 1, 1, 2, 1, 1, 2, -1, -1, 1, 1}) - - annotations, err := ioutil.ReadFile(npydir + "/matrix.0000.annotations.csv") - c.Assert(err, check.IsNil) - c.Logf("%s", annotations) - for _, s := range []string{ - "chr1:g.161A>T", - "chr1:g.178A>T", - "chr1:g.1_3delinsGGC", - "chr1:g.222_224del", - } { - c.Check(string(annotations), check.Matches, "(?ms).*"+s+".*") + { + npydir := c.MkDir() + exited := (&sliceNumpy{}).RunCommand("slice-numpy", []string{ + "-local=true", + "-input-dir=" + slicedir, + "-output-dir=" + npydir, + }, nil, os.Stderr, os.Stderr) + c.Check(exited, check.Equals, 0) + out, _ := exec.Command("find", npydir, "-ls").CombinedOutput() + c.Logf("%s", out) + + f, err := os.Open(npydir + "/matrix.0000.npy") + c.Assert(err, check.IsNil) + defer f.Close() + npy, err := gonpy.NewReader(f) + c.Assert(err, check.IsNil) + c.Check(npy.Shape, check.DeepEquals, []int{4, 4}) + variants, err := npy.GetInt16() + c.Check(variants, check.DeepEquals, []int16{2, 1, 1, 2, -1, -1, 1, 1, 2, 1, 1, 2, -1, -1, 1, 1}) + + annotations, err := ioutil.ReadFile(npydir + "/matrix.0000.annotations.csv") + c.Assert(err, check.IsNil) + c.Logf("%s", annotations) + for _, s := range []string{ + "chr1:g.161A>T", + "chr1:g.178A>T", + "chr1:g.1_3delinsGGC", + "chr1:g.222_224del", + } { + c.Check(string(annotations), check.Matches, "(?ms).*"+s+".*") + } + + annotations, err = ioutil.ReadFile(npydir + "/matrix.0002.annotations.csv") + c.Assert(err, check.IsNil) + c.Logf("%s", annotations) + for _, s := range []string{ + ",2,chr2:g.1_3delinsAAA", + ",2,chr2:g.125_127delinsAAA", + ",4,chr2:g.125_127delinsAAA", + } { + c.Check(string(annotations), check.Matches, "(?ms).*"+s+".*") + } } - annotations, err = ioutil.ReadFile(npydir + "/matrix.0002.annotations.csv") - c.Assert(err, check.IsNil) - c.Logf("%s", annotations) - for _, s := range []string{ - ",2,chr2:g.1_3delinsAAA", - ",2,chr2:g.125_127delinsAAA", - ",4,chr2:g.125_127delinsAAA", - } { - c.Check(string(annotations), check.Matches, "(?ms).*"+s+".*") + c.Log("=== slice-numpy + regions ===") + { + npydir := c.MkDir() + exited := (&sliceNumpy{}).RunCommand("slice-numpy", []string{ + "-local=true", + "-regions=" + tmpdir + "/chr1-12-100.bed", + "-input-dir=" + slicedir, + "-output-dir=" + npydir, + }, nil, os.Stderr, os.Stderr) + c.Check(exited, check.Equals, 0) + out, _ := exec.Command("find", npydir, "-ls").CombinedOutput() + c.Logf("%s", out) + + f, err := os.Open(npydir + "/matrix.0000.npy") + c.Assert(err, check.IsNil) + defer f.Close() + npy, err := gonpy.NewReader(f) + c.Assert(err, check.IsNil) + c.Check(npy.Shape, check.DeepEquals, []int{4, 2}) + variants, err := npy.GetInt16() + c.Check(variants, check.DeepEquals, []int16{2, 1, -1, -1, 2, 1, -1, -1}) + + annotations, err := ioutil.ReadFile(npydir + "/matrix.0000.annotations.csv") + c.Assert(err, check.IsNil) + c.Logf("%s", annotations) + for _, s := range []string{ + "chr1:g.161A>T", + "chr1:g.178A>T", + "chr1:g.1_3delinsGGC", + "chr1:g.222_224del", + } { + c.Check(string(annotations), check.Matches, "(?ms).*"+s+".*") + } + + annotations, err = ioutil.ReadFile(npydir + "/matrix.0002.annotations.csv") + c.Assert(err, check.IsNil) + c.Logf("%s", annotations) + c.Check(string(annotations), check.Equals, "") } } diff --git a/slicenumpy.go b/slicenumpy.go index 4d3bd2745d..8f3a861db5 100644 --- a/slicenumpy.go +++ b/slicenumpy.go @@ -81,12 +81,12 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s return 1 } runner.Args = []string{"slice-numpy", "-local=true", - "-pprof", ":6060", - "-input-dir", *inputDir, - "-output-dir", "/mnt/output", - "-threads", fmt.Sprintf("%d", cmd.threads), - "-regions", *regionsFilename, - "-expand-regions", fmt.Sprintf("%d", *expandRegions), + "-pprof=:6060", + "-input-dir=" + *inputDir, + "-output-dir=/mnt/output", + "-threads=" + fmt.Sprintf("%d", cmd.threads), + "-regions=" + *regionsFilename, + "-expand-regions=" + fmt.Sprintf("%d", *expandRegions), } runner.Args = append(runner.Args, cmd.filter.Args()...) var output string @@ -223,7 +223,19 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s } throttleCPU.Wait() - log.Info("TODO: determining which tiles intersect given regions") + var mask *mask + if *regionsFilename != "" { + mask, err = makeMask(*regionsFilename, *expandRegions) + if err != nil { + return 1 + } + // Delete reftile entries for masked-out regions. + for tag, rt := range reftile { + if !mask.Check(strings.TrimPrefix(rt.seqname, "chr"), rt.pos, rt.pos+len(rt.tiledata)) { + delete(reftile, tag) + } + } + } throttleMem := throttle{Max: cmd.threads} // TODO: estimate using mem and data size throttleNumpyMem := throttle{Max: cmd.threads/2 + 1} @@ -245,6 +257,12 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s if tv.Ref { continue } + if mask != nil && reftile[tv.Tag] == nil { + // Don't waste + // time/memory on + // masked-out tiles. + continue + } variants := seq[tv.Tag] if len(variants) == 0 { variants = make([]TileVariant, 100) @@ -331,16 +349,25 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s return err } annow := bufio.NewWriterSize(annof, 1<<20) - for tag, variants := range seq { + outcol := 0 + for tag := tagstart; tag < tagend; tag++ { rt, ok := reftile[tag] if !ok { - // Reference does not use any - // variant of this tile. - // TODO: log this? mention it - // in annotations? + if mask == nil { + outcol++ + } + // Excluded by specified + // regions, or reference does + // not use any variant of this + // tile. (TODO: log this? + // mention it in annotations?) + continue + } + variants, ok := seq[tag] + if !ok { + outcol++ continue } - outcol := tag - tagID(tagstart) reftilestr := strings.ToUpper(string(rt.tiledata)) remap := variantRemap[tag-tagstart] done := make([]bool, len(variants)) @@ -363,6 +390,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s fmt.Fprintf(annow, "%d,%d,%d,%s:g.%s,%s,%d,%s,%s,%s\n", tag, outcol, v, rt.seqname, diff.String(), rt.seqname, diff.Position, diff.Ref, diff.New, diff.Left) } } + outcol++ } err = annow.Flush() if err != nil { @@ -373,19 +401,25 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s return err } - throttleNumpyMem.Acquire() log.Infof("%04d: preparing numpy", infileIdx) + throttleNumpyMem.Acquire() rows := len(cgnames) - cols := 2 * int(tagend-tagstart) + cols := 2 * outcol out := make([]int16, rows*cols) for row, name := range cgnames { out := out[row*cols:] + outcol := 0 for col, v := range cgs[name].Variants { - if variants, ok := seq[tagstart+tagID(col/2)]; ok && len(variants) > int(v) && len(variants[v].Sequence) > 0 { - out[col] = int16(variantRemap[col/2][v]) + tag := tagstart + tagID(col/2) + if mask != nil && reftile[tag] == nil { + continue + } + if variants, ok := seq[tag]; ok && len(variants) > int(v) && len(variants[v].Sequence) > 0 { + out[outcol] = int16(variantRemap[tag-tagstart][v]) } else { - out[col] = -1 + out[outcol] = -1 } + outcol++ } } seq = nil