Let caller set container output name.
[lightning.git] / exportnumpy.go
1 package main
2
3 import (
4         "bufio"
5         "context"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "sort"
15         "strings"
16
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "github.com/kshedden/gonpy"
19         log "github.com/sirupsen/logrus"
20 )
21
22 type exportNumpy struct {
23         filter filter
24 }
25
26 func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
27         var err error
28         defer func() {
29                 if err != nil {
30                         fmt.Fprintf(stderr, "%s\n", err)
31                 }
32         }()
33         flags := flag.NewFlagSet("", flag.ContinueOnError)
34         flags.SetOutput(stderr)
35         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
36         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
37         projectUUID := flags.String("project", "", "project `UUID` for output data")
38         priority := flags.Int("priority", 500, "container request priority")
39         inputFilename := flags.String("i", "-", "input `file`")
40         outputFilename := flags.String("o", "-", "output `file`")
41         annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations tsv")
42         librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create tsv `file` mapping column# to tag# and variant#")
43         onehot := flags.Bool("one-hot", false, "recode tile variants as one-hot")
44         cmd.filter.Flags(flags)
45         err = flags.Parse(args)
46         if err == flag.ErrHelp {
47                 err = nil
48                 return 0
49         } else if err != nil {
50                 return 2
51         }
52
53         if *pprof != "" {
54                 go func() {
55                         log.Println(http.ListenAndServe(*pprof, nil))
56                 }()
57         }
58
59         if !*runlocal {
60                 if *outputFilename != "-" {
61                         err = errors.New("cannot specify output file in container mode: not implemented")
62                         return 1
63                 }
64                 runner := arvadosContainerRunner{
65                         Name:        "lightning export-numpy",
66                         Client:      arvados.NewClientFromEnv(),
67                         ProjectUUID: *projectUUID,
68                         RAM:         128000000000,
69                         VCPUs:       32,
70                         Priority:    *priority,
71                 }
72                 err = runner.TranslatePaths(inputFilename)
73                 if err != nil {
74                         return 1
75                 }
76                 runner.Args = []string{"export-numpy", "-local=true",
77                         fmt.Sprintf("-one-hot=%v", *onehot),
78                         "-i", *inputFilename,
79                         "-o", "/mnt/output/matrix.npy",
80                         "-output-annotations", "/mnt/output/annotations.tsv",
81                         "-output-onehot2tilevar", "/mnt/output/onehot2tilevar.tsv",
82                         "-max-variants", fmt.Sprintf("%d", cmd.filter.MaxVariants),
83                         "-min-coverage", fmt.Sprintf("%f", cmd.filter.MinCoverage),
84                         "-max-tag", fmt.Sprintf("%d", cmd.filter.MaxTag),
85                 }
86                 var output string
87                 output, err = runner.Run()
88                 if err != nil {
89                         return 1
90                 }
91                 fmt.Fprintln(stdout, output+"/matrix.npy")
92                 return 0
93         }
94
95         var input io.ReadCloser
96         if *inputFilename == "-" {
97                 input = ioutil.NopCloser(stdin)
98         } else {
99                 input, err = os.Open(*inputFilename)
100                 if err != nil {
101                         return 1
102                 }
103                 defer input.Close()
104         }
105         tilelib := &tileLibrary{
106                 retainNoCalls:       true,
107                 retainTileSequences: true,
108                 compactGenomes:      map[string][]tileVariantID{},
109         }
110         err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil)
111         if err != nil {
112                 return 1
113         }
114         err = input.Close()
115         if err != nil {
116                 return 1
117         }
118
119         log.Info("filtering")
120         cmd.filter.Apply(tilelib)
121         log.Info("tidying")
122         tilelib.Tidy()
123
124         if *annotationsFilename != "" {
125                 log.Infof("writing annotations")
126                 var annow io.WriteCloser
127                 annow, err = os.OpenFile(*annotationsFilename, os.O_CREATE|os.O_WRONLY, 0666)
128                 if err != nil {
129                         return 1
130                 }
131                 defer annow.Close()
132                 err = (&annotatecmd{maxTileSize: 5000}).exportTileDiffs(annow, tilelib)
133                 if err != nil {
134                         return 1
135                 }
136                 err = annow.Close()
137                 if err != nil {
138                         return 1
139                 }
140         }
141
142         log.Info("building numpy array")
143         out, rows, cols := cgs2array(tilelib)
144
145         log.Info("writing numpy file")
146         var output io.WriteCloser
147         if *outputFilename == "-" {
148                 output = nopCloser{stdout}
149         } else {
150                 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
151                 if err != nil {
152                         return 1
153                 }
154                 defer output.Close()
155         }
156         bufw := bufio.NewWriter(output)
157         npw, err := gonpy.NewWriter(nopCloser{bufw})
158         if err != nil {
159                 return 1
160         }
161         if *onehot {
162                 log.Info("recoding to onehot")
163                 recoded, librefs, recodedcols := recodeOnehot(out, cols)
164                 out, cols = recoded, recodedcols
165                 if *librefsFilename != "" {
166                         log.Infof("writing onehot column mapping")
167                         err = cmd.writeLibRefs(*librefsFilename, tilelib, librefs)
168                         if err != nil {
169                                 return 1
170                         }
171                 }
172         }
173         log.Info("writing numpy")
174         npw.Shape = []int{rows, cols}
175         npw.WriteInt16(out)
176         err = bufw.Flush()
177         if err != nil {
178                 return 1
179         }
180         err = output.Close()
181         if err != nil {
182                 return 1
183         }
184         return 0
185 }
186
187 func (*exportNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
188         f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
189         if err != nil {
190                 return err
191         }
192         defer f.Close()
193         for i, libref := range librefs {
194                 _, err = fmt.Fprintf(f, "%d\t%d\t%d\n", i, libref.Tag, libref.Variant)
195                 if err != nil {
196                         return err
197                 }
198         }
199         return f.Close()
200 }
201
202 func cgs2array(tilelib *tileLibrary) (data []int16, rows, cols int) {
203         var cgnames []string
204         for name := range tilelib.compactGenomes {
205                 cgnames = append(cgnames, name)
206         }
207         sort.Strings(cgnames)
208
209         rows = len(tilelib.compactGenomes)
210         for _, cg := range tilelib.compactGenomes {
211                 if cols < len(cg) {
212                         cols = len(cg)
213                 }
214         }
215
216         // flag low-quality tile variants so we can change to -1 below
217         lowqual := make([]map[tileVariantID]bool, cols/2)
218         for tag, variants := range tilelib.variant {
219                 lq := lowqual[tag]
220                 for varidx, hash := range variants {
221                         if len(tilelib.seq[hash]) == 0 {
222                                 if lq == nil {
223                                         lq = map[tileVariantID]bool{}
224                                         lowqual[tag] = lq
225                                 }
226                                 lq[tileVariantID(varidx+1)] = true
227                         }
228                 }
229         }
230
231         data = make([]int16, rows*cols)
232         for row, name := range cgnames {
233                 for i, v := range tilelib.compactGenomes[name] {
234                         if v > 0 && lowqual[i/2][v] {
235                                 data[row*cols+i] = -1
236                         } else {
237                                 data[row*cols+i] = int16(v)
238                         }
239                 }
240         }
241
242         return
243 }
244
245 func recodeOnehot(in []int16, incols int) (out []int16, librefs []tileLibRef, outcols int) {
246         rows := len(in) / incols
247         maxvalue := make([]int16, incols)
248         for row := 0; row < rows; row++ {
249                 for col := 0; col < incols; col++ {
250                         if v := in[row*incols+col]; maxvalue[col] < v {
251                                 maxvalue[col] = v
252                         }
253                 }
254         }
255         outcol := make([]int, incols)
256         dropped := 0
257         for incol, maxv := range maxvalue {
258                 outcol[incol] = outcols
259                 if maxv == 0 {
260                         dropped++
261                 }
262                 for v := 1; v <= int(maxv); v++ {
263                         librefs = append(librefs, tileLibRef{Tag: tagID(incol), Variant: tileVariantID(v)})
264                         outcols++
265                 }
266         }
267         log.Printf("recodeOnehot: dropped %d input cols with zero maxvalue", dropped)
268
269         out = make([]int16, rows*outcols)
270         for inidx, row := 0, 0; row < rows; row++ {
271                 outrow := out[row*outcols:]
272                 for col := 0; col < incols; col++ {
273                         if v := in[inidx]; v > 0 {
274                                 outrow[outcol[col]+int(v)-1] = 1
275                         }
276                         inidx++
277                 }
278         }
279         return
280 }
281
282 type nopCloser struct {
283         io.Writer
284 }
285
286 func (nopCloser) Close() error { return nil }