Option to merge matrices and annotations.
authorTom Clegg <tom@curii.com>
Wed, 24 Nov 2021 20:19:19 +0000 (15:19 -0500)
committerTom Clegg <tom@curii.com>
Wed, 24 Nov 2021 20:19:19 +0000 (15:19 -0500)
refs #18438

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

slice_test.go
slicenumpy.go

index 73321006342303c94671b5598a544fefbe519a14..15dc220a741f585e240fd48c19916082076a989f 100644 (file)
@@ -160,9 +160,51 @@ func (s *sliceSuite) TestImportAndSlice(c *check.C) {
                        c.Check(string(annotations), check.Matches, "(?ms).*"+s+".*")
                }
 
-               annotations, err = ioutil.ReadFile(npydir + "/matrix.0002.annotations.csv")
+               for _, fnm := range []string{
+                       npydir + "/matrix.0001.annotations.csv",
+                       npydir + "/matrix.0002.annotations.csv",
+               } {
+                       annotations, err := ioutil.ReadFile(fnm)
+                       c.Assert(err, check.IsNil)
+                       c.Check(string(annotations), check.Equals, "", check.Commentf(fnm))
+               }
+       }
+
+       err = ioutil.WriteFile(tmpdir+"/chr1and2-100-200.bed", []byte("chr1\t100\t200\ttest.1\nchr2\t100\t200\ttest.2\n"), 0644)
+       c.Check(err, check.IsNil)
+
+       c.Log("=== slice-numpy + regions + merge ===")
+       {
+               npydir := c.MkDir()
+               exited := (&sliceNumpy{}).RunCommand("slice-numpy", []string{
+                       "-local=true",
+                       "-regions=" + tmpdir + "/chr1and2-100-200.bed",
+                       "-input-dir=" + slicedir,
+                       "-output-dir=" + npydir,
+                       "-merge-output=true",
+               }, nil, os.Stderr, os.Stderr)
+               c.Check(exited, check.Equals, 0)
+               out, _ := exec.Command("find", npydir, "-ls").CombinedOutput()
+               c.Logf("%s", out)
+
+               f, err := os.Open(npydir + "/matrix.npy")
+               c.Assert(err, check.IsNil)
+               defer f.Close()
+               npy, err := gonpy.NewReader(f)
+               c.Assert(err, check.IsNil)
+               c.Check(npy.Shape, check.DeepEquals, []int{4, 4})
+               variants, err := npy.GetInt16()
+               c.Check(variants, check.DeepEquals, []int16{2, 1, 3, 1, -1, -1, 4, 2, 2, 1, 3, 1, -1, -1, 4, 2})
+
+               annotations, err := ioutil.ReadFile(npydir + "/matrix.annotations.csv")
                c.Assert(err, check.IsNil)
                c.Logf("%s", annotations)
-               c.Check(string(annotations), check.Equals, "")
+               for _, s := range []string{
+                       "0,0,1,chr1:g.161A>T",
+                       "0,0,1,chr1:g.178A>T",
+                       "4,1,2,chr2:g.125_127delinsAAA",
+               } {
+                       c.Check(string(annotations), check.Matches, "(?ms).*"+s+".*")
+               }
        }
 }
index 8f3a861db56cd2e4f6909f1ad8353df2e496823a..fe1a77497ee4e3325fa294fd7d0cf62d23605e74 100644 (file)
@@ -16,6 +16,7 @@ import (
        "regexp"
        "runtime"
        "sort"
+       "strconv"
        "strings"
        "sync/atomic"
 
@@ -49,6 +50,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        ref := flags.String("ref", "", "reference name (if blank, choose last one that appears in input)")
        regionsFilename := flags.String("regions", "", "only output columns/annotations that intersect regions in specified bed `file`")
        expandRegions := flags.Int("expand-regions", 0, "expand specified regions by `N` base pairs on each side`")
+       mergeOutput := flags.Bool("merge-output", false, "merge output into one matrix.npy and one matrix.annotations.csv")
        flags.IntVar(&cmd.threads, "threads", 16, "number of memory-hungry assembly threads")
        cmd.filter.Flags(flags)
        err = flags.Parse(args)
@@ -87,6 +89,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        "-threads=" + fmt.Sprintf("%d", cmd.threads),
                        "-regions=" + *regionsFilename,
                        "-expand-regions=" + fmt.Sprintf("%d", *expandRegions),
+                       "-merge-output=" + fmt.Sprintf("%v", *mergeOutput),
                }
                runner.Args = append(runner.Args, cmd.filter.Args()...)
                var output string
@@ -237,6 +240,11 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                }
        }
 
+       var toMerge [][]int16
+       if *mergeOutput {
+               toMerge = make([][]int16, len(infiles))
+       }
+
        throttleMem := throttle{Max: cmd.threads} // TODO: estimate using mem and data size
        throttleNumpyMem := throttle{Max: cmd.threads/2 + 1}
        log.Info("generating annotations and numpy matrix for each slice")
@@ -425,31 +433,15 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        seq = nil
                        throttleNumpyMem.Release()
 
-                       fnm := fmt.Sprintf("%s/matrix.%04d.npy", *outputDir, infileIdx)
-                       output, err := os.Create(fnm)
-                       if err != nil {
-                               return err
-                       }
-                       defer output.Close()
-                       bufw := bufio.NewWriterSize(output, 1<<26)
-                       npw, err := gonpy.NewWriter(nopCloser{bufw})
-                       if err != nil {
-                               return err
-                       }
-                       log.WithFields(log.Fields{
-                               "filename": fnm,
-                               "rows":     rows,
-                               "cols":     cols,
-                       }).Infof("%04d: writing numpy", infileIdx)
-                       npw.Shape = []int{rows, cols}
-                       npw.WriteInt16(out)
-                       err = bufw.Flush()
-                       if err != nil {
-                               return err
-                       }
-                       err = output.Close()
-                       if err != nil {
-                               return err
+                       if *mergeOutput {
+                               log.Infof("%04d: matrix fragment %d rows x %d cols", infileIdx, rows, cols)
+                               toMerge[infileIdx] = out
+                       } else {
+                               fnm := fmt.Sprintf("%s/matrix.%04d.npy", *outputDir, infileIdx)
+                               err = writeNumpyInt16(fnm, out, rows, cols)
+                               if err != nil {
+                                       return err
+                               }
                        }
                        log.Infof("%s: done (%d/%d)", infile, int(atomic.AddInt64(&done, 1)), len(infiles))
                        return nil
@@ -458,5 +450,88 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        if err = throttleMem.Wait(); err != nil {
                return 1
        }
+       if *mergeOutput {
+               log.Info("merging output matrix and annotations")
+
+               annoFilename := fmt.Sprintf("%s/matrix.annotations.csv", *outputDir)
+               annof, err := os.Create(annoFilename)
+               if err != nil {
+                       return 1
+               }
+               annow := bufio.NewWriterSize(annof, 1<<20)
+
+               rows := len(cgnames)
+               cols := 0
+               for _, chunk := range toMerge {
+                       cols += len(chunk) / rows
+               }
+               out := make([]int16, rows*cols)
+               startcol := 0
+               for outIdx, chunk := range toMerge {
+                       chunkcols := len(chunk) / rows
+                       for row := 0; row < rows; row++ {
+                               copy(out[row*cols+startcol:], chunk[row*chunkcols:(row+1)*chunkcols])
+                       }
+                       toMerge[outIdx] = nil
+
+                       annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, outIdx)
+                       log.Infof("reading %s", annotationsFilename)
+                       buf, err := os.ReadFile(annotationsFilename)
+                       if err != nil {
+                               return 1
+                       }
+                       err = os.Remove(annotationsFilename)
+                       if err != nil {
+                               return 1
+                       }
+                       for _, line := range bytes.Split(buf, []byte{'\n'}) {
+                               if len(line) == 0 {
+                                       continue
+                               }
+                               fields := bytes.SplitN(line, []byte{','}, 3)
+                               incol, _ := strconv.Atoi(string(fields[1]))
+                               fmt.Fprintf(annow, "%s,%d,%s\n", fields[0], incol+startcol/2, fields[2])
+                       }
+
+                       startcol += chunkcols
+               }
+               err = annow.Flush()
+               if err != nil {
+                       return 1
+               }
+               err = annof.Close()
+               if err != nil {
+                       return 1
+               }
+               err = writeNumpyInt16(fmt.Sprintf("%s/matrix.npy", *outputDir), out, rows, cols)
+               if err != nil {
+                       return 1
+               }
+       }
        return 0
 }
+
+func writeNumpyInt16(fnm string, out []int16, rows, cols int) error {
+       output, err := os.Create(fnm)
+       if err != nil {
+               return err
+       }
+       defer output.Close()
+       bufw := bufio.NewWriterSize(output, 1<<26)
+       npw, err := gonpy.NewWriter(nopCloser{bufw})
+       if err != nil {
+               return err
+       }
+       log.WithFields(log.Fields{
+               "filename": fnm,
+               "rows":     rows,
+               "cols":     cols,
+       }).Infof("writing numpy: %s", fnm)
+       npw.Shape = []int{rows, cols}
+       npw.WriteInt16(out)
+       err = bufw.Flush()
+       if err != nil {
+               return err
+       }
+       return output.Close()
+}