From e59d79fdd85e5e5531e442ddbe2b05d0586396ea Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Mon, 27 Dec 2021 13:01:24 -0500 Subject: [PATCH] Separate options to output single/chunked hgvs matrices. refs #18438 Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- slicenumpy.go | 155 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 101 insertions(+), 54 deletions(-) diff --git a/slicenumpy.go b/slicenumpy.go index 2cb67960b6..193d3ef4d0 100644 --- a/slicenumpy.go +++ b/slicenumpy.go @@ -52,6 +52,8 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s regionsFilename := flags.String("regions", "", "only output columns/annotations that intersect regions in specified bed `file`") expandRegions := flags.Int("expand-regions", 0, "expand specified regions by `N` base pairs on each side`") mergeOutput := flags.Bool("merge-output", false, "merge output into one matrix.npy and one matrix.annotations.csv") + hgvsSingle := flags.Bool("single-hgvs-matrix", false, "also generate hgvs-based matrix") + hgvsChunked := flags.Bool("chunked-hgvs-matrix", false, "also generate hgvs-based matrix per chromosome") flags.IntVar(&cmd.threads, "threads", 16, "number of memory-hungry assembly threads") cmd.filter.Flags(flags) err = flags.Parse(args) @@ -91,6 +93,8 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s "-regions=" + *regionsFilename, "-expand-regions=" + fmt.Sprintf("%d", *expandRegions), "-merge-output=" + fmt.Sprintf("%v", *mergeOutput), + "-single-hgvs-matrix=" + fmt.Sprintf("%v", *hgvsSingle), + "-chunked-hgvs-matrix=" + fmt.Sprintf("%v", *hgvsChunked), } runner.Args = append(runner.Args, cmd.filter.Args()...) var output string @@ -244,7 +248,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s } var toMerge [][]int16 - if *mergeOutput { + if *mergeOutput || *hgvsSingle || *hgvsChunked { toMerge = make([][]int16, len(infiles)) } @@ -444,10 +448,11 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s seq = nil throttleNumpyMem.Release() - if *mergeOutput { + if *mergeOutput || *hgvsSingle || *hgvsChunked { log.Infof("%04d: matrix fragment %d rows x %d cols", infileIdx, rows, cols) toMerge[infileIdx] = out - } else { + } + if !*mergeOutput { fnm := fmt.Sprintf("%s/matrix.%04d.npy", *outputDir, infileIdx) err = writeNumpyInt16(fnm, out, rows, cols) if err != nil { @@ -461,28 +466,36 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s if err = throttleMem.Wait(); err != nil { return 1 } - if *mergeOutput { - log.Info("merging output matrix and annotations") - - annoFilename := fmt.Sprintf("%s/matrix.annotations.csv", *outputDir) - annof, err := os.Create(annoFilename) - if err != nil { - return 1 + if *mergeOutput || *hgvsSingle || *hgvsChunked { + var annow *bufio.Writer + var annof *os.File + if *mergeOutput { + annoFilename := fmt.Sprintf("%s/matrix.annotations.csv", *outputDir) + annof, err = os.Create(annoFilename) + if err != nil { + return 1 + } + annow = bufio.NewWriterSize(annof, 1<<20) } - annow := bufio.NewWriterSize(annof, 1<<20) rows := len(cgnames) cols := 0 for _, chunk := range toMerge { cols += len(chunk) / rows } - out := make([]int16, rows*cols) + log.Infof("merging output matrix (rows=%d, cols=%d, mem=%d) and annotations", rows, cols, rows*cols*2) + var out []int16 + if *mergeOutput { + out = make([]int16, rows*cols) + } hgvsCols := map[string][2][]int16{} // hgvs -> [[g0,g1,g2,...], [g0,g1,g2,...]] (slice of genomes for each phase) startcol := 0 for outIdx, chunk := range toMerge { chunkcols := len(chunk) / rows - for row := 0; row < rows; row++ { - copy(out[row*cols+startcol:], chunk[row*chunkcols:(row+1)*chunkcols]) + if *mergeOutput { + for row := 0; row < rows; row++ { + copy(out[row*cols+startcol:], chunk[row*chunkcols:(row+1)*chunkcols]) + } } toMerge[outIdx] = nil @@ -492,9 +505,11 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s if err != nil { return 1 } - err = os.Remove(annotationsFilename) - if err != nil { - return 1 + if *mergeOutput { + err = os.Remove(annotationsFilename) + if err != nil { + return 1 + } } for _, line := range bytes.Split(buf, []byte{'\n'}) { if len(line) == 0 { @@ -554,9 +569,13 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s Ref: string(refseq), New: string(refseq), } - fmt.Fprintf(annow, "%d,%d,%d,%s:g.%s,%s,%d,%s,%s,%s\n", tag, incol+startcol/2, rt.variant, seqname, hgvsref.String(), seqname, pos, refseq, refseq, fields[8]) + if annow != nil { + fmt.Fprintf(annow, "%d,%d,%d,%s:g.%s,%s,%d,%s,%s,%s\n", tag, incol+startcol/2, rt.variant, seqname, hgvsref.String(), seqname, pos, refseq, refseq, fields[8]) + } + } + if annow != nil { + fmt.Fprintf(annow, "%d,%d,%d,%s,%s,%d,%s,%s,%s\n", tag, incol+startcol/2, tileVariant, hgvsID, seqname, pos, refseq, fields[7], fields[8]) } - fmt.Fprintf(annow, "%d,%d,%d,%s,%s,%d,%s,%s,%s\n", tag, incol+startcol/2, tileVariant, hgvsID, seqname, pos, refseq, fields[7], fields[8]) for ph := 0; ph < 2; ph++ { for row := 0; row < rows; row++ { v := chunk[row*chunkcols+incol*2+ph] @@ -569,48 +588,76 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s startcol += chunkcols } - err = annow.Flush() - if err != nil { - return 1 - } - err = annof.Close() - if err != nil { - return 1 - } - err = writeNumpyInt16(fmt.Sprintf("%s/matrix.npy", *outputDir), out, rows, cols) - if err != nil { - return 1 + if *mergeOutput { + err = annow.Flush() + if err != nil { + return 1 + } + err = annof.Close() + if err != nil { + return 1 + } + err = writeNumpyInt16(fmt.Sprintf("%s/matrix.npy", *outputDir), out, rows, cols) + if err != nil { + return 1 + } } out = nil - cols = len(hgvsCols) * 2 - log.Printf("building hgvs-based matrix: %d rows x %d cols", rows, cols) - out = make([]int16, rows*cols) - hgvsIDs := make([]string, 0, len(hgvsCols)) - for hgvsID := range hgvsCols { - hgvsIDs = append(hgvsIDs, hgvsID) + var seqnames []string + if *hgvsSingle { + seqnames = []string{""} } - sort.Strings(hgvsIDs) - var hgvsLabels bytes.Buffer - for idx, hgvsID := range hgvsIDs { - fmt.Fprintf(&hgvsLabels, "%d,%s\n", idx, hgvsID) - for ph := 0; ph < 2; ph++ { - hgvscol := hgvsCols[hgvsID][ph] - for row, val := range hgvscol { - out[row*cols+idx*2+ph] = val - } + if *hgvsChunked { + for seqname := range refseq { + seqnames = append(seqnames, seqname) } } - err = writeNumpyInt16(fmt.Sprintf("%s/hgvs.npy", *outputDir), out, rows, cols) - if err != nil { - return 1 - } + for _, seqname := range seqnames { + basename := "hgvs" + wantPrefix := "" + if seqname == "" { + cols = len(hgvsCols) * 2 + } else { + basename = "hgvs." + seqname + wantPrefix = seqname + ":" + cols = 0 + for hgvsID := range hgvsCols { + if strings.HasPrefix(hgvsID, wantPrefix) { + cols += 2 + } + } + } + log.Printf("building hgvs-based matrix [%s]: %d rows x %d cols", seqname, rows, cols) + out = make([]int16, rows*cols) + hgvsIDs := make([]string, 0, cols/2) + for hgvsID := range hgvsCols { + if strings.HasPrefix(hgvsID, wantPrefix) { + hgvsIDs = append(hgvsIDs, hgvsID) + } + } + sort.Strings(hgvsIDs) + var hgvsLabels bytes.Buffer + for idx, hgvsID := range hgvsIDs { + fmt.Fprintf(&hgvsLabels, "%d,%s\n", idx, hgvsID) + for ph := 0; ph < 2; ph++ { + hgvscol := hgvsCols[hgvsID][ph] + for row, val := range hgvscol { + out[row*cols+idx*2+ph] = val + } + } + } + err = writeNumpyInt16(fmt.Sprintf("%s/%s.npy", *outputDir, basename), out, rows, cols) + if err != nil { + return 1 + } - fnm := fmt.Sprintf("%s/hgvs.annotations.csv", *outputDir) - log.Printf("writing hgvs labels: %s", fnm) - err = ioutil.WriteFile(fnm, hgvsLabels.Bytes(), 0777) - if err != nil { - return 1 + fnm := fmt.Sprintf("%s/%s.annotations.csv", *outputDir, basename) + log.Printf("writing hgvs labels: %s", fnm) + err = ioutil.WriteFile(fnm, hgvsLabels.Bytes(), 0777) + if err != nil { + return 1 + } } } return 0 -- 2.30.2