From 84b052c9e1b6e0c9a3b9ac6611ee9de9f73c43d4 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 24 Nov 2021 15:19:19 -0500 Subject: [PATCH] Option to merge matrices and annotations. refs #18438 Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- slice_test.go | 46 ++++++++++++++++++- slicenumpy.go | 125 ++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 144 insertions(+), 27 deletions(-) diff --git a/slice_test.go b/slice_test.go index 7332100634..15dc220a74 100644 --- a/slice_test.go +++ b/slice_test.go @@ -160,9 +160,51 @@ func (s *sliceSuite) TestImportAndSlice(c *check.C) { c.Check(string(annotations), check.Matches, "(?ms).*"+s+".*") } - annotations, err = ioutil.ReadFile(npydir + "/matrix.0002.annotations.csv") + for _, fnm := range []string{ + npydir + "/matrix.0001.annotations.csv", + npydir + "/matrix.0002.annotations.csv", + } { + annotations, err := ioutil.ReadFile(fnm) + c.Assert(err, check.IsNil) + c.Check(string(annotations), check.Equals, "", check.Commentf(fnm)) + } + } + + err = ioutil.WriteFile(tmpdir+"/chr1and2-100-200.bed", []byte("chr1\t100\t200\ttest.1\nchr2\t100\t200\ttest.2\n"), 0644) + c.Check(err, check.IsNil) + + c.Log("=== slice-numpy + regions + merge ===") + { + npydir := c.MkDir() + exited := (&sliceNumpy{}).RunCommand("slice-numpy", []string{ + "-local=true", + "-regions=" + tmpdir + "/chr1and2-100-200.bed", + "-input-dir=" + slicedir, + "-output-dir=" + npydir, + "-merge-output=true", + }, nil, os.Stderr, os.Stderr) + c.Check(exited, check.Equals, 0) + out, _ := exec.Command("find", npydir, "-ls").CombinedOutput() + c.Logf("%s", out) + + f, err := os.Open(npydir + "/matrix.npy") + c.Assert(err, check.IsNil) + defer f.Close() + npy, err := gonpy.NewReader(f) + c.Assert(err, check.IsNil) + c.Check(npy.Shape, check.DeepEquals, []int{4, 4}) + variants, err := npy.GetInt16() + c.Check(variants, check.DeepEquals, []int16{2, 1, 3, 1, -1, -1, 4, 2, 2, 1, 3, 1, -1, -1, 4, 2}) + + annotations, err := ioutil.ReadFile(npydir + "/matrix.annotations.csv") c.Assert(err, check.IsNil) c.Logf("%s", annotations) - c.Check(string(annotations), check.Equals, "") + for _, s := range []string{ + "0,0,1,chr1:g.161A>T", + "0,0,1,chr1:g.178A>T", + "4,1,2,chr2:g.125_127delinsAAA", + } { + c.Check(string(annotations), check.Matches, "(?ms).*"+s+".*") + } } } diff --git a/slicenumpy.go b/slicenumpy.go index 8f3a861db5..fe1a77497e 100644 --- a/slicenumpy.go +++ b/slicenumpy.go @@ -16,6 +16,7 @@ import ( "regexp" "runtime" "sort" + "strconv" "strings" "sync/atomic" @@ -49,6 +50,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s ref := flags.String("ref", "", "reference name (if blank, choose last one that appears in input)") regionsFilename := flags.String("regions", "", "only output columns/annotations that intersect regions in specified bed `file`") expandRegions := flags.Int("expand-regions", 0, "expand specified regions by `N` base pairs on each side`") + mergeOutput := flags.Bool("merge-output", false, "merge output into one matrix.npy and one matrix.annotations.csv") flags.IntVar(&cmd.threads, "threads", 16, "number of memory-hungry assembly threads") cmd.filter.Flags(flags) err = flags.Parse(args) @@ -87,6 +89,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s "-threads=" + fmt.Sprintf("%d", cmd.threads), "-regions=" + *regionsFilename, "-expand-regions=" + fmt.Sprintf("%d", *expandRegions), + "-merge-output=" + fmt.Sprintf("%v", *mergeOutput), } runner.Args = append(runner.Args, cmd.filter.Args()...) var output string @@ -237,6 +240,11 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s } } + var toMerge [][]int16 + if *mergeOutput { + toMerge = make([][]int16, len(infiles)) + } + throttleMem := throttle{Max: cmd.threads} // TODO: estimate using mem and data size throttleNumpyMem := throttle{Max: cmd.threads/2 + 1} log.Info("generating annotations and numpy matrix for each slice") @@ -425,31 +433,15 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s seq = nil throttleNumpyMem.Release() - fnm := fmt.Sprintf("%s/matrix.%04d.npy", *outputDir, infileIdx) - output, err := os.Create(fnm) - if err != nil { - return err - } - defer output.Close() - bufw := bufio.NewWriterSize(output, 1<<26) - npw, err := gonpy.NewWriter(nopCloser{bufw}) - if err != nil { - return err - } - log.WithFields(log.Fields{ - "filename": fnm, - "rows": rows, - "cols": cols, - }).Infof("%04d: writing numpy", infileIdx) - npw.Shape = []int{rows, cols} - npw.WriteInt16(out) - err = bufw.Flush() - if err != nil { - return err - } - err = output.Close() - if err != nil { - return err + if *mergeOutput { + log.Infof("%04d: matrix fragment %d rows x %d cols", infileIdx, rows, cols) + toMerge[infileIdx] = out + } else { + fnm := fmt.Sprintf("%s/matrix.%04d.npy", *outputDir, infileIdx) + err = writeNumpyInt16(fnm, out, rows, cols) + if err != nil { + return err + } } log.Infof("%s: done (%d/%d)", infile, int(atomic.AddInt64(&done, 1)), len(infiles)) return nil @@ -458,5 +450,88 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s if err = throttleMem.Wait(); err != nil { return 1 } + if *mergeOutput { + log.Info("merging output matrix and annotations") + + annoFilename := fmt.Sprintf("%s/matrix.annotations.csv", *outputDir) + annof, err := os.Create(annoFilename) + if err != nil { + return 1 + } + annow := bufio.NewWriterSize(annof, 1<<20) + + rows := len(cgnames) + cols := 0 + for _, chunk := range toMerge { + cols += len(chunk) / rows + } + out := make([]int16, rows*cols) + startcol := 0 + for outIdx, chunk := range toMerge { + chunkcols := len(chunk) / rows + for row := 0; row < rows; row++ { + copy(out[row*cols+startcol:], chunk[row*chunkcols:(row+1)*chunkcols]) + } + toMerge[outIdx] = nil + + annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, outIdx) + log.Infof("reading %s", annotationsFilename) + buf, err := os.ReadFile(annotationsFilename) + if err != nil { + return 1 + } + err = os.Remove(annotationsFilename) + if err != nil { + return 1 + } + for _, line := range bytes.Split(buf, []byte{'\n'}) { + if len(line) == 0 { + continue + } + fields := bytes.SplitN(line, []byte{','}, 3) + incol, _ := strconv.Atoi(string(fields[1])) + fmt.Fprintf(annow, "%s,%d,%s\n", fields[0], incol+startcol/2, fields[2]) + } + + startcol += chunkcols + } + err = annow.Flush() + if err != nil { + return 1 + } + err = annof.Close() + if err != nil { + return 1 + } + err = writeNumpyInt16(fmt.Sprintf("%s/matrix.npy", *outputDir), out, rows, cols) + if err != nil { + return 1 + } + } return 0 } + +func writeNumpyInt16(fnm string, out []int16, rows, cols int) error { + output, err := os.Create(fnm) + if err != nil { + return err + } + defer output.Close() + bufw := bufio.NewWriterSize(output, 1<<26) + npw, err := gonpy.NewWriter(nopCloser{bufw}) + if err != nil { + return err + } + log.WithFields(log.Fields{ + "filename": fnm, + "rows": rows, + "cols": cols, + }).Infof("writing numpy: %s", fnm) + npw.Shape = []int{rows, cols} + npw.WriteInt16(out) + err = bufw.Flush() + if err != nil { + return err + } + return output.Close() +} -- 2.30.2