Bump merge memory.
[lightning.git] / exportnumpy.go
1 package lightning
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "errors"
8         "flag"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "net/http"
13         _ "net/http/pprof"
14         "os"
15         "path"
16         "sort"
17         "strconv"
18         "strings"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "github.com/kshedden/gonpy"
22         "github.com/sirupsen/logrus"
23         log "github.com/sirupsen/logrus"
24 )
25
26 type exportNumpy struct {
27         filter filter
28 }
29
30 func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
31         var err error
32         defer func() {
33                 if err != nil {
34                         fmt.Fprintf(stderr, "%s\n", err)
35                 }
36         }()
37         flags := flag.NewFlagSet("", flag.ContinueOnError)
38         flags.SetOutput(stderr)
39         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
40         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
41         projectUUID := flags.String("project", "", "project `UUID` for output data")
42         priority := flags.Int("priority", 500, "container request priority")
43         inputFilename := flags.String("i", "-", "input `file`")
44         outputFilename := flags.String("o", "-", "output `file`")
45         annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations csv")
46         librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create csv `file` mapping column# to tag# and variant#")
47         labelsFilename := flags.String("output-labels", "", "output `file` for genome labels csv")
48         regionsFilename := flags.String("regions", "", "only output columns/annotations that intersect regions in specified bed `file`")
49         expandRegions := flags.Int("expand-regions", 0, "expand specified regions by `N` base pairs on each side`")
50         onehot := flags.Bool("one-hot", false, "recode tile variants as one-hot")
51         chunks := flags.Int("chunks", 1, "split output into `N` numpy files")
52         cmd.filter.Flags(flags)
53         err = flags.Parse(args)
54         if err == flag.ErrHelp {
55                 err = nil
56                 return 0
57         } else if err != nil {
58                 return 2
59         }
60
61         if *pprof != "" {
62                 go func() {
63                         log.Println(http.ListenAndServe(*pprof, nil))
64                 }()
65         }
66
67         if !*runlocal {
68                 if *outputFilename != "-" {
69                         err = errors.New("cannot specify output file in container mode: not implemented")
70                         return 1
71                 }
72                 runner := arvadosContainerRunner{
73                         Name:        "lightning export-numpy",
74                         Client:      arvados.NewClientFromEnv(),
75                         ProjectUUID: *projectUUID,
76                         RAM:         450000000000,
77                         VCPUs:       32,
78                         Priority:    *priority,
79                         KeepCache:   1,
80                         APIAccess:   true,
81                 }
82                 err = runner.TranslatePaths(inputFilename, regionsFilename)
83                 if err != nil {
84                         return 1
85                 }
86                 runner.Args = []string{"export-numpy", "-local=true",
87                         fmt.Sprintf("-one-hot=%v", *onehot),
88                         "-i", *inputFilename,
89                         "-o", "/mnt/output/matrix.npy",
90                         "-output-annotations", "/mnt/output/annotations.csv",
91                         "-output-onehot2tilevar", "/mnt/output/onehot2tilevar.csv",
92                         "-output-labels", "/mnt/output/labels.csv",
93                         "-regions", *regionsFilename,
94                         "-expand-regions", fmt.Sprintf("%d", *expandRegions),
95                         "-max-variants", fmt.Sprintf("%d", cmd.filter.MaxVariants),
96                         "-min-coverage", fmt.Sprintf("%f", cmd.filter.MinCoverage),
97                         "-max-tag", fmt.Sprintf("%d", cmd.filter.MaxTag),
98                         "-chunks", fmt.Sprintf("%d", *chunks),
99                 }
100                 var output string
101                 output, err = runner.Run()
102                 if err != nil {
103                         return 1
104                 }
105                 fmt.Fprintln(stdout, output+"/matrix.npy")
106                 return 0
107         }
108
109         var input io.ReadCloser
110         if *inputFilename == "-" {
111                 input = ioutil.NopCloser(stdin)
112         } else {
113                 input, err = open(*inputFilename)
114                 if err != nil {
115                         return 1
116                 }
117                 defer input.Close()
118         }
119         input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024))
120         tilelib := &tileLibrary{
121                 retainNoCalls:       true,
122                 retainTileSequences: true,
123                 compactGenomes:      map[string][]tileVariantID{},
124         }
125         err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil)
126         if err != nil {
127                 return 1
128         }
129         err = input.Close()
130         if err != nil {
131                 return 1
132         }
133
134         log.Info("filtering")
135         cmd.filter.Apply(tilelib)
136         log.Info("tidying")
137         tilelib.Tidy()
138
139         log.Info("building lowqual map")
140         lowqual := lowqual(tilelib)
141         names := cgnames(tilelib)
142
143         if *labelsFilename != "" {
144                 log.Infof("writing labels to %s", *labelsFilename)
145                 var f *os.File
146                 f, err = os.OpenFile(*labelsFilename, os.O_CREATE|os.O_WRONLY, 0777)
147                 if err != nil {
148                         return 1
149                 }
150                 defer f.Close()
151                 _, outBasename := path.Split(*outputFilename)
152                 for i, name := range names {
153                         _, err = fmt.Fprintf(f, "%d,%q,%q\n", i, trimFilenameForLabel(name), outBasename)
154                         if err != nil {
155                                 err = fmt.Errorf("write %s: %w", *labelsFilename, err)
156                                 return 1
157                         }
158                 }
159                 err = f.Close()
160                 if err != nil {
161                         err = fmt.Errorf("close %s: %w", *labelsFilename, err)
162                         return 1
163                 }
164         }
165
166         log.Info("determining which tiles intersect given regions")
167         dropTiles, err := chooseTiles(tilelib, *regionsFilename, *expandRegions)
168         if err != nil {
169                 return 1
170         }
171
172         if *annotationsFilename != "" {
173                 log.Info("writing annotations")
174                 var annow io.WriteCloser
175                 annow, err = os.OpenFile(*annotationsFilename, os.O_CREATE|os.O_WRONLY, 0666)
176                 if err != nil {
177                         return 1
178                 }
179                 defer annow.Close()
180                 err = (&annotatecmd{maxTileSize: 5000, dropTiles: dropTiles}).exportTileDiffs(annow, tilelib)
181                 if err != nil {
182                         return 1
183                 }
184                 err = annow.Close()
185                 if err != nil {
186                         return 1
187                 }
188         }
189
190         chunksize := (len(tilelib.variant) + *chunks - 1) / *chunks
191         for chunk := 0; chunk < *chunks; chunk++ {
192                 log.Infof("preparing chunk %d of %d", chunk+1, *chunks)
193                 tagstart := chunk * chunksize
194                 tagend := tagstart + chunksize
195                 if tagend > len(tilelib.variant) {
196                         tagend = len(tilelib.variant)
197                 }
198                 out, rows, cols := cgs2array(tilelib, names, lowqual, dropTiles, tagstart, tagend)
199
200                 var npw *gonpy.NpyWriter
201                 var output io.WriteCloser
202                 fnm := *outputFilename
203                 if fnm == "-" {
204                         output = nopCloser{stdout}
205                 } else {
206                         if *chunks > 1 {
207                                 if strings.HasSuffix(fnm, ".npy") {
208                                         fnm = fmt.Sprintf("%s.%d.npy", fnm[:len(fnm)-4], chunk)
209                                 } else {
210                                         fnm = fmt.Sprintf("%s.%d", fnm, chunk)
211                                 }
212                         }
213                         output, err = os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0777)
214                         if err != nil {
215                                 return 1
216                         }
217                         defer output.Close()
218                 }
219                 bufw := bufio.NewWriter(output)
220                 npw, err = gonpy.NewWriter(nopCloser{bufw})
221                 if err != nil {
222                         return 1
223                 }
224                 if *onehot {
225                         log.Info("recoding to onehot")
226                         recoded, librefs, recodedcols := recodeOnehot(out, cols)
227                         out, cols = recoded, recodedcols
228                         if *librefsFilename != "" {
229                                 log.Infof("writing onehot column mapping")
230                                 err = cmd.writeLibRefs(*librefsFilename, tilelib, librefs)
231                                 if err != nil {
232                                         return 1
233                                 }
234                         }
235                 }
236                 log.WithFields(logrus.Fields{
237                         "filename": fnm,
238                         "rows":     rows,
239                         "cols":     cols,
240                 }).Info("writing numpy")
241                 npw.Shape = []int{rows, cols}
242                 npw.WriteInt16(out)
243                 err = bufw.Flush()
244                 if err != nil {
245                         return 1
246                 }
247                 err = output.Close()
248                 if err != nil {
249                         return 1
250                 }
251         }
252         return 0
253 }
254
255 func (*exportNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
256         f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
257         if err != nil {
258                 return err
259         }
260         defer f.Close()
261         for i, libref := range librefs {
262                 _, err = fmt.Fprintf(f, "%d,%d,%d\n", i, libref.Tag, libref.Variant)
263                 if err != nil {
264                         return err
265                 }
266         }
267         return f.Close()
268 }
269
270 func cgnames(tilelib *tileLibrary) (cgnames []string) {
271         for name := range tilelib.compactGenomes {
272                 cgnames = append(cgnames, name)
273         }
274         sort.Slice(cgnames, func(i, j int) bool {
275                 return trimFilenameForLabel(cgnames[i]) < trimFilenameForLabel(cgnames[j])
276         })
277         return
278 }
279
280 func lowqual(tilelib *tileLibrary) (lowqual []map[tileVariantID]bool) {
281         lowqual = make([]map[tileVariantID]bool, len(tilelib.variant))
282         for tag, variants := range tilelib.variant {
283                 lq := lowqual[tag]
284                 for varidx, hash := range variants {
285                         if len(tilelib.seq[hash]) == 0 {
286                                 if lq == nil {
287                                         lq = map[tileVariantID]bool{}
288                                         lowqual[tag] = lq
289                                 }
290                                 lq[tileVariantID(varidx+1)] = true
291                         }
292                 }
293         }
294         return
295 }
296
297 func cgs2array(tilelib *tileLibrary, names []string, lowqual []map[tileVariantID]bool, dropTiles []bool, tagstart, tagend int) (data []int16, rows, cols int) {
298         rows = len(tilelib.compactGenomes)
299         for tag := tagstart; tag < tagend; tag++ {
300                 if len(dropTiles) <= tag || !dropTiles[tag] {
301                         cols += 2
302                 }
303         }
304         data = make([]int16, rows*cols)
305         for row, name := range names {
306                 cg := tilelib.compactGenomes[name]
307                 outidx := 0
308                 for tag := tagstart; tag < tagend && tag*2+1 < len(cg); tag++ {
309                         if len(dropTiles) > tag && dropTiles[tag] {
310                                 continue
311                         }
312                         for phase := 0; phase < 2; phase++ {
313                                 v := cg[tag*2+phase]
314                                 if v > 0 && lowqual[tag][v] {
315                                         data[row*cols+outidx] = -1
316                                 } else {
317                                         data[row*cols+outidx] = int16(v)
318                                 }
319                                 outidx++
320                         }
321                 }
322         }
323         return
324 }
325
326 func chooseTiles(tilelib *tileLibrary, regionsFilename string, expandRegions int) (drop []bool, err error) {
327         if regionsFilename == "" {
328                 return
329         }
330         rfile, err := zopen(regionsFilename)
331         if err != nil {
332                 return
333         }
334         defer rfile.Close()
335         regions, err := ioutil.ReadAll(rfile)
336         if err != nil {
337                 return
338         }
339
340         log.Print("chooseTiles: building mask")
341         mask := &mask{}
342         for _, line := range bytes.Split(regions, []byte{'\n'}) {
343                 if bytes.HasPrefix(line, []byte{'#'}) {
344                         continue
345                 }
346                 fields := bytes.Split(line, []byte{'\t'})
347                 if len(fields) < 3 {
348                         continue
349                 }
350                 refseqname := string(fields[0])
351                 if strings.HasPrefix(refseqname, "chr") {
352                         refseqname = refseqname[3:]
353                 }
354                 start, err1 := strconv.Atoi(string(fields[1]))
355                 end, err2 := strconv.Atoi(string(fields[2]))
356                 if err1 == nil && err2 == nil {
357                         // BED
358                 } else {
359                         start, err1 = strconv.Atoi(string(fields[3]))
360                         end, err2 = strconv.Atoi(string(fields[4]))
361                         if err1 == nil && err2 == nil {
362                                 // GFF/GTF
363                                 end++
364                         } else {
365                                 err = fmt.Errorf("cannot parse input line as BED or GFF/GTF: %q", line)
366                                 return
367                         }
368                 }
369                 mask.Add(refseqname, start-expandRegions, end+expandRegions)
370         }
371         log.Print("chooseTiles: mask.Freeze")
372         mask.Freeze()
373
374         tagset := tilelib.taglib.Tags()
375         if len(tagset) == 0 {
376                 err = errors.New("cannot choose tiles by region in a library without tags")
377                 return
378         }
379         taglen := len(tagset[0])
380
381         log.Print("chooseTiles: check ref tiles")
382         // Find position+size of each reference tile, and if it
383         // intersects any of the desired regions, set drop[tag]=false.
384         //
385         // (Note it doesn't quite work to do the more obvious thing --
386         // start with drop=false and change to true when ref tiles
387         // intersect target regions -- because that would give us
388         // drop=false for tiles that don't appear at all in the
389         // reference.)
390         //
391         // TODO: (optionally?) don't drop tags for which some tile
392         // variants are spanning tiles, i.e., where the reference tile
393         // does not intersect the desired regions, but a spanning tile
394         // from a genome does.
395         drop = make([]bool, len(tilelib.variant))
396         for i := range drop {
397                 drop[i] = true
398         }
399         for refname, refseqs := range tilelib.refseqs {
400                 for refseqname, reftiles := range refseqs {
401                         if strings.HasPrefix(refseqname, "chr") {
402                                 refseqname = refseqname[3:]
403                         }
404                         tileend := 0
405                         for _, libref := range reftiles {
406                                 if libref.Variant < 1 {
407                                         err = fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refname, refseqname, libref.Tag)
408                                         return
409                                 }
410                                 seq := tilelib.TileVariantSequence(libref)
411                                 if len(seq) < taglen {
412                                         err = fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refname, refseqname, libref.Tag, libref.Variant, len(seq), taglen)
413                                         return
414                                 }
415                                 tilestart := tileend
416                                 tileend = tilestart + len(seq) - taglen
417                                 if mask.Check(refseqname, tilestart, tileend) {
418                                         drop[libref.Tag] = false
419                                 }
420                         }
421                 }
422         }
423
424         log.Print("chooseTiles: done")
425         return
426 }
427
428 func recodeOnehot(in []int16, incols int) (out []int16, librefs []tileLibRef, outcols int) {
429         rows := len(in) / incols
430         maxvalue := make([]int16, incols)
431         for row := 0; row < rows; row++ {
432                 for col := 0; col < incols; col++ {
433                         if v := in[row*incols+col]; maxvalue[col] < v {
434                                 maxvalue[col] = v
435                         }
436                 }
437         }
438         outcol := make([]int, incols)
439         dropped := 0
440         for incol, maxv := range maxvalue {
441                 outcol[incol] = outcols
442                 if maxv == 0 {
443                         dropped++
444                 }
445                 for v := 1; v <= int(maxv); v++ {
446                         librefs = append(librefs, tileLibRef{Tag: tagID(incol), Variant: tileVariantID(v)})
447                         outcols++
448                 }
449         }
450         log.Printf("recodeOnehot: dropped %d input cols with zero maxvalue", dropped)
451
452         out = make([]int16, rows*outcols)
453         for inidx, row := 0, 0; row < rows; row++ {
454                 outrow := out[row*outcols:]
455                 for col := 0; col < incols; col++ {
456                         if v := in[inidx]; v > 0 {
457                                 outrow[outcol[col]+int(v)-1] = 1
458                         }
459                         inidx++
460                 }
461         }
462         return
463 }
464
465 type nopCloser struct {
466         io.Writer
467 }
468
469 func (nopCloser) Close() error { return nil }
470
471 func trimFilenameForLabel(s string) string {
472         if i := strings.LastIndex(s, "/"); i >= 0 {
473                 s = s[i+1:]
474         }
475         s = strings.TrimSuffix(s, ".gz")
476         s = strings.TrimSuffix(s, ".fa")
477         s = strings.TrimSuffix(s, ".fasta")
478         s = strings.TrimSuffix(s, ".1")
479         s = strings.TrimSuffix(s, ".2")
480         s = strings.TrimSuffix(s, ".gz")
481         s = strings.TrimSuffix(s, ".vcf")
482         return s
483 }