Move command line tool to subdir.
[lightning.git] / exportnumpy.go
1 package lightning
2
3 import (
4         "bufio"
5         "context"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "path"
15         "sort"
16         "strings"
17
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "github.com/kshedden/gonpy"
20         "github.com/sirupsen/logrus"
21         log "github.com/sirupsen/logrus"
22 )
23
24 type exportNumpy struct {
25         filter filter
26 }
27
28 func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
29         var err error
30         defer func() {
31                 if err != nil {
32                         fmt.Fprintf(stderr, "%s\n", err)
33                 }
34         }()
35         flags := flag.NewFlagSet("", flag.ContinueOnError)
36         flags.SetOutput(stderr)
37         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
38         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
39         projectUUID := flags.String("project", "", "project `UUID` for output data")
40         priority := flags.Int("priority", 500, "container request priority")
41         inputFilename := flags.String("i", "-", "input `file`")
42         outputFilename := flags.String("o", "-", "output `file`")
43         annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations csv")
44         librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create csv `file` mapping column# to tag# and variant#")
45         labelsFilename := flags.String("output-labels", "", "output `file` for genome labels csv")
46         onehot := flags.Bool("one-hot", false, "recode tile variants as one-hot")
47         cmd.filter.Flags(flags)
48         err = flags.Parse(args)
49         if err == flag.ErrHelp {
50                 err = nil
51                 return 0
52         } else if err != nil {
53                 return 2
54         }
55
56         if *pprof != "" {
57                 go func() {
58                         log.Println(http.ListenAndServe(*pprof, nil))
59                 }()
60         }
61
62         if !*runlocal {
63                 if *outputFilename != "-" {
64                         err = errors.New("cannot specify output file in container mode: not implemented")
65                         return 1
66                 }
67                 runner := arvadosContainerRunner{
68                         Name:        "lightning export-numpy",
69                         Client:      arvados.NewClientFromEnv(),
70                         ProjectUUID: *projectUUID,
71                         RAM:         450000000000,
72                         VCPUs:       32,
73                         Priority:    *priority,
74                         KeepCache:   1,
75                         APIAccess:   true,
76                 }
77                 err = runner.TranslatePaths(inputFilename)
78                 if err != nil {
79                         return 1
80                 }
81                 runner.Args = []string{"export-numpy", "-local=true",
82                         fmt.Sprintf("-one-hot=%v", *onehot),
83                         "-i", *inputFilename,
84                         "-o", "/mnt/output/matrix.npy",
85                         "-output-annotations", "/mnt/output/annotations.csv",
86                         "-output-onehot2tilevar", "/mnt/output/onehot2tilevar.csv",
87                         "-output-labels", "/mnt/output/labels.csv",
88                         "-max-variants", fmt.Sprintf("%d", cmd.filter.MaxVariants),
89                         "-min-coverage", fmt.Sprintf("%f", cmd.filter.MinCoverage),
90                         "-max-tag", fmt.Sprintf("%d", cmd.filter.MaxTag),
91                 }
92                 var output string
93                 output, err = runner.Run()
94                 if err != nil {
95                         return 1
96                 }
97                 fmt.Fprintln(stdout, output+"/matrix.npy")
98                 return 0
99         }
100
101         var input io.ReadCloser
102         if *inputFilename == "-" {
103                 input = ioutil.NopCloser(stdin)
104         } else {
105                 input, err = open(*inputFilename)
106                 if err != nil {
107                         return 1
108                 }
109                 defer input.Close()
110         }
111         input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024))
112         tilelib := &tileLibrary{
113                 retainNoCalls:       true,
114                 retainTileSequences: true,
115                 compactGenomes:      map[string][]tileVariantID{},
116         }
117         err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil)
118         if err != nil {
119                 return 1
120         }
121         err = input.Close()
122         if err != nil {
123                 return 1
124         }
125
126         log.Info("filtering")
127         cmd.filter.Apply(tilelib)
128         log.Info("tidying")
129         tilelib.Tidy()
130
131         if *annotationsFilename != "" {
132                 log.Infof("writing annotations")
133                 var annow io.WriteCloser
134                 annow, err = os.OpenFile(*annotationsFilename, os.O_CREATE|os.O_WRONLY, 0666)
135                 if err != nil {
136                         return 1
137                 }
138                 defer annow.Close()
139                 err = (&annotatecmd{maxTileSize: 5000}).exportTileDiffs(annow, tilelib)
140                 if err != nil {
141                         return 1
142                 }
143                 err = annow.Close()
144                 if err != nil {
145                         return 1
146                 }
147         }
148
149         log.Info("building numpy array")
150         out, rows, cols, names := cgs2array(tilelib)
151
152         if *labelsFilename != "" {
153                 log.Infof("writing labels to %s", *labelsFilename)
154                 var f *os.File
155                 f, err = os.OpenFile(*labelsFilename, os.O_CREATE|os.O_WRONLY, 0777)
156                 if err != nil {
157                         return 1
158                 }
159                 defer f.Close()
160                 _, outBasename := path.Split(*outputFilename)
161                 for i, name := range names {
162                         _, err = fmt.Fprintf(f, "%d,%q,%q\n", i, trimFilenameForLabel(name), outBasename)
163                         if err != nil {
164                                 err = fmt.Errorf("write %s: %w", *labelsFilename, err)
165                                 return 1
166                         }
167                 }
168                 err = f.Close()
169                 if err != nil {
170                         err = fmt.Errorf("close %s: %w", *labelsFilename, err)
171                         return 1
172                 }
173         }
174
175         log.Info("writing numpy file")
176         var output io.WriteCloser
177         if *outputFilename == "-" {
178                 output = nopCloser{stdout}
179         } else {
180                 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
181                 if err != nil {
182                         return 1
183                 }
184                 defer output.Close()
185         }
186         bufw := bufio.NewWriter(output)
187         npw, err := gonpy.NewWriter(nopCloser{bufw})
188         if err != nil {
189                 return 1
190         }
191         if *onehot {
192                 log.Info("recoding to onehot")
193                 recoded, librefs, recodedcols := recodeOnehot(out, cols)
194                 out, cols = recoded, recodedcols
195                 if *librefsFilename != "" {
196                         log.Infof("writing onehot column mapping")
197                         err = cmd.writeLibRefs(*librefsFilename, tilelib, librefs)
198                         if err != nil {
199                                 return 1
200                         }
201                 }
202         }
203         log.WithFields(logrus.Fields{
204                 "rows": rows,
205                 "cols": cols,
206         }).Info("writing numpy")
207         npw.Shape = []int{rows, cols}
208         npw.WriteInt16(out)
209         err = bufw.Flush()
210         if err != nil {
211                 return 1
212         }
213         err = output.Close()
214         if err != nil {
215                 return 1
216         }
217         return 0
218 }
219
220 func (*exportNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
221         f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
222         if err != nil {
223                 return err
224         }
225         defer f.Close()
226         for i, libref := range librefs {
227                 _, err = fmt.Fprintf(f, "%d,%d,%d\n", i, libref.Tag, libref.Variant)
228                 if err != nil {
229                         return err
230                 }
231         }
232         return f.Close()
233 }
234
235 func cgs2array(tilelib *tileLibrary) (data []int16, rows, cols int, cgnames []string) {
236         for name := range tilelib.compactGenomes {
237                 cgnames = append(cgnames, name)
238         }
239         sort.Slice(cgnames, func(i, j int) bool {
240                 return trimFilenameForLabel(cgnames[i]) < trimFilenameForLabel(cgnames[j])
241         })
242
243         rows = len(tilelib.compactGenomes)
244         for _, cg := range tilelib.compactGenomes {
245                 if cols < len(cg) {
246                         cols = len(cg)
247                 }
248         }
249
250         // flag low-quality tile variants so we can change to -1 below
251         lowqual := make([]map[tileVariantID]bool, cols/2)
252         for tag, variants := range tilelib.variant {
253                 lq := lowqual[tag]
254                 for varidx, hash := range variants {
255                         if len(tilelib.seq[hash]) == 0 {
256                                 if lq == nil {
257                                         lq = map[tileVariantID]bool{}
258                                         lowqual[tag] = lq
259                                 }
260                                 lq[tileVariantID(varidx+1)] = true
261                         }
262                 }
263         }
264
265         data = make([]int16, rows*cols)
266         for row, name := range cgnames {
267                 for i, v := range tilelib.compactGenomes[name] {
268                         if v > 0 && lowqual[i/2][v] {
269                                 data[row*cols+i] = -1
270                         } else {
271                                 data[row*cols+i] = int16(v)
272                         }
273                 }
274         }
275
276         return
277 }
278
279 func recodeOnehot(in []int16, incols int) (out []int16, librefs []tileLibRef, outcols int) {
280         rows := len(in) / incols
281         maxvalue := make([]int16, incols)
282         for row := 0; row < rows; row++ {
283                 for col := 0; col < incols; col++ {
284                         if v := in[row*incols+col]; maxvalue[col] < v {
285                                 maxvalue[col] = v
286                         }
287                 }
288         }
289         outcol := make([]int, incols)
290         dropped := 0
291         for incol, maxv := range maxvalue {
292                 outcol[incol] = outcols
293                 if maxv == 0 {
294                         dropped++
295                 }
296                 for v := 1; v <= int(maxv); v++ {
297                         librefs = append(librefs, tileLibRef{Tag: tagID(incol), Variant: tileVariantID(v)})
298                         outcols++
299                 }
300         }
301         log.Printf("recodeOnehot: dropped %d input cols with zero maxvalue", dropped)
302
303         out = make([]int16, rows*outcols)
304         for inidx, row := 0, 0; row < rows; row++ {
305                 outrow := out[row*outcols:]
306                 for col := 0; col < incols; col++ {
307                         if v := in[inidx]; v > 0 {
308                                 outrow[outcol[col]+int(v)-1] = 1
309                         }
310                         inidx++
311                 }
312         }
313         return
314 }
315
316 type nopCloser struct {
317         io.Writer
318 }
319
320 func (nopCloser) Close() error { return nil }
321
322 func trimFilenameForLabel(s string) string {
323         if i := strings.LastIndex(s, "/"); i >= 0 {
324                 s = s[i+1:]
325         }
326         s = strings.TrimSuffix(s, ".gz")
327         s = strings.TrimSuffix(s, ".fa")
328         s = strings.TrimSuffix(s, ".fasta")
329         s = strings.TrimSuffix(s, ".1")
330         s = strings.TrimSuffix(s, ".2")
331         s = strings.TrimSuffix(s, ".gz")
332         s = strings.TrimSuffix(s, ".vcf")
333         return s
334 }