Gzip gob files.
[lightning.git] / exportnumpy.go
1 package main
2
3 import (
4         "bufio"
5         "context"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "sort"
15         "strings"
16
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "github.com/kshedden/gonpy"
19         log "github.com/sirupsen/logrus"
20 )
21
22 type exportNumpy struct {
23         filter filter
24 }
25
26 func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
27         var err error
28         defer func() {
29                 if err != nil {
30                         fmt.Fprintf(stderr, "%s\n", err)
31                 }
32         }()
33         flags := flag.NewFlagSet("", flag.ContinueOnError)
34         flags.SetOutput(stderr)
35         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
36         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
37         projectUUID := flags.String("project", "", "project `UUID` for output data")
38         priority := flags.Int("priority", 500, "container request priority")
39         inputFilename := flags.String("i", "-", "input `file`")
40         outputFilename := flags.String("o", "-", "output `file`")
41         annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations tsv")
42         librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create tsv `file` mapping column# to tag# and variant#")
43         onehot := flags.Bool("one-hot", false, "recode tile variants as one-hot")
44         cmd.filter.Flags(flags)
45         err = flags.Parse(args)
46         if err == flag.ErrHelp {
47                 err = nil
48                 return 0
49         } else if err != nil {
50                 return 2
51         }
52
53         if *pprof != "" {
54                 go func() {
55                         log.Println(http.ListenAndServe(*pprof, nil))
56                 }()
57         }
58
59         if !*runlocal {
60                 if *outputFilename != "-" {
61                         err = errors.New("cannot specify output file in container mode: not implemented")
62                         return 1
63                 }
64                 runner := arvadosContainerRunner{
65                         Name:        "lightning export-numpy",
66                         Client:      arvados.NewClientFromEnv(),
67                         ProjectUUID: *projectUUID,
68                         RAM:         128000000000,
69                         VCPUs:       32,
70                         Priority:    *priority,
71                 }
72                 err = runner.TranslatePaths(inputFilename)
73                 if err != nil {
74                         return 1
75                 }
76                 runner.Args = []string{"export-numpy", "-local=true",
77                         fmt.Sprintf("-one-hot=%v", *onehot),
78                         "-i", *inputFilename,
79                         "-o", "/mnt/output/matrix.npy",
80                         "-output-annotations", "/mnt/output/annotations.tsv",
81                         "-output-onehot2tilevar", "/mnt/output/onehot2tilevar.tsv",
82                         "-max-variants", fmt.Sprintf("%d", cmd.filter.MaxVariants),
83                         "-min-coverage", fmt.Sprintf("%f", cmd.filter.MinCoverage),
84                         "-max-tag", fmt.Sprintf("%d", cmd.filter.MaxTag),
85                 }
86                 var output string
87                 output, err = runner.Run()
88                 if err != nil {
89                         return 1
90                 }
91                 fmt.Fprintln(stdout, output+"/matrix.npy")
92                 return 0
93         }
94
95         var input io.ReadCloser
96         if *inputFilename == "-" {
97                 input = ioutil.NopCloser(stdin)
98         } else {
99                 input, err = os.Open(*inputFilename)
100                 if err != nil {
101                         return 1
102                 }
103                 defer input.Close()
104         }
105         tilelib := &tileLibrary{
106                 retainNoCalls:       true,
107                 retainTileSequences: true,
108                 compactGenomes:      map[string][]tileVariantID{},
109         }
110         err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil)
111         if err != nil {
112                 return 1
113         }
114         err = input.Close()
115         if err != nil {
116                 return 1
117         }
118
119         log.Info("filtering")
120         cmd.filter.Apply(tilelib)
121         log.Info("tidying")
122         tilelib.Tidy()
123
124         if *annotationsFilename != "" {
125                 log.Infof("writing annotations")
126                 var annow io.WriteCloser
127                 annow, err = os.OpenFile(*annotationsFilename, os.O_CREATE|os.O_WRONLY, 0666)
128                 if err != nil {
129                         return 1
130                 }
131                 defer annow.Close()
132                 err = (&annotatecmd{maxTileSize: 5000}).exportTileDiffs(annow, tilelib)
133                 if err != nil {
134                         return 1
135                 }
136                 err = annow.Close()
137                 if err != nil {
138                         return 1
139                 }
140         }
141
142         log.Info("building numpy array")
143         out, rows, cols := cgs2array(tilelib.compactGenomes)
144         var output io.WriteCloser
145         if *outputFilename == "-" {
146                 output = nopCloser{stdout}
147         } else {
148                 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
149                 if err != nil {
150                         return 1
151                 }
152                 defer output.Close()
153         }
154         bufw := bufio.NewWriter(output)
155         npw, err := gonpy.NewWriter(nopCloser{bufw})
156         if err != nil {
157                 return 1
158         }
159         if *onehot {
160                 log.Info("recoding to onehot")
161                 recoded, librefs, recodedcols := recodeOnehot(out, cols)
162                 out, cols = recoded, recodedcols
163                 if *librefsFilename != "" {
164                         log.Infof("writing onehot column mapping")
165                         err = cmd.writeLibRefs(*librefsFilename, tilelib, librefs)
166                         if err != nil {
167                                 return 1
168                         }
169                 }
170         }
171         log.Info("writing numpy")
172         npw.Shape = []int{rows, cols}
173         npw.WriteUint16(out)
174         err = bufw.Flush()
175         if err != nil {
176                 return 1
177         }
178         err = output.Close()
179         if err != nil {
180                 return 1
181         }
182         return 0
183 }
184
185 func (*exportNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
186         f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
187         if err != nil {
188                 return err
189         }
190         defer f.Close()
191         for i, libref := range librefs {
192                 _, err = fmt.Fprintf(f, "%d\t%d\t%d\n", i, libref.Tag, libref.Variant)
193                 if err != nil {
194                         return err
195                 }
196         }
197         return f.Close()
198 }
199
200 func cgs2array(cgs map[string][]tileVariantID) (data []uint16, rows, cols int) {
201         var cgnames []string
202         for name := range cgs {
203                 cgnames = append(cgnames, name)
204         }
205         sort.Strings(cgnames)
206
207         rows = len(cgs)
208         for _, cg := range cgs {
209                 if cols < len(cg) {
210                         cols = len(cg)
211                 }
212         }
213         data = make([]uint16, rows*cols)
214         for row, name := range cgnames {
215                 for i, v := range cgs[name] {
216                         data[row*cols+i] = uint16(v)
217                 }
218         }
219         return
220 }
221
222 func recodeOnehot(in []uint16, incols int) (out []uint16, librefs []tileLibRef, outcols int) {
223         rows := len(in) / incols
224         maxvalue := make([]uint16, incols)
225         for row := 0; row < rows; row++ {
226                 for col := 0; col < incols; col++ {
227                         if v := in[row*incols+col]; maxvalue[col] < v {
228                                 maxvalue[col] = v
229                         }
230                 }
231         }
232         outcol := make([]int, incols)
233         dropped := 0
234         for incol, maxv := range maxvalue {
235                 outcol[incol] = outcols
236                 if maxv == 0 {
237                         dropped++
238                 }
239                 for v := 1; v <= int(maxv); v++ {
240                         librefs = append(librefs, tileLibRef{Tag: tagID(incol), Variant: tileVariantID(v)})
241                         outcols++
242                 }
243         }
244         log.Printf("recodeOnehot: dropped %d input cols with zero maxvalue", dropped)
245
246         out = make([]uint16, rows*outcols)
247         for inidx, row := 0, 0; row < rows; row++ {
248                 outrow := out[row*outcols:]
249                 for col := 0; col < incols; col++ {
250                         if v := in[inidx]; v > 0 {
251                                 outrow[outcol[col]+int(v)-1] = 1
252                         }
253                         inidx++
254                 }
255         }
256         return
257 }
258
259 type nopCloser struct {
260         io.Writer
261 }
262
263 func (nopCloser) Close() error { return nil }