Generate numpy matrices from slices.
[lightning.git] / slicenumpy.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "flag"
10         "fmt"
11         "io"
12         "net/http"
13         _ "net/http/pprof"
14         "os"
15         "runtime"
16         "sort"
17         "strings"
18
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "github.com/kshedden/gonpy"
21         "github.com/sirupsen/logrus"
22         log "github.com/sirupsen/logrus"
23 )
24
25 type sliceNumpy struct {
26         filter filter
27 }
28
29 func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
30         var err error
31         defer func() {
32                 if err != nil {
33                         fmt.Fprintf(stderr, "%s\n", err)
34                 }
35         }()
36         flags := flag.NewFlagSet("", flag.ContinueOnError)
37         flags.SetOutput(stderr)
38         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
39         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
40         projectUUID := flags.String("project", "", "project `UUID` for output data")
41         priority := flags.Int("priority", 500, "container request priority")
42         inputDir := flags.String("input-dir", "./in", "input `directory`")
43         outputDir := flags.String("output-dir", "./out", "output `directory`")
44         regionsFilename := flags.String("regions", "", "only output columns/annotations that intersect regions in specified bed `file`")
45         expandRegions := flags.Int("expand-regions", 0, "expand specified regions by `N` base pairs on each side`")
46         cmd.filter.Flags(flags)
47         err = flags.Parse(args)
48         if err == flag.ErrHelp {
49                 err = nil
50                 return 0
51         } else if err != nil {
52                 return 2
53         }
54
55         if *pprof != "" {
56                 go func() {
57                         log.Println(http.ListenAndServe(*pprof, nil))
58                 }()
59         }
60
61         if !*runlocal {
62                 runner := arvadosContainerRunner{
63                         Name:        "lightning slice-numpy",
64                         Client:      arvados.NewClientFromEnv(),
65                         ProjectUUID: *projectUUID,
66                         RAM:         150000000000,
67                         VCPUs:       32,
68                         Priority:    *priority,
69                         KeepCache:   1,
70                         APIAccess:   true,
71                 }
72                 err = runner.TranslatePaths(inputDir, regionsFilename)
73                 if err != nil {
74                         return 1
75                 }
76                 runner.Args = []string{"slice-numpy", "-local=true",
77                         "-pprof", ":6060",
78                         "-input-dir", *inputDir,
79                         "-output-dir", "/mnt/output",
80                         "-regions", *regionsFilename,
81                         "-expand-regions", fmt.Sprintf("%d", *expandRegions),
82                 }
83                 runner.Args = append(runner.Args, cmd.filter.Args()...)
84                 var output string
85                 output, err = runner.Run()
86                 if err != nil {
87                         return 1
88                 }
89                 fmt.Fprintln(stdout, output)
90                 return 0
91         }
92
93         infiles, err := allGobFiles(*inputDir)
94         if err != nil {
95                 return 1
96         }
97         if len(infiles) == 0 {
98                 err = fmt.Errorf("no input files found in %s", *inputDir)
99                 return 1
100         }
101         sort.Strings(infiles)
102
103         var cgnames []string
104         refseqs := map[string]map[string][]tileLibRef{}
105         in0, err := open(infiles[0])
106         if err != nil {
107                 return 1
108         }
109         DecodeLibrary(in0, strings.HasSuffix(infiles[0], ".gz"), func(ent *LibraryEntry) error {
110                 for _, cseq := range ent.CompactSequences {
111                         refseqs[cseq.Name] = cseq.TileSequences
112                 }
113                 for _, cg := range ent.CompactGenomes {
114                         cgnames = append(cgnames, cg.Name)
115                 }
116                 return nil
117         })
118         if err != nil {
119                 return 1
120         }
121         in0.Close()
122         sort.Strings(cgnames)
123
124         {
125                 labelsFilename := *outputDir + "/labels.csv"
126                 log.Infof("writing labels to %s", labelsFilename)
127                 var f *os.File
128                 f, err = os.Create(labelsFilename)
129                 if err != nil {
130                         return 1
131                 }
132                 defer f.Close()
133                 for i, name := range cgnames {
134                         _, err = fmt.Fprintf(f, "%d,%q\n", i, trimFilenameForLabel(name))
135                         if err != nil {
136                                 err = fmt.Errorf("write %s: %w", labelsFilename, err)
137                                 return 1
138                         }
139                 }
140                 err = f.Close()
141                 if err != nil {
142                         err = fmt.Errorf("close %s: %w", labelsFilename, err)
143                         return 1
144                 }
145         }
146
147         log.Info("building list of reference tiles to load") // TODO: more efficient if we had saved all ref tiles in slice0
148         reftiledata := map[tileLibRef]*[]byte{}
149         for _, ref := range refseqs {
150                 for _, cseq := range ref {
151                         for _, libref := range cseq {
152                                 reftiledata[libref] = new([]byte)
153                         }
154                 }
155         }
156         log.Info("loading reference tiles from all slices")
157         throttle := throttle{Max: runtime.NumCPU()}
158         for _, infile := range infiles {
159                 infile := infile
160                 throttle.Go(func() error {
161                         defer log.Infof("%s: done", infile)
162                         f, err := open(infile)
163                         if err != nil {
164                                 return err
165                         }
166                         defer f.Close()
167                         return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
168                                 for _, tv := range ent.TileVariants {
169                                         libref := tileLibRef{tv.Tag, tv.Variant}
170                                         if dst, ok := reftiledata[libref]; ok {
171                                                 *dst = tv.Sequence
172                                         }
173                                 }
174                                 return nil
175                         })
176                 })
177         }
178         throttle.Wait()
179
180         log.Info("TODO: determining which tiles intersect given regions")
181
182         log.Info("generating annotations and numpy matrix for each slice")
183         for infileIdx, infile := range infiles {
184                 infileIdx, infile := infileIdx, infile
185                 throttle.Go(func() error {
186                         defer log.Infof("%s: done", infile)
187                         seq := map[tileLibRef][]byte{}
188                         cgs := make(map[string]CompactGenome, len(cgnames))
189                         f, err := open(infile)
190                         if err != nil {
191                                 return err
192                         }
193                         defer f.Close()
194                         err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
195                                 for _, tv := range ent.TileVariants {
196                                         seq[tileLibRef{tv.Tag, tv.Variant}] = tv.Sequence
197                                 }
198                                 for _, cg := range ent.CompactGenomes {
199                                         cgs[cg.Name] = cg
200                                 }
201                                 return nil
202                         })
203                         if err != nil {
204                                 return err
205                         }
206
207                         log.Infof("TODO: %s: filtering", infile)
208                         log.Infof("TODO: %s: tidying", infile)
209                         log.Infof("TODO: %s: lowqual to -1", infile)
210
211                         annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx)
212                         log.Infof("%s: writing annotations to %s", infile, annotationsFilename)
213                         annow, err := os.Create(annotationsFilename)
214                         if err != nil {
215                                 return err
216                         }
217                         // for libref, seq := range seq {
218                         //      // TODO: diff from ref.
219                         // }
220                         err = annow.Close()
221                         if err != nil {
222                                 return err
223                         }
224
225                         log.Infof("%s: preparing numpy", infile)
226                         tagstart := cgs[cgnames[0]].StartTag
227                         tagend := cgs[cgnames[0]].EndTag
228                         rows := len(cgnames)
229                         cols := 2 * int(tagend-tagstart)
230                         out := make([]int16, rows*cols)
231                         for row, name := range cgnames {
232                                 out := out[row*cols:]
233                                 for col, v := range cgs[name].Variants {
234                                         if v == 0 {
235                                                 // out[col] = 0
236                                         } else if _, ok := seq[tileLibRef{tagstart + tagID(col/2), v}]; ok {
237                                                 out[col] = int16(v)
238                                         } else {
239                                                 out[col] = -1
240                                         }
241                                 }
242                         }
243
244                         fnm := fmt.Sprintf("%s/matrix.%04d.npy", *outputDir, infileIdx)
245                         output, err := os.Create(fnm)
246                         if err != nil {
247                                 return err
248                         }
249                         defer output.Close()
250                         bufw := bufio.NewWriter(output)
251                         npw, err := gonpy.NewWriter(nopCloser{bufw})
252                         if err != nil {
253                                 return err
254                         }
255                         log.WithFields(logrus.Fields{
256                                 "filename": fnm,
257                                 "rows":     rows,
258                                 "cols":     cols,
259                         }).Info("writing numpy")
260                         npw.Shape = []int{rows, cols}
261                         npw.WriteInt16(out)
262                         err = bufw.Flush()
263                         if err != nil {
264                                 return err
265                         }
266                         err = output.Close()
267                         if err != nil {
268                                 return err
269                         }
270                         return nil
271                 })
272         }
273         if err = throttle.Wait(); err != nil {
274                 return 1
275         }
276         return 0
277 }
278
279 func (*sliceNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
280         f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
281         if err != nil {
282                 return err
283         }
284         defer f.Close()
285         for i, libref := range librefs {
286                 _, err = fmt.Fprintf(f, "%d,%d,%d\n", i, libref.Tag, libref.Variant)
287                 if err != nil {
288                         return err
289                 }
290         }
291         return f.Close()
292 }