Bump import memory.
[lightning.git] / exportnumpy.go
index fd198116b1512c01f6fb51db67bc82354edd40d5..99b0381c93c615128aa1d2d2508c9d1f1e4ccc5d 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "bufio"
+       "context"
        "errors"
        "flag"
        "fmt"
@@ -10,14 +11,18 @@ import (
        "net/http"
        _ "net/http/pprof"
        "os"
+       "path"
        "sort"
+       "strings"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/kshedden/gonpy"
        log "github.com/sirupsen/logrus"
 )
 
-type exportNumpy struct{}
+type exportNumpy struct {
+       filter filter
+}
 
 func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
        var err error
@@ -34,7 +39,11 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
        priority := flags.Int("priority", 500, "container request priority")
        inputFilename := flags.String("i", "-", "input `file`")
        outputFilename := flags.String("o", "-", "output `file`")
+       annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations csv")
+       librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create csv `file` mapping column# to tag# and variant#")
+       labelsFilename := flags.String("output-labels", "", "output `file` for genome labels csv")
        onehot := flags.Bool("one-hot", false, "recode tile variants as one-hot")
+       cmd.filter.Flags(flags)
        err = flags.Parse(args)
        if err == flag.ErrHelp {
                err = nil
@@ -58,21 +67,33 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
                        Name:        "lightning export-numpy",
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: *projectUUID,
-                       RAM:         128000000000,
-                       VCPUs:       2,
+                       RAM:         450000000000,
+                       VCPUs:       32,
                        Priority:    *priority,
+                       KeepCache:   1,
+                       APIAccess:   true,
                }
                err = runner.TranslatePaths(inputFilename)
                if err != nil {
                        return 1
                }
-               runner.Args = []string{"export-numpy", "-local=true", fmt.Sprintf("-one-hot=%v", *onehot), "-i", *inputFilename, "-o", "/mnt/output/library.npy"}
+               runner.Args = []string{"export-numpy", "-local=true",
+                       fmt.Sprintf("-one-hot=%v", *onehot),
+                       "-i", *inputFilename,
+                       "-o", "/mnt/output/matrix.npy",
+                       "-output-annotations", "/mnt/output/annotations.csv",
+                       "-output-onehot2tilevar", "/mnt/output/onehot2tilevar.csv",
+                       "-output-labels", "/mnt/output/labels.csv",
+                       "-max-variants", fmt.Sprintf("%d", cmd.filter.MaxVariants),
+                       "-min-coverage", fmt.Sprintf("%f", cmd.filter.MinCoverage),
+                       "-max-tag", fmt.Sprintf("%d", cmd.filter.MaxTag),
+               }
                var output string
                output, err = runner.Run()
                if err != nil {
                        return 1
                }
-               fmt.Fprintln(stdout, output+"/library.npy")
+               fmt.Fprintln(stdout, output+"/matrix.npy")
                return 0
        }
 
@@ -80,13 +101,19 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
        if *inputFilename == "-" {
                input = ioutil.NopCloser(stdin)
        } else {
-               input, err = os.Open(*inputFilename)
+               input, err = open(*inputFilename)
                if err != nil {
                        return 1
                }
                defer input.Close()
        }
-       cgs, err := ReadCompactGenomes(input)
+       input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024))
+       tilelib := &tileLibrary{
+               retainNoCalls:       true,
+               retainTileSequences: true,
+               compactGenomes:      map[string][]tileVariantID{},
+       }
+       err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil)
        if err != nil {
                return 1
        }
@@ -94,10 +121,57 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
        if err != nil {
                return 1
        }
-       sort.Slice(cgs, func(i, j int) bool { return cgs[i].Name < cgs[j].Name })
 
-       out, rows, cols := cgs2array(cgs)
+       log.Info("filtering")
+       cmd.filter.Apply(tilelib)
+       log.Info("tidying")
+       tilelib.Tidy()
+
+       if *annotationsFilename != "" {
+               log.Infof("writing annotations")
+               var annow io.WriteCloser
+               annow, err = os.OpenFile(*annotationsFilename, os.O_CREATE|os.O_WRONLY, 0666)
+               if err != nil {
+                       return 1
+               }
+               defer annow.Close()
+               err = (&annotatecmd{maxTileSize: 5000}).exportTileDiffs(annow, tilelib)
+               if err != nil {
+                       return 1
+               }
+               err = annow.Close()
+               if err != nil {
+                       return 1
+               }
+       }
+
+       log.Info("building numpy array")
+       out, rows, cols, names := cgs2array(tilelib)
 
+       if *labelsFilename != "" {
+               log.Infof("writing labels to %s", *labelsFilename)
+               var f *os.File
+               f, err = os.OpenFile(*labelsFilename, os.O_CREATE|os.O_WRONLY, 0777)
+               if err != nil {
+                       return 1
+               }
+               defer f.Close()
+               _, outBasename := path.Split(*outputFilename)
+               for i, name := range names {
+                       _, err = fmt.Fprintf(f, "%d,%q,%q\n", i, trimFilenameForLabel(name), outBasename)
+                       if err != nil {
+                               err = fmt.Errorf("write %s: %w", *labelsFilename, err)
+                               return 1
+                       }
+               }
+               err = f.Close()
+               if err != nil {
+                       err = fmt.Errorf("close %s: %w", *labelsFilename, err)
+                       return 1
+               }
+       }
+
+       log.Info("writing numpy file")
        var output io.WriteCloser
        if *outputFilename == "-" {
                output = nopCloser{stdout}
@@ -114,10 +188,20 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
                return 1
        }
        if *onehot {
-               out, cols = recodeOnehot(out, cols)
+               log.Info("recoding to onehot")
+               recoded, librefs, recodedcols := recodeOnehot(out, cols)
+               out, cols = recoded, recodedcols
+               if *librefsFilename != "" {
+                       log.Infof("writing onehot column mapping")
+                       err = cmd.writeLibRefs(*librefsFilename, tilelib, librefs)
+                       if err != nil {
+                               return 1
+                       }
+               }
        }
+       log.Info("writing numpy")
        npw.Shape = []int{rows, cols}
-       npw.WriteUint16(out)
+       npw.WriteInt16(out)
        err = bufw.Flush()
        if err != nil {
                return 1
@@ -129,25 +213,68 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
        return 0
 }
 
-func cgs2array(cgs []CompactGenome) (data []uint16, rows, cols int) {
-       rows = len(cgs)
-       for _, cg := range cgs {
-               if cols < len(cg.Variants) {
-                       cols = len(cg.Variants)
+func (*exportNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
+       f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
+       if err != nil {
+               return err
+       }
+       defer f.Close()
+       for i, libref := range librefs {
+               _, err = fmt.Fprintf(f, "%d,%d,%d\n", i, libref.Tag, libref.Variant)
+               if err != nil {
+                       return err
                }
        }
-       data = make([]uint16, rows*cols)
-       for row, cg := range cgs {
-               for i, v := range cg.Variants {
-                       data[row*cols+i] = uint16(v)
+       return f.Close()
+}
+
+func cgs2array(tilelib *tileLibrary) (data []int16, rows, cols int, cgnames []string) {
+       for name := range tilelib.compactGenomes {
+               cgnames = append(cgnames, name)
+       }
+       sort.Slice(cgnames, func(i, j int) bool {
+               return trimFilenameForLabel(cgnames[i]) < trimFilenameForLabel(cgnames[j])
+       })
+
+       rows = len(tilelib.compactGenomes)
+       for _, cg := range tilelib.compactGenomes {
+               if cols < len(cg) {
+                       cols = len(cg)
+               }
+       }
+
+       // flag low-quality tile variants so we can change to -1 below
+       lowqual := make([]map[tileVariantID]bool, cols/2)
+       for tag, variants := range tilelib.variant {
+               lq := lowqual[tag]
+               for varidx, hash := range variants {
+                       if len(tilelib.seq[hash]) == 0 {
+                               if lq == nil {
+                                       lq = map[tileVariantID]bool{}
+                                       lowqual[tag] = lq
+                               }
+                               lq[tileVariantID(varidx+1)] = true
+                       }
                }
        }
+
+       data = make([]int16, rows*cols)
+       for row, name := range cgnames {
+               for i, v := range tilelib.compactGenomes[name] {
+                       if v > 0 && lowqual[i/2][v] {
+                               data[row*cols+i] = -1
+                       } else {
+                               data[row*cols+i] = int16(v)
+                       }
+               }
+       }
+
        return
 }
 
-func recodeOnehot(in []uint16, incols int) ([]uint16, int) {
+func recodeOnehot(in []int16, incols int) (out []int16, librefs []tileLibRef, outcols int) {
        rows := len(in) / incols
-       maxvalue := make([]uint16, incols)
+       maxvalue := make([]int16, incols)
        for row := 0; row < rows; row++ {
                for col := 0; col < incols; col++ {
                        if v := in[row*incols+col]; maxvalue[col] < v {
@@ -156,20 +283,30 @@ func recodeOnehot(in []uint16, incols int) ([]uint16, int) {
                }
        }
        outcol := make([]int, incols)
-       outcols := 0
-       for incol, v := range maxvalue {
+       dropped := 0
+       for incol, maxv := range maxvalue {
                outcol[incol] = outcols
-               outcols += int(v)
+               if maxv == 0 {
+                       dropped++
+               }
+               for v := 1; v <= int(maxv); v++ {
+                       librefs = append(librefs, tileLibRef{Tag: tagID(incol), Variant: tileVariantID(v)})
+                       outcols++
+               }
        }
-       out := make([]uint16, rows*outcols)
-       for row := 0; row < rows; row++ {
+       log.Printf("recodeOnehot: dropped %d input cols with zero maxvalue", dropped)
+
+       out = make([]int16, rows*outcols)
+       for inidx, row := 0, 0; row < rows; row++ {
+               outrow := out[row*outcols:]
                for col := 0; col < incols; col++ {
-                       if v := in[row*incols+col]; v > 0 {
-                               out[row*outcols+outcol[col]+int(v)-1] = 1
+                       if v := in[inidx]; v > 0 {
+                               outrow[outcol[col]+int(v)-1] = 1
                        }
+                       inidx++
                }
        }
-       return out, outcols
+       return
 }
 
 type nopCloser struct {
@@ -177,3 +314,17 @@ type nopCloser struct {
 }
 
 func (nopCloser) Close() error { return nil }
+
+func trimFilenameForLabel(s string) string {
+       if i := strings.LastIndex(s, "/"); i >= 0 {
+               s = s[i+1:]
+       }
+       s = strings.TrimSuffix(s, ".gz")
+       s = strings.TrimSuffix(s, ".fa")
+       s = strings.TrimSuffix(s, ".fasta")
+       s = strings.TrimSuffix(s, ".1")
+       s = strings.TrimSuffix(s, ".2")
+       s = strings.TrimSuffix(s, ".gz")
+       s = strings.TrimSuffix(s, ".vcf")
+       return s
+}