Test numpy annotations.
[lightning.git] / exportnumpy.go
1 package lightning
2
3 import (
4         "bufio"
5         "context"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "path"
15         "sort"
16         "strings"
17
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "github.com/kshedden/gonpy"
20         "github.com/sirupsen/logrus"
21         log "github.com/sirupsen/logrus"
22 )
23
24 type exportNumpy struct {
25         filter filter
26 }
27
28 func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
29         var err error
30         defer func() {
31                 if err != nil {
32                         fmt.Fprintf(stderr, "%s\n", err)
33                 }
34         }()
35         flags := flag.NewFlagSet("", flag.ContinueOnError)
36         flags.SetOutput(stderr)
37         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
38         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
39         projectUUID := flags.String("project", "", "project `UUID` for output data")
40         priority := flags.Int("priority", 500, "container request priority")
41         inputFilename := flags.String("i", "-", "input `file`")
42         outputFilename := flags.String("o", "-", "output `file`")
43         annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations csv")
44         librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create csv `file` mapping column# to tag# and variant#")
45         labelsFilename := flags.String("output-labels", "", "output `file` for genome labels csv")
46         onehot := flags.Bool("one-hot", false, "recode tile variants as one-hot")
47         chunks := flags.Int("chunks", 1, "split output into `N` numpy files")
48         cmd.filter.Flags(flags)
49         err = flags.Parse(args)
50         if err == flag.ErrHelp {
51                 err = nil
52                 return 0
53         } else if err != nil {
54                 return 2
55         }
56
57         if *pprof != "" {
58                 go func() {
59                         log.Println(http.ListenAndServe(*pprof, nil))
60                 }()
61         }
62
63         if !*runlocal {
64                 if *outputFilename != "-" {
65                         err = errors.New("cannot specify output file in container mode: not implemented")
66                         return 1
67                 }
68                 runner := arvadosContainerRunner{
69                         Name:        "lightning export-numpy",
70                         Client:      arvados.NewClientFromEnv(),
71                         ProjectUUID: *projectUUID,
72                         RAM:         450000000000,
73                         VCPUs:       32,
74                         Priority:    *priority,
75                         KeepCache:   1,
76                         APIAccess:   true,
77                 }
78                 err = runner.TranslatePaths(inputFilename)
79                 if err != nil {
80                         return 1
81                 }
82                 runner.Args = []string{"export-numpy", "-local=true",
83                         fmt.Sprintf("-one-hot=%v", *onehot),
84                         "-i", *inputFilename,
85                         "-o", "/mnt/output/matrix.npy",
86                         "-output-annotations", "/mnt/output/annotations.csv",
87                         "-output-onehot2tilevar", "/mnt/output/onehot2tilevar.csv",
88                         "-output-labels", "/mnt/output/labels.csv",
89                         "-max-variants", fmt.Sprintf("%d", cmd.filter.MaxVariants),
90                         "-min-coverage", fmt.Sprintf("%f", cmd.filter.MinCoverage),
91                         "-max-tag", fmt.Sprintf("%d", cmd.filter.MaxTag),
92                         "-chunks", fmt.Sprintf("%d", *chunks),
93                 }
94                 var output string
95                 output, err = runner.Run()
96                 if err != nil {
97                         return 1
98                 }
99                 fmt.Fprintln(stdout, output+"/matrix.npy")
100                 return 0
101         }
102
103         var input io.ReadCloser
104         if *inputFilename == "-" {
105                 input = ioutil.NopCloser(stdin)
106         } else {
107                 input, err = open(*inputFilename)
108                 if err != nil {
109                         return 1
110                 }
111                 defer input.Close()
112         }
113         input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024))
114         tilelib := &tileLibrary{
115                 retainNoCalls:       true,
116                 retainTileSequences: true,
117                 compactGenomes:      map[string][]tileVariantID{},
118         }
119         err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil)
120         if err != nil {
121                 return 1
122         }
123         err = input.Close()
124         if err != nil {
125                 return 1
126         }
127
128         log.Info("filtering")
129         cmd.filter.Apply(tilelib)
130         log.Info("tidying")
131         tilelib.Tidy()
132
133         if *annotationsFilename != "" {
134                 log.Infof("writing annotations")
135                 var annow io.WriteCloser
136                 annow, err = os.OpenFile(*annotationsFilename, os.O_CREATE|os.O_WRONLY, 0666)
137                 if err != nil {
138                         return 1
139                 }
140                 defer annow.Close()
141                 err = (&annotatecmd{maxTileSize: 5000}).exportTileDiffs(annow, tilelib)
142                 if err != nil {
143                         return 1
144                 }
145                 err = annow.Close()
146                 if err != nil {
147                         return 1
148                 }
149         }
150
151         log.Info("building lowqual map")
152         lowqual := lowqual(tilelib)
153         names := cgnames(tilelib)
154
155         if *labelsFilename != "" {
156                 log.Infof("writing labels to %s", *labelsFilename)
157                 var f *os.File
158                 f, err = os.OpenFile(*labelsFilename, os.O_CREATE|os.O_WRONLY, 0777)
159                 if err != nil {
160                         return 1
161                 }
162                 defer f.Close()
163                 _, outBasename := path.Split(*outputFilename)
164                 for i, name := range names {
165                         _, err = fmt.Fprintf(f, "%d,%q,%q\n", i, trimFilenameForLabel(name), outBasename)
166                         if err != nil {
167                                 err = fmt.Errorf("write %s: %w", *labelsFilename, err)
168                                 return 1
169                         }
170                 }
171                 err = f.Close()
172                 if err != nil {
173                         err = fmt.Errorf("close %s: %w", *labelsFilename, err)
174                         return 1
175                 }
176         }
177
178         chunksize := (len(tilelib.variant) + *chunks - 1) / *chunks
179         for chunk := 0; chunk < *chunks; chunk++ {
180                 log.Infof("preparing chunk %d of %d", chunk+1, *chunks+1)
181                 tagstart := chunk * chunksize
182                 tagend := tagstart + chunksize
183                 if tagend > len(tilelib.variant) {
184                         tagend = len(tilelib.variant)
185                 }
186                 out, rows, cols := cgs2array(tilelib, names, lowqual, tagstart, tagend)
187
188                 var npw *gonpy.NpyWriter
189                 var output io.WriteCloser
190                 fnm := *outputFilename
191                 if fnm == "-" {
192                         output = nopCloser{stdout}
193                 } else {
194                         if *chunks > 1 {
195                                 if strings.HasSuffix(fnm, ".npy") {
196                                         fnm = fmt.Sprintf("%s.%d.npy", fnm[:len(fnm)-4], chunk)
197                                 } else {
198                                         fnm = fmt.Sprintf("%s.%d", fnm, chunk)
199                                 }
200                         }
201                         output, err = os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0777)
202                         if err != nil {
203                                 return 1
204                         }
205                         defer output.Close()
206                 }
207                 bufw := bufio.NewWriter(output)
208                 npw, err = gonpy.NewWriter(nopCloser{bufw})
209                 if err != nil {
210                         return 1
211                 }
212                 if *onehot {
213                         log.Info("recoding to onehot")
214                         recoded, librefs, recodedcols := recodeOnehot(out, cols)
215                         out, cols = recoded, recodedcols
216                         if *librefsFilename != "" {
217                                 log.Infof("writing onehot column mapping")
218                                 err = cmd.writeLibRefs(*librefsFilename, tilelib, librefs)
219                                 if err != nil {
220                                         return 1
221                                 }
222                         }
223                 }
224                 log.WithFields(logrus.Fields{
225                         "filename": fnm,
226                         "rows":     rows,
227                         "cols":     cols,
228                 }).Info("writing numpy")
229                 npw.Shape = []int{rows, cols}
230                 npw.WriteInt16(out)
231                 err = bufw.Flush()
232                 if err != nil {
233                         return 1
234                 }
235                 err = output.Close()
236                 if err != nil {
237                         return 1
238                 }
239         }
240         return 0
241 }
242
243 func (*exportNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
244         f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
245         if err != nil {
246                 return err
247         }
248         defer f.Close()
249         for i, libref := range librefs {
250                 _, err = fmt.Fprintf(f, "%d,%d,%d\n", i, libref.Tag, libref.Variant)
251                 if err != nil {
252                         return err
253                 }
254         }
255         return f.Close()
256 }
257
258 func cgnames(tilelib *tileLibrary) (cgnames []string) {
259         for name := range tilelib.compactGenomes {
260                 cgnames = append(cgnames, name)
261         }
262         sort.Slice(cgnames, func(i, j int) bool {
263                 return trimFilenameForLabel(cgnames[i]) < trimFilenameForLabel(cgnames[j])
264         })
265         return
266 }
267
268 func lowqual(tilelib *tileLibrary) (lowqual []map[tileVariantID]bool) {
269         lowqual = make([]map[tileVariantID]bool, len(tilelib.variant))
270         for tag, variants := range tilelib.variant {
271                 lq := lowqual[tag]
272                 for varidx, hash := range variants {
273                         if len(tilelib.seq[hash]) == 0 {
274                                 if lq == nil {
275                                         lq = map[tileVariantID]bool{}
276                                         lowqual[tag] = lq
277                                 }
278                                 lq[tileVariantID(varidx+1)] = true
279                         }
280                 }
281         }
282         return
283 }
284
285 func cgs2array(tilelib *tileLibrary, names []string, lowqual []map[tileVariantID]bool, tagstart, tagend int) (data []int16, rows, cols int) {
286         rows = len(tilelib.compactGenomes)
287         cols = (tagend - tagstart) * 2
288         data = make([]int16, rows*cols)
289         for row, name := range names {
290                 cg := tilelib.compactGenomes[name]
291                 if tagstart*2 >= len(cg) {
292                         continue
293                 }
294                 cg = cg[tagstart*2:]
295                 if cols < len(cg) {
296                         cg = cg[:cols]
297                 }
298                 for i, v := range cg {
299                         if v > 0 && lowqual[tagstart+i/2][v] {
300                                 data[row*cols+i] = -1
301                         } else {
302                                 data[row*cols+i] = int16(v)
303                         }
304                 }
305         }
306         return
307 }
308
309 func recodeOnehot(in []int16, incols int) (out []int16, librefs []tileLibRef, outcols int) {
310         rows := len(in) / incols
311         maxvalue := make([]int16, incols)
312         for row := 0; row < rows; row++ {
313                 for col := 0; col < incols; col++ {
314                         if v := in[row*incols+col]; maxvalue[col] < v {
315                                 maxvalue[col] = v
316                         }
317                 }
318         }
319         outcol := make([]int, incols)
320         dropped := 0
321         for incol, maxv := range maxvalue {
322                 outcol[incol] = outcols
323                 if maxv == 0 {
324                         dropped++
325                 }
326                 for v := 1; v <= int(maxv); v++ {
327                         librefs = append(librefs, tileLibRef{Tag: tagID(incol), Variant: tileVariantID(v)})
328                         outcols++
329                 }
330         }
331         log.Printf("recodeOnehot: dropped %d input cols with zero maxvalue", dropped)
332
333         out = make([]int16, rows*outcols)
334         for inidx, row := 0, 0; row < rows; row++ {
335                 outrow := out[row*outcols:]
336                 for col := 0; col < incols; col++ {
337                         if v := in[inidx]; v > 0 {
338                                 outrow[outcol[col]+int(v)-1] = 1
339                         }
340                         inidx++
341                 }
342         }
343         return
344 }
345
346 type nopCloser struct {
347         io.Writer
348 }
349
350 func (nopCloser) Close() error { return nil }
351
352 func trimFilenameForLabel(s string) string {
353         if i := strings.LastIndex(s, "/"); i >= 0 {
354                 s = s[i+1:]
355         }
356         s = strings.TrimSuffix(s, ".gz")
357         s = strings.TrimSuffix(s, ".fa")
358         s = strings.TrimSuffix(s, ".fasta")
359         s = strings.TrimSuffix(s, ".1")
360         s = strings.TrimSuffix(s, ".2")
361         s = strings.TrimSuffix(s, ".gz")
362         s = strings.TrimSuffix(s, ".vcf")
363         return s
364 }