Merge branch '19527-training-set'
[lightning.git] / filter.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "encoding/gob"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         _ "net/http/pprof"
17         "os"
18         "regexp"
19         "strings"
20
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         log "github.com/sirupsen/logrus"
23 )
24
25 type filter struct {
26         MaxVariants int
27         MinCoverage float64
28         MaxTag      int
29         MatchGenome string
30 }
31
32 func (f *filter) Flags(flags *flag.FlagSet) {
33         flags.IntVar(&f.MaxVariants, "max-variants", -1, "drop tiles with more than `N` variants")
34         flags.Float64Var(&f.MinCoverage, "min-coverage", 0, "drop tiles with coverage less than `P` across all haplotypes (0 < P ≤ 1)")
35         flags.IntVar(&f.MaxTag, "max-tag", -1, "drop tiles with tag ID > `N`")
36         flags.StringVar(&f.MatchGenome, "match-genome", "", "keep genomes whose names contain `regexp`, drop the rest")
37 }
38
39 func (f *filter) Args() []string {
40         return []string{
41                 fmt.Sprintf("-max-variants=%d", f.MaxVariants),
42                 fmt.Sprintf("-min-coverage=%f", f.MinCoverage),
43                 fmt.Sprintf("-max-tag=%d", f.MaxTag),
44                 fmt.Sprintf("-match-genome=%s", f.MatchGenome),
45         }
46 }
47
48 func (f *filter) Apply(tilelib *tileLibrary) {
49         // Zero out variants at tile positions that have more than
50         // f.MaxVariants tile variants.
51         if f.MaxVariants >= 0 {
52                 for tag, variants := range tilelib.variant {
53                         if f.MaxTag >= 0 && tag >= f.MaxTag {
54                                 break
55                         }
56                         if len(variants) <= f.MaxVariants {
57                                 continue
58                         }
59                         for _, cg := range tilelib.compactGenomes {
60                                 if len(cg) > tag*2 {
61                                         cg[tag*2] = 0
62                                         cg[tag*2+1] = 0
63                                 }
64                         }
65                 }
66         }
67
68         // Zero out variants at tile positions that have less than
69         // f.MinCoverage.
70         mincov := int(2*f.MinCoverage*float64(len(tilelib.compactGenomes)) + 1)
71 TAG:
72         for tag := 0; tag < len(tilelib.variant) && (tag < f.MaxTag || f.MaxTag < 0); tag++ {
73                 tagcov := 0
74                 for _, cg := range tilelib.compactGenomes {
75                         if len(cg) < tag*2+2 {
76                                 continue
77                         }
78                         if cg[tag*2] > 0 {
79                                 tagcov++
80                         }
81                         if cg[tag*2+1] > 0 {
82                                 tagcov++
83                         }
84                         if tagcov >= mincov {
85                                 continue TAG
86                         }
87                 }
88                 for _, cg := range tilelib.compactGenomes {
89                         if len(cg) > tag*2 {
90                                 cg[tag*2] = 0
91                                 cg[tag*2+1] = 0
92                         }
93                 }
94         }
95
96         // Truncate genomes and tile data to f.MaxTag (TODO: truncate
97         // refseqs too)
98         if f.MaxTag >= 0 {
99                 if len(tilelib.variant) > f.MaxTag {
100                         tilelib.variant = tilelib.variant[:f.MaxTag]
101                 }
102                 for name, cg := range tilelib.compactGenomes {
103                         if len(cg) > 2*f.MaxTag {
104                                 tilelib.compactGenomes[name] = cg[:2*f.MaxTag]
105                         }
106                 }
107         }
108
109         re, err := regexp.Compile(f.MatchGenome)
110         if err != nil {
111                 log.Errorf("invalid regexp %q does not match anything, dropping all genomes", f.MatchGenome)
112         }
113         for name := range tilelib.compactGenomes {
114                 if !re.MatchString(name) {
115                         delete(tilelib.compactGenomes, name)
116                 }
117         }
118 }
119
120 type filtercmd struct {
121         output io.Writer
122         filter
123 }
124
125 func (cmd *filtercmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
126         var err error
127         defer func() {
128                 if err != nil {
129                         fmt.Fprintf(stderr, "%s\n", err)
130                 }
131         }()
132         flags := flag.NewFlagSet("", flag.ContinueOnError)
133         flags.SetOutput(stderr)
134         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
135         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
136         projectUUID := flags.String("project", "", "project `UUID` for output data")
137         priority := flags.Int("priority", 500, "container request priority")
138         inputFilename := flags.String("i", "-", "input `file`")
139         outputFilename := flags.String("o", "-", "output `file`")
140         cmd.filter.Flags(flags)
141         err = flags.Parse(args)
142         if err == flag.ErrHelp {
143                 err = nil
144                 return 0
145         } else if err != nil {
146                 return 2
147         }
148         cmd.output = stdout
149
150         if *pprof != "" {
151                 go func() {
152                         log.Println(http.ListenAndServe(*pprof, nil))
153                 }()
154         }
155
156         if !*runlocal {
157                 if *outputFilename != "-" {
158                         err = errors.New("cannot specify output file in container mode: not implemented")
159                         return 1
160                 }
161                 runner := arvadosContainerRunner{
162                         Name:        "lightning filter",
163                         Client:      arvados.NewClientFromEnv(),
164                         ProjectUUID: *projectUUID,
165                         RAM:         64000000000,
166                         VCPUs:       2,
167                         Priority:    *priority,
168                 }
169                 err = runner.TranslatePaths(inputFilename)
170                 if err != nil {
171                         return 1
172                 }
173                 runner.Args = []string{"filter", "-local=true",
174                         "-i", *inputFilename,
175                         "-o", "/mnt/output/library.gob",
176                         "-max-variants", fmt.Sprintf("%d", cmd.MaxVariants),
177                         "-min-coverage", fmt.Sprintf("%f", cmd.MinCoverage),
178                         "-max-tag", fmt.Sprintf("%d", cmd.MaxTag),
179                 }
180                 var output string
181                 output, err = runner.Run()
182                 if err != nil {
183                         return 1
184                 }
185                 fmt.Fprintln(stdout, output+"/library.gob")
186                 return 0
187         }
188
189         var infile io.ReadCloser
190         if *inputFilename == "-" {
191                 infile = ioutil.NopCloser(stdin)
192         } else {
193                 infile, err = os.Open(*inputFilename)
194                 if err != nil {
195                         return 1
196                 }
197                 defer infile.Close()
198         }
199         log.Print("reading")
200         cgs, err := ReadCompactGenomes(infile, strings.HasSuffix(*inputFilename, ".gz"))
201         if err != nil {
202                 return 1
203         }
204         err = infile.Close()
205         if err != nil {
206                 return 1
207         }
208         log.Printf("reading done, %d genomes", len(cgs))
209
210         log.Print("filtering")
211         ntags := 0
212         for _, cg := range cgs {
213                 if ntags < len(cg.Variants)/2 {
214                         ntags = len(cg.Variants) / 2
215                 }
216                 if cmd.MaxVariants < 0 {
217                         continue
218                 }
219                 for idx, variant := range cg.Variants {
220                         if variant > tileVariantID(cmd.MaxVariants) {
221                                 for _, cg := range cgs {
222                                         if len(cg.Variants) > idx {
223                                                 cg.Variants[idx & ^1] = 0
224                                                 cg.Variants[idx|1] = 0
225                                         }
226                                 }
227                         }
228                 }
229         }
230
231         if cmd.MaxTag >= 0 && ntags > cmd.MaxTag {
232                 ntags = cmd.MaxTag
233                 for i, cg := range cgs {
234                         if len(cg.Variants) > cmd.MaxTag*2 {
235                                 cgs[i].Variants = cg.Variants[:cmd.MaxTag*2]
236                         }
237                 }
238         }
239
240         if cmd.MinCoverage > 0 {
241                 mincov := int(cmd.MinCoverage * float64(len(cgs)*2))
242                 cov := make([]int, ntags)
243                 for _, cg := range cgs {
244                         for idx, variant := range cg.Variants {
245                                 if variant > 0 {
246                                         cov[idx>>1]++
247                                 }
248                         }
249                 }
250                 for tag, c := range cov {
251                         if c < mincov {
252                                 for _, cg := range cgs {
253                                         if len(cg.Variants) > tag*2 {
254                                                 cg.Variants[tag*2] = 0
255                                                 cg.Variants[tag*2+1] = 0
256                                         }
257                                 }
258                         }
259                 }
260         }
261
262         log.Print("filtering done")
263
264         var outfile io.WriteCloser
265         if *outputFilename == "-" {
266                 outfile = nopCloser{cmd.output}
267         } else {
268                 outfile, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
269                 if err != nil {
270                         return 1
271                 }
272                 defer outfile.Close()
273         }
274         w := bufio.NewWriter(outfile)
275         enc := gob.NewEncoder(w)
276         log.Print("writing")
277         err = enc.Encode(LibraryEntry{
278                 CompactGenomes: cgs,
279         })
280         if err != nil {
281                 return 1
282         }
283         log.Print("writing done")
284         err = w.Flush()
285         if err != nil {
286                 return 1
287         }
288         err = outfile.Close()
289         if err != nil {
290                 return 1
291         }
292         return 0
293 }