208d7a63231f97e7adfad49c32528a1574b221ff
[lightning.git] / exportnumpy.go
1 package main
2
3 import (
4         "bufio"
5         "context"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "sort"
15
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "github.com/kshedden/gonpy"
18         log "github.com/sirupsen/logrus"
19 )
20
21 type exportNumpy struct {
22         filter filter
23 }
24
25 func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
26         var err error
27         defer func() {
28                 if err != nil {
29                         fmt.Fprintf(stderr, "%s\n", err)
30                 }
31         }()
32         flags := flag.NewFlagSet("", flag.ContinueOnError)
33         flags.SetOutput(stderr)
34         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
35         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
36         projectUUID := flags.String("project", "", "project `UUID` for output data")
37         priority := flags.Int("priority", 500, "container request priority")
38         inputFilename := flags.String("i", "-", "input `file`")
39         outputFilename := flags.String("o", "-", "output `file`")
40         annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations tsv")
41         librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create tsv `file` mapping column# to tag# and variant#")
42         onehot := flags.Bool("one-hot", false, "recode tile variants as one-hot")
43         cmd.filter.Flags(flags)
44         err = flags.Parse(args)
45         if err == flag.ErrHelp {
46                 err = nil
47                 return 0
48         } else if err != nil {
49                 return 2
50         }
51
52         if *pprof != "" {
53                 go func() {
54                         log.Println(http.ListenAndServe(*pprof, nil))
55                 }()
56         }
57
58         if !*runlocal {
59                 if *outputFilename != "-" {
60                         err = errors.New("cannot specify output file in container mode: not implemented")
61                         return 1
62                 }
63                 runner := arvadosContainerRunner{
64                         Name:        "lightning export-numpy",
65                         Client:      arvados.NewClientFromEnv(),
66                         ProjectUUID: *projectUUID,
67                         RAM:         128000000000,
68                         VCPUs:       32,
69                         Priority:    *priority,
70                 }
71                 err = runner.TranslatePaths(inputFilename)
72                 if err != nil {
73                         return 1
74                 }
75                 runner.Args = []string{"export-numpy", "-local=true",
76                         fmt.Sprintf("-one-hot=%v", *onehot),
77                         "-i", *inputFilename,
78                         "-o", "/mnt/output/matrix.npy",
79                         "-output-annotations", "/mnt/output/annotations.tsv",
80                         "-output-onehot2tilevar", "/mnt/output/onehot2tilevar.tsv",
81                         "-max-variants", fmt.Sprintf("%d", cmd.filter.MaxVariants),
82                         "-min-coverage", fmt.Sprintf("%f", cmd.filter.MinCoverage),
83                         "-max-tag", fmt.Sprintf("%d", cmd.filter.MaxTag),
84                 }
85                 var output string
86                 output, err = runner.Run()
87                 if err != nil {
88                         return 1
89                 }
90                 fmt.Fprintln(stdout, output+"/matrix.npy")
91                 return 0
92         }
93
94         var input io.ReadCloser
95         if *inputFilename == "-" {
96                 input = ioutil.NopCloser(stdin)
97         } else {
98                 input, err = os.Open(*inputFilename)
99                 if err != nil {
100                         return 1
101                 }
102                 defer input.Close()
103         }
104         tilelib := &tileLibrary{
105                 includeNoCalls:      true,
106                 retainTileSequences: true,
107                 compactGenomes:      map[string][]tileVariantID{},
108         }
109         err = tilelib.LoadGob(context.Background(), input, nil)
110         if err != nil {
111                 return 1
112         }
113         err = input.Close()
114         if err != nil {
115                 return 1
116         }
117
118         log.Info("filtering")
119         cmd.filter.Apply(tilelib)
120         log.Info("tidying")
121         tilelib.Tidy()
122
123         if *annotationsFilename != "" {
124                 log.Infof("writing annotations")
125                 var annow io.WriteCloser
126                 annow, err = os.OpenFile(*annotationsFilename, os.O_CREATE|os.O_WRONLY, 0666)
127                 if err != nil {
128                         return 1
129                 }
130                 defer annow.Close()
131                 err = (&annotatecmd{maxTileSize: 5000}).exportTileDiffs(annow, tilelib)
132                 if err != nil {
133                         return 1
134                 }
135                 err = annow.Close()
136                 if err != nil {
137                         return 1
138                 }
139         }
140
141         log.Info("building numpy array")
142         out, rows, cols := cgs2array(tilelib.compactGenomes)
143         var output io.WriteCloser
144         if *outputFilename == "-" {
145                 output = nopCloser{stdout}
146         } else {
147                 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
148                 if err != nil {
149                         return 1
150                 }
151                 defer output.Close()
152         }
153         bufw := bufio.NewWriter(output)
154         npw, err := gonpy.NewWriter(nopCloser{bufw})
155         if err != nil {
156                 return 1
157         }
158         if *onehot {
159                 log.Info("recoding to onehot")
160                 recoded, librefs, recodedcols := recodeOnehot(out, cols)
161                 out, cols = recoded, recodedcols
162                 if *librefsFilename != "" {
163                         log.Infof("writing onehot column mapping")
164                         err = cmd.writeLibRefs(*librefsFilename, tilelib, librefs)
165                         if err != nil {
166                                 return 1
167                         }
168                 }
169         }
170         log.Info("writing numpy")
171         npw.Shape = []int{rows, cols}
172         npw.WriteUint16(out)
173         err = bufw.Flush()
174         if err != nil {
175                 return 1
176         }
177         err = output.Close()
178         if err != nil {
179                 return 1
180         }
181         return 0
182 }
183
184 func (*exportNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
185         f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
186         if err != nil {
187                 return err
188         }
189         defer f.Close()
190         for i, libref := range librefs {
191                 _, err = fmt.Fprintf(f, "%d\t%d\t%d\n", i, libref.Tag, libref.Variant)
192                 if err != nil {
193                         return err
194                 }
195         }
196         return f.Close()
197 }
198
199 func cgs2array(cgs map[string][]tileVariantID) (data []uint16, rows, cols int) {
200         var cgnames []string
201         for name := range cgs {
202                 cgnames = append(cgnames, name)
203         }
204         sort.Strings(cgnames)
205
206         rows = len(cgs)
207         for _, cg := range cgs {
208                 if cols < len(cg) {
209                         cols = len(cg)
210                 }
211         }
212         data = make([]uint16, rows*cols)
213         for row, name := range cgnames {
214                 for i, v := range cgs[name] {
215                         data[row*cols+i] = uint16(v)
216                 }
217         }
218         return
219 }
220
221 func recodeOnehot(in []uint16, incols int) (out []uint16, librefs []tileLibRef, outcols int) {
222         rows := len(in) / incols
223         maxvalue := make([]uint16, incols)
224         for row := 0; row < rows; row++ {
225                 for col := 0; col < incols; col++ {
226                         if v := in[row*incols+col]; maxvalue[col] < v {
227                                 maxvalue[col] = v
228                         }
229                 }
230         }
231         outcol := make([]int, incols)
232         dropped := 0
233         for incol, maxv := range maxvalue {
234                 outcol[incol] = outcols
235                 if maxv == 0 {
236                         dropped++
237                 }
238                 for v := 1; v <= int(maxv); v++ {
239                         librefs = append(librefs, tileLibRef{Tag: tagID(incol), Variant: tileVariantID(v)})
240                         outcols++
241                 }
242         }
243         log.Printf("recodeOnehot: dropped %d input cols with zero maxvalue", dropped)
244
245         out = make([]uint16, rows*outcols)
246         for inidx, row := 0, 0; row < rows; row++ {
247                 outrow := out[row*outcols:]
248                 for col := 0; col < incols; col++ {
249                         if v := in[inidx]; v > 0 {
250                                 outrow[outcol[col]+int(v)-1] = 1
251                         }
252                         inidx++
253                 }
254         }
255         return
256 }
257
258 type nopCloser struct {
259         io.Writer
260 }
261
262 func (nopCloser) Close() error { return nil }