Option to write numpy in chunks (same rows, fewer columns).
authorTom Clegg <tom@tomclegg.ca>
Thu, 11 Mar 2021 15:54:42 +0000 (10:54 -0500)
committerTom Clegg <tom@tomclegg.ca>
Thu, 11 Mar 2021 15:54:42 +0000 (10:54 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

exportnumpy.go
pca.go

index ec6c83b886831983f2a45225bf97c7271d027fc2..a323ba970b271b182c26c7f8b17ddb4aa60dbe55 100644 (file)
@@ -44,6 +44,7 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
        librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create csv `file` mapping column# to tag# and variant#")
        labelsFilename := flags.String("output-labels", "", "output `file` for genome labels csv")
        onehot := flags.Bool("one-hot", false, "recode tile variants as one-hot")
+       chunks := flags.Int("chunks", 1, "split output into `N` numpy files")
        cmd.filter.Flags(flags)
        err = flags.Parse(args)
        if err == flag.ErrHelp {
@@ -88,6 +89,7 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
                        "-max-variants", fmt.Sprintf("%d", cmd.filter.MaxVariants),
                        "-min-coverage", fmt.Sprintf("%f", cmd.filter.MinCoverage),
                        "-max-tag", fmt.Sprintf("%d", cmd.filter.MaxTag),
+                       "-chunks", fmt.Sprintf("%d", *chunks),
                }
                var output string
                output, err = runner.Run()
@@ -147,7 +149,8 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
        }
 
        log.Info("building numpy array")
-       out, rows, cols, names := cgs2array(tilelib)
+       names := cgnames(tilelib)
+       lowqual := lowqual(tilelib)
 
        if *labelsFilename != "" {
                log.Infof("writing labels to %s", *labelsFilename)
@@ -172,47 +175,66 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
                }
        }
 
-       log.Info("writing numpy file")
-       var output io.WriteCloser
-       if *outputFilename == "-" {
-               output = nopCloser{stdout}
-       } else {
-               output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
-               if err != nil {
-                       return 1
+       chunksize := (len(tilelib.variant) + *chunks - 1) / *chunks
+       for chunk := 0; chunk < *chunks; chunk++ {
+               tagstart := chunk * chunksize
+               tagend := tagstart + chunksize
+               if tagend > len(tilelib.variant) {
+                       tagend = len(tilelib.variant)
                }
-               defer output.Close()
-       }
-       bufw := bufio.NewWriter(output)
-       npw, err := gonpy.NewWriter(nopCloser{bufw})
-       if err != nil {
-               return 1
-       }
-       if *onehot {
-               log.Info("recoding to onehot")
-               recoded, librefs, recodedcols := recodeOnehot(out, cols)
-               out, cols = recoded, recodedcols
-               if *librefsFilename != "" {
-                       log.Infof("writing onehot column mapping")
-                       err = cmd.writeLibRefs(*librefsFilename, tilelib, librefs)
+               out, rows, cols := cgs2array(tilelib, names, lowqual, tagstart, tagend)
+
+               var npw *gonpy.NpyWriter
+               var output io.WriteCloser
+               fnm := *outputFilename
+               if fnm == "-" {
+                       output = nopCloser{stdout}
+               } else {
+                       if *chunks > 1 {
+                               if strings.HasSuffix(fnm, ".npy") {
+                                       fnm = fmt.Sprintf("%s.%d.npy", fnm[:len(fnm)-4], chunk)
+                               } else {
+                                       fnm = fmt.Sprintf("%s.%d", fnm, chunk)
+                               }
+                       }
+                       output, err = os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0777)
                        if err != nil {
                                return 1
                        }
+                       defer output.Close()
+               }
+               bufw := bufio.NewWriter(output)
+               npw, err = gonpy.NewWriter(nopCloser{bufw})
+               if err != nil {
+                       return 1
+               }
+               if *onehot {
+                       log.Info("recoding to onehot")
+                       recoded, librefs, recodedcols := recodeOnehot(out, cols)
+                       out, cols = recoded, recodedcols
+                       if *librefsFilename != "" {
+                               log.Infof("writing onehot column mapping")
+                               err = cmd.writeLibRefs(*librefsFilename, tilelib, librefs)
+                               if err != nil {
+                                       return 1
+                               }
+                       }
+               }
+               log.WithFields(logrus.Fields{
+                       "filename": fnm,
+                       "rows":     rows,
+                       "cols":     cols,
+               }).Info("writing numpy")
+               npw.Shape = []int{rows, cols}
+               npw.WriteInt16(out)
+               err = bufw.Flush()
+               if err != nil {
+                       return 1
+               }
+               err = output.Close()
+               if err != nil {
+                       return 1
                }
-       }
-       log.WithFields(logrus.Fields{
-               "rows": rows,
-               "cols": cols,
-       }).Info("writing numpy")
-       npw.Shape = []int{rows, cols}
-       npw.WriteInt16(out)
-       err = bufw.Flush()
-       if err != nil {
-               return 1
-       }
-       err = output.Close()
-       if err != nil {
-               return 1
        }
        return 0
 }
@@ -232,23 +254,18 @@ func (*exportNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []til
        return f.Close()
 }
 
-func cgs2array(tilelib *tileLibrary) (data []int16, rows, cols int, cgnames []string) {
+func cgnames(tilelib *tileLibrary) (cgnames []string) {
        for name := range tilelib.compactGenomes {
                cgnames = append(cgnames, name)
        }
        sort.Slice(cgnames, func(i, j int) bool {
                return trimFilenameForLabel(cgnames[i]) < trimFilenameForLabel(cgnames[j])
        })
+       return
+}
 
-       rows = len(tilelib.compactGenomes)
-       for _, cg := range tilelib.compactGenomes {
-               if cols < len(cg) {
-                       cols = len(cg)
-               }
-       }
-
-       // flag low-quality tile variants so we can change to -1 below
-       lowqual := make([]map[tileVariantID]bool, cols/2)
+func lowqual(tilelib *tileLibrary) (lowqual []map[tileVariantID]bool) {
+       lowqual = make([]map[tileVariantID]bool, len(tilelib.variant))
        for tag, variants := range tilelib.variant {
                lq := lowqual[tag]
                for varidx, hash := range variants {
@@ -261,18 +278,30 @@ func cgs2array(tilelib *tileLibrary) (data []int16, rows, cols int, cgnames []st
                        }
                }
        }
+       return
+}
 
+func cgs2array(tilelib *tileLibrary, names []string, lowqual []map[tileVariantID]bool, tagstart, tagend int) (data []int16, rows, cols int) {
+       rows = len(tilelib.compactGenomes)
+       cols = (tagend - tagstart) * 2
        data = make([]int16, rows*cols)
-       for row, name := range cgnames {
-               for i, v := range tilelib.compactGenomes[name] {
-                       if v > 0 && lowqual[i/2][v] {
+       for row, name := range names {
+               cg := tilelib.compactGenomes[name]
+               if tagstart*2 >= len(cg) {
+                       continue
+               }
+               cg = cg[tagstart*2:]
+               if cols < len(cg) {
+                       cg = cg[:cols]
+               }
+               for i, v := range cg {
+                       if v > 0 && lowqual[tagstart+i/2][v] {
                                data[row*cols+i] = -1
                        } else {
                                data[row*cols+i] = int16(v)
                        }
                }
        }
-
        return
 }
 
diff --git a/pca.go b/pca.go
index 0248fa557090d580873430d8ef0e60cb04b03222..a5c720d24a3ac6aee6e3ec6d5b27968e84521787 100644 (file)
--- a/pca.go
+++ b/pca.go
@@ -162,7 +162,7 @@ func (cmd *goPCA) RunCommand(prog string, args []string, stdin io.Reader, stdout
        tilelib.Tidy()
 
        log.Print("converting cgs to array")
-       data, rows, cols, _ := cgs2array(tilelib)
+       data, rows, cols := cgs2array(tilelib, cgnames(tilelib), lowqual(tilelib), 0, len(tilelib.variant))
        if *onehot {
                log.Printf("recode one-hot: %d rows, %d cols", rows, cols)
                data, _, cols = recodeOnehot(data, cols)