Generate numpy matrices from slices.
authorTom Clegg <tom@tomclegg.ca>
Mon, 13 Sep 2021 14:01:49 +0000 (10:01 -0400)
committerTom Clegg <tom@tomclegg.ca>
Mon, 13 Sep 2021 14:01:49 +0000 (10:01 -0400)
refs #17996

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

cmd.go
slicenumpy.go [new file with mode: 0644]
throttle.go

diff --git a/cmd.go b/cmd.go
index 3975bf7b0348aff8a84b99712e83758813a79ede..dc5009a9c7e1f886b97f59f89422b5236cd11966 100644 (file)
--- a/cmd.go
+++ b/cmd.go
@@ -31,6 +31,7 @@ var (
                "export-numpy":       &exportNumpy{},
                "flake":              &flakecmd{},
                "slice":              &slicecmd{},
+               "slice-numpy":        &sliceNumpy{},
                "numpy-comvar":       &numpyComVar{},
                "filter":             &filtercmd{},
                "build-docker-image": &buildDockerImage{},
diff --git a/slicenumpy.go b/slicenumpy.go
new file mode 100644 (file)
index 0000000..601b35a
--- /dev/null
@@ -0,0 +1,292 @@
+// Copyright (C) The Lightning Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lightning
+
+import (
+       "bufio"
+       "flag"
+       "fmt"
+       "io"
+       "net/http"
+       _ "net/http/pprof"
+       "os"
+       "runtime"
+       "sort"
+       "strings"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/kshedden/gonpy"
+       "github.com/sirupsen/logrus"
+       log "github.com/sirupsen/logrus"
+)
+
+type sliceNumpy struct {
+       filter filter
+}
+
+func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       var err error
+       defer func() {
+               if err != nil {
+                       fmt.Fprintf(stderr, "%s\n", err)
+               }
+       }()
+       flags := flag.NewFlagSet("", flag.ContinueOnError)
+       flags.SetOutput(stderr)
+       pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
+       runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
+       projectUUID := flags.String("project", "", "project `UUID` for output data")
+       priority := flags.Int("priority", 500, "container request priority")
+       inputDir := flags.String("input-dir", "./in", "input `directory`")
+       outputDir := flags.String("output-dir", "./out", "output `directory`")
+       regionsFilename := flags.String("regions", "", "only output columns/annotations that intersect regions in specified bed `file`")
+       expandRegions := flags.Int("expand-regions", 0, "expand specified regions by `N` base pairs on each side`")
+       cmd.filter.Flags(flags)
+       err = flags.Parse(args)
+       if err == flag.ErrHelp {
+               err = nil
+               return 0
+       } else if err != nil {
+               return 2
+       }
+
+       if *pprof != "" {
+               go func() {
+                       log.Println(http.ListenAndServe(*pprof, nil))
+               }()
+       }
+
+       if !*runlocal {
+               runner := arvadosContainerRunner{
+                       Name:        "lightning slice-numpy",
+                       Client:      arvados.NewClientFromEnv(),
+                       ProjectUUID: *projectUUID,
+                       RAM:         150000000000,
+                       VCPUs:       32,
+                       Priority:    *priority,
+                       KeepCache:   1,
+                       APIAccess:   true,
+               }
+               err = runner.TranslatePaths(inputDir, regionsFilename)
+               if err != nil {
+                       return 1
+               }
+               runner.Args = []string{"slice-numpy", "-local=true",
+                       "-pprof", ":6060",
+                       "-input-dir", *inputDir,
+                       "-output-dir", "/mnt/output",
+                       "-regions", *regionsFilename,
+                       "-expand-regions", fmt.Sprintf("%d", *expandRegions),
+               }
+               runner.Args = append(runner.Args, cmd.filter.Args()...)
+               var output string
+               output, err = runner.Run()
+               if err != nil {
+                       return 1
+               }
+               fmt.Fprintln(stdout, output)
+               return 0
+       }
+
+       infiles, err := allGobFiles(*inputDir)
+       if err != nil {
+               return 1
+       }
+       if len(infiles) == 0 {
+               err = fmt.Errorf("no input files found in %s", *inputDir)
+               return 1
+       }
+       sort.Strings(infiles)
+
+       var cgnames []string
+       refseqs := map[string]map[string][]tileLibRef{}
+       in0, err := open(infiles[0])
+       if err != nil {
+               return 1
+       }
+       DecodeLibrary(in0, strings.HasSuffix(infiles[0], ".gz"), func(ent *LibraryEntry) error {
+               for _, cseq := range ent.CompactSequences {
+                       refseqs[cseq.Name] = cseq.TileSequences
+               }
+               for _, cg := range ent.CompactGenomes {
+                       cgnames = append(cgnames, cg.Name)
+               }
+               return nil
+       })
+       if err != nil {
+               return 1
+       }
+       in0.Close()
+       sort.Strings(cgnames)
+
+       {
+               labelsFilename := *outputDir + "/labels.csv"
+               log.Infof("writing labels to %s", labelsFilename)
+               var f *os.File
+               f, err = os.Create(labelsFilename)
+               if err != nil {
+                       return 1
+               }
+               defer f.Close()
+               for i, name := range cgnames {
+                       _, err = fmt.Fprintf(f, "%d,%q\n", i, trimFilenameForLabel(name))
+                       if err != nil {
+                               err = fmt.Errorf("write %s: %w", labelsFilename, err)
+                               return 1
+                       }
+               }
+               err = f.Close()
+               if err != nil {
+                       err = fmt.Errorf("close %s: %w", labelsFilename, err)
+                       return 1
+               }
+       }
+
+       log.Info("building list of reference tiles to load") // TODO: more efficient if we had saved all ref tiles in slice0
+       reftiledata := map[tileLibRef]*[]byte{}
+       for _, ref := range refseqs {
+               for _, cseq := range ref {
+                       for _, libref := range cseq {
+                               reftiledata[libref] = new([]byte)
+                       }
+               }
+       }
+       log.Info("loading reference tiles from all slices")
+       throttle := throttle{Max: runtime.NumCPU()}
+       for _, infile := range infiles {
+               infile := infile
+               throttle.Go(func() error {
+                       defer log.Infof("%s: done", infile)
+                       f, err := open(infile)
+                       if err != nil {
+                               return err
+                       }
+                       defer f.Close()
+                       return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
+                               for _, tv := range ent.TileVariants {
+                                       libref := tileLibRef{tv.Tag, tv.Variant}
+                                       if dst, ok := reftiledata[libref]; ok {
+                                               *dst = tv.Sequence
+                                       }
+                               }
+                               return nil
+                       })
+               })
+       }
+       throttle.Wait()
+
+       log.Info("TODO: determining which tiles intersect given regions")
+
+       log.Info("generating annotations and numpy matrix for each slice")
+       for infileIdx, infile := range infiles {
+               infileIdx, infile := infileIdx, infile
+               throttle.Go(func() error {
+                       defer log.Infof("%s: done", infile)
+                       seq := map[tileLibRef][]byte{}
+                       cgs := make(map[string]CompactGenome, len(cgnames))
+                       f, err := open(infile)
+                       if err != nil {
+                               return err
+                       }
+                       defer f.Close()
+                       err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
+                               for _, tv := range ent.TileVariants {
+                                       seq[tileLibRef{tv.Tag, tv.Variant}] = tv.Sequence
+                               }
+                               for _, cg := range ent.CompactGenomes {
+                                       cgs[cg.Name] = cg
+                               }
+                               return nil
+                       })
+                       if err != nil {
+                               return err
+                       }
+
+                       log.Infof("TODO: %s: filtering", infile)
+                       log.Infof("TODO: %s: tidying", infile)
+                       log.Infof("TODO: %s: lowqual to -1", infile)
+
+                       annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx)
+                       log.Infof("%s: writing annotations to %s", infile, annotationsFilename)
+                       annow, err := os.Create(annotationsFilename)
+                       if err != nil {
+                               return err
+                       }
+                       // for libref, seq := range seq {
+                       //      // TODO: diff from ref.
+                       // }
+                       err = annow.Close()
+                       if err != nil {
+                               return err
+                       }
+
+                       log.Infof("%s: preparing numpy", infile)
+                       tagstart := cgs[cgnames[0]].StartTag
+                       tagend := cgs[cgnames[0]].EndTag
+                       rows := len(cgnames)
+                       cols := 2 * int(tagend-tagstart)
+                       out := make([]int16, rows*cols)
+                       for row, name := range cgnames {
+                               out := out[row*cols:]
+                               for col, v := range cgs[name].Variants {
+                                       if v == 0 {
+                                               // out[col] = 0
+                                       } else if _, ok := seq[tileLibRef{tagstart + tagID(col/2), v}]; ok {
+                                               out[col] = int16(v)
+                                       } else {
+                                               out[col] = -1
+                                       }
+                               }
+                       }
+
+                       fnm := fmt.Sprintf("%s/matrix.%04d.npy", *outputDir, infileIdx)
+                       output, err := os.Create(fnm)
+                       if err != nil {
+                               return err
+                       }
+                       defer output.Close()
+                       bufw := bufio.NewWriter(output)
+                       npw, err := gonpy.NewWriter(nopCloser{bufw})
+                       if err != nil {
+                               return err
+                       }
+                       log.WithFields(logrus.Fields{
+                               "filename": fnm,
+                               "rows":     rows,
+                               "cols":     cols,
+                       }).Info("writing numpy")
+                       npw.Shape = []int{rows, cols}
+                       npw.WriteInt16(out)
+                       err = bufw.Flush()
+                       if err != nil {
+                               return err
+                       }
+                       err = output.Close()
+                       if err != nil {
+                               return err
+                       }
+                       return nil
+               })
+       }
+       if err = throttle.Wait(); err != nil {
+               return 1
+       }
+       return 0
+}
+
+func (*sliceNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
+       f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
+       if err != nil {
+               return err
+       }
+       defer f.Close()
+       for i, libref := range librefs {
+               _, err = fmt.Fprintf(f, "%d,%d,%d\n", i, libref.Tag, libref.Variant)
+               if err != nil {
+                       return err
+               }
+       }
+       return f.Close()
+}
index 1c7230f738853d3683aa652bc10a3ea5af1b0f6f..ce3279dd3398d76886c9f40cb2c279114d08d205 100644 (file)
@@ -44,3 +44,15 @@ func (t *throttle) Wait() error {
        t.wg.Wait()
        return t.Err()
 }
+
+func (t *throttle) Go(f func() error) error {
+       t.Acquire()
+       if t.Err() != nil {
+               return t.Err()
+       }
+       go func() {
+               t.Report(f())
+               t.Release()
+       }()
+       return nil
+}