Sort rows by label shown in csv, not full file path.
[lightning.git] / exportnumpy.go
1 package main
2
3 import (
4         "bufio"
5         "context"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "path"
15         "sort"
16         "strings"
17
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "github.com/kshedden/gonpy"
20         log "github.com/sirupsen/logrus"
21 )
22
23 type exportNumpy struct {
24         filter filter
25 }
26
27 func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
28         var err error
29         defer func() {
30                 if err != nil {
31                         fmt.Fprintf(stderr, "%s\n", err)
32                 }
33         }()
34         flags := flag.NewFlagSet("", flag.ContinueOnError)
35         flags.SetOutput(stderr)
36         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
37         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
38         projectUUID := flags.String("project", "", "project `UUID` for output data")
39         priority := flags.Int("priority", 500, "container request priority")
40         inputFilename := flags.String("i", "-", "input `file`")
41         outputFilename := flags.String("o", "-", "output `file`")
42         annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations csv")
43         librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create csv `file` mapping column# to tag# and variant#")
44         labelsFilename := flags.String("output-labels", "", "output `file` for genome labels csv")
45         onehot := flags.Bool("one-hot", false, "recode tile variants as one-hot")
46         cmd.filter.Flags(flags)
47         err = flags.Parse(args)
48         if err == flag.ErrHelp {
49                 err = nil
50                 return 0
51         } else if err != nil {
52                 return 2
53         }
54
55         if *pprof != "" {
56                 go func() {
57                         log.Println(http.ListenAndServe(*pprof, nil))
58                 }()
59         }
60
61         if !*runlocal {
62                 if *outputFilename != "-" {
63                         err = errors.New("cannot specify output file in container mode: not implemented")
64                         return 1
65                 }
66                 runner := arvadosContainerRunner{
67                         Name:        "lightning export-numpy",
68                         Client:      arvados.NewClientFromEnv(),
69                         ProjectUUID: *projectUUID,
70                         RAM:         240000000000,
71                         VCPUs:       16,
72                         Priority:    *priority,
73                         KeepCache:   1,
74                         APIAccess:   true,
75                 }
76                 err = runner.TranslatePaths(inputFilename)
77                 if err != nil {
78                         return 1
79                 }
80                 runner.Args = []string{"export-numpy", "-local=true",
81                         fmt.Sprintf("-one-hot=%v", *onehot),
82                         "-i", *inputFilename,
83                         "-o", "/mnt/output/matrix.npy",
84                         "-output-annotations", "/mnt/output/annotations.csv",
85                         "-output-onehot2tilevar", "/mnt/output/onehot2tilevar.csv",
86                         "-output-labels", "/mnt/output/labels.csv",
87                         "-max-variants", fmt.Sprintf("%d", cmd.filter.MaxVariants),
88                         "-min-coverage", fmt.Sprintf("%f", cmd.filter.MinCoverage),
89                         "-max-tag", fmt.Sprintf("%d", cmd.filter.MaxTag),
90                 }
91                 var output string
92                 output, err = runner.Run()
93                 if err != nil {
94                         return 1
95                 }
96                 fmt.Fprintln(stdout, output+"/matrix.npy")
97                 return 0
98         }
99
100         var input io.ReadCloser
101         if *inputFilename == "-" {
102                 input = ioutil.NopCloser(stdin)
103         } else {
104                 input, err = open(*inputFilename)
105                 if err != nil {
106                         return 1
107                 }
108                 defer input.Close()
109         }
110         input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024))
111         tilelib := &tileLibrary{
112                 retainNoCalls:       true,
113                 retainTileSequences: true,
114                 compactGenomes:      map[string][]tileVariantID{},
115         }
116         err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil)
117         if err != nil {
118                 return 1
119         }
120         err = input.Close()
121         if err != nil {
122                 return 1
123         }
124
125         log.Info("filtering")
126         cmd.filter.Apply(tilelib)
127         log.Info("tidying")
128         tilelib.Tidy()
129
130         if *annotationsFilename != "" {
131                 log.Infof("writing annotations")
132                 var annow io.WriteCloser
133                 annow, err = os.OpenFile(*annotationsFilename, os.O_CREATE|os.O_WRONLY, 0666)
134                 if err != nil {
135                         return 1
136                 }
137                 defer annow.Close()
138                 err = (&annotatecmd{maxTileSize: 5000}).exportTileDiffs(annow, tilelib)
139                 if err != nil {
140                         return 1
141                 }
142                 err = annow.Close()
143                 if err != nil {
144                         return 1
145                 }
146         }
147
148         log.Info("building numpy array")
149         out, rows, cols, names := cgs2array(tilelib)
150
151         if *labelsFilename != "" {
152                 log.Infof("writing labels to %s", *labelsFilename)
153                 var f *os.File
154                 f, err = os.OpenFile(*labelsFilename, os.O_CREATE|os.O_WRONLY, 0777)
155                 if err != nil {
156                         return 1
157                 }
158                 defer f.Close()
159                 _, outBasename := path.Split(*outputFilename)
160                 for i, name := range names {
161                         _, err = fmt.Fprintf(f, "%d,%q,%q\n", i, trimFilenameForLabel(name), outBasename)
162                         if err != nil {
163                                 err = fmt.Errorf("write %s: %w", *labelsFilename, err)
164                                 return 1
165                         }
166                 }
167                 err = f.Close()
168                 if err != nil {
169                         err = fmt.Errorf("close %s: %w", *labelsFilename, err)
170                         return 1
171                 }
172         }
173
174         log.Info("writing numpy file")
175         var output io.WriteCloser
176         if *outputFilename == "-" {
177                 output = nopCloser{stdout}
178         } else {
179                 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
180                 if err != nil {
181                         return 1
182                 }
183                 defer output.Close()
184         }
185         bufw := bufio.NewWriter(output)
186         npw, err := gonpy.NewWriter(nopCloser{bufw})
187         if err != nil {
188                 return 1
189         }
190         if *onehot {
191                 log.Info("recoding to onehot")
192                 recoded, librefs, recodedcols := recodeOnehot(out, cols)
193                 out, cols = recoded, recodedcols
194                 if *librefsFilename != "" {
195                         log.Infof("writing onehot column mapping")
196                         err = cmd.writeLibRefs(*librefsFilename, tilelib, librefs)
197                         if err != nil {
198                                 return 1
199                         }
200                 }
201         }
202         log.Info("writing numpy")
203         npw.Shape = []int{rows, cols}
204         npw.WriteInt16(out)
205         err = bufw.Flush()
206         if err != nil {
207                 return 1
208         }
209         err = output.Close()
210         if err != nil {
211                 return 1
212         }
213         return 0
214 }
215
216 func (*exportNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
217         f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
218         if err != nil {
219                 return err
220         }
221         defer f.Close()
222         for i, libref := range librefs {
223                 _, err = fmt.Fprintf(f, "%d,%d,%d\n", i, libref.Tag, libref.Variant)
224                 if err != nil {
225                         return err
226                 }
227         }
228         return f.Close()
229 }
230
231 func cgs2array(tilelib *tileLibrary) (data []int16, rows, cols int, cgnames []string) {
232         for name := range tilelib.compactGenomes {
233                 cgnames = append(cgnames, name)
234         }
235         sort.Slice(cgnames, func(i, j int) bool {
236                 return trimFilenameForLabel(cgnames[i]) < trimFilenameForLabel(cgnames[j])
237         })
238
239         rows = len(tilelib.compactGenomes)
240         for _, cg := range tilelib.compactGenomes {
241                 if cols < len(cg) {
242                         cols = len(cg)
243                 }
244         }
245
246         // flag low-quality tile variants so we can change to -1 below
247         lowqual := make([]map[tileVariantID]bool, cols/2)
248         for tag, variants := range tilelib.variant {
249                 lq := lowqual[tag]
250                 for varidx, hash := range variants {
251                         if len(tilelib.seq[hash]) == 0 {
252                                 if lq == nil {
253                                         lq = map[tileVariantID]bool{}
254                                         lowqual[tag] = lq
255                                 }
256                                 lq[tileVariantID(varidx+1)] = true
257                         }
258                 }
259         }
260
261         data = make([]int16, rows*cols)
262         for row, name := range cgnames {
263                 for i, v := range tilelib.compactGenomes[name] {
264                         if v > 0 && lowqual[i/2][v] {
265                                 data[row*cols+i] = -1
266                         } else {
267                                 data[row*cols+i] = int16(v)
268                         }
269                 }
270         }
271
272         return
273 }
274
275 func recodeOnehot(in []int16, incols int) (out []int16, librefs []tileLibRef, outcols int) {
276         rows := len(in) / incols
277         maxvalue := make([]int16, incols)
278         for row := 0; row < rows; row++ {
279                 for col := 0; col < incols; col++ {
280                         if v := in[row*incols+col]; maxvalue[col] < v {
281                                 maxvalue[col] = v
282                         }
283                 }
284         }
285         outcol := make([]int, incols)
286         dropped := 0
287         for incol, maxv := range maxvalue {
288                 outcol[incol] = outcols
289                 if maxv == 0 {
290                         dropped++
291                 }
292                 for v := 1; v <= int(maxv); v++ {
293                         librefs = append(librefs, tileLibRef{Tag: tagID(incol), Variant: tileVariantID(v)})
294                         outcols++
295                 }
296         }
297         log.Printf("recodeOnehot: dropped %d input cols with zero maxvalue", dropped)
298
299         out = make([]int16, rows*outcols)
300         for inidx, row := 0, 0; row < rows; row++ {
301                 outrow := out[row*outcols:]
302                 for col := 0; col < incols; col++ {
303                         if v := in[inidx]; v > 0 {
304                                 outrow[outcol[col]+int(v)-1] = 1
305                         }
306                         inidx++
307                 }
308         }
309         return
310 }
311
312 type nopCloser struct {
313         io.Writer
314 }
315
316 func (nopCloser) Close() error { return nil }
317
318 func trimFilenameForLabel(s string) string {
319         if i := strings.LastIndex(s, "/"); i >= 0 {
320                 s = s[i+1:]
321         }
322         s = strings.TrimSuffix(s, ".gz")
323         s = strings.TrimSuffix(s, ".fa")
324         s = strings.TrimSuffix(s, ".fasta")
325         s = strings.TrimSuffix(s, ".1")
326         s = strings.TrimSuffix(s, ".2")
327         s = strings.TrimSuffix(s, ".gz")
328         s = strings.TrimSuffix(s, ".vcf")
329         return s
330 }