Merge branch '20645-updating-dockerfiles'
[lightning.git] / filter.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "encoding/gob"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         _ "net/http/pprof"
17         "os"
18         "regexp"
19         "strings"
20
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         log "github.com/sirupsen/logrus"
23 )
24
25 type filter struct {
26         MaxVariants int
27         MinCoverage float64
28         MaxTag      int
29         MatchGenome string
30 }
31
32 func (f *filter) Flags(flags *flag.FlagSet) {
33         flags.IntVar(&f.MaxVariants, "max-variants", -1, "drop tiles with more than `N` variants")
34         flags.Float64Var(&f.MinCoverage, "min-coverage", 0, "drop tiles with coverage less than `P` across all haplotypes (0 < P ≤ 1)")
35         flags.IntVar(&f.MaxTag, "max-tag", -1, "drop tiles with tag ID > `N`")
36         flags.StringVar(&f.MatchGenome, "match-genome", "", "keep genomes whose names contain `regexp`, drop the rest")
37 }
38
39 func (f *filter) Args() []string {
40         return []string{
41                 fmt.Sprintf("-max-variants=%d", f.MaxVariants),
42                 fmt.Sprintf("-min-coverage=%f", f.MinCoverage),
43                 fmt.Sprintf("-max-tag=%d", f.MaxTag),
44                 fmt.Sprintf("-match-genome=%s", f.MatchGenome),
45         }
46 }
47
48 func (f *filter) Apply(tilelib *tileLibrary) {
49         // Zero out variants at tile positions that have more than
50         // f.MaxVariants tile variants.
51         if f.MaxVariants >= 0 {
52                 for tag, variants := range tilelib.variant {
53                         if f.MaxTag >= 0 && tag >= f.MaxTag {
54                                 break
55                         }
56                         if len(variants) <= f.MaxVariants {
57                                 continue
58                         }
59                         for _, cg := range tilelib.compactGenomes {
60                                 if len(cg) > tag*2 {
61                                         cg[tag*2] = 0
62                                         cg[tag*2+1] = 0
63                                 }
64                         }
65                 }
66         }
67
68         // Zero out variants at tile positions that have less than
69         // f.MinCoverage.
70         mincov := int(2*f.MinCoverage*float64(len(tilelib.compactGenomes)) + 1)
71 TAG:
72         for tag := 0; tag < len(tilelib.variant) && (tag < f.MaxTag || f.MaxTag < 0); tag++ {
73                 tagcov := 0
74                 for _, cg := range tilelib.compactGenomes {
75                         if len(cg) < tag*2+2 {
76                                 continue
77                         }
78                         if cg[tag*2] > 0 {
79                                 tagcov++
80                         }
81                         if cg[tag*2+1] > 0 {
82                                 tagcov++
83                         }
84                         if tagcov >= mincov {
85                                 continue TAG
86                         }
87                 }
88                 for _, cg := range tilelib.compactGenomes {
89                         if len(cg) > tag*2 {
90                                 cg[tag*2] = 0
91                                 cg[tag*2+1] = 0
92                         }
93                 }
94         }
95
96         // Truncate genomes and tile data to f.MaxTag (TODO: truncate
97         // refseqs too)
98         if f.MaxTag >= 0 {
99                 if len(tilelib.variant) > f.MaxTag {
100                         tilelib.variant = tilelib.variant[:f.MaxTag]
101                 }
102                 for name, cg := range tilelib.compactGenomes {
103                         if len(cg) > 2*f.MaxTag {
104                                 tilelib.compactGenomes[name] = cg[:2*f.MaxTag]
105                         }
106                 }
107         }
108
109         re, err := regexp.Compile(f.MatchGenome)
110         if err != nil {
111                 log.Errorf("invalid regexp %q does not match anything, dropping all genomes", f.MatchGenome)
112         }
113         for name := range tilelib.compactGenomes {
114                 if !re.MatchString(name) {
115                         delete(tilelib.compactGenomes, name)
116                 }
117         }
118 }
119
120 type filtercmd struct {
121         output io.Writer
122         filter
123 }
124
125 func (cmd *filtercmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
126         var err error
127         defer func() {
128                 if err != nil {
129                         fmt.Fprintf(stderr, "%s\n", err)
130                 }
131         }()
132         flags := flag.NewFlagSet("", flag.ContinueOnError)
133         flags.SetOutput(stderr)
134         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
135         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
136         projectUUID := flags.String("project", "", "project `UUID` for output data")
137         priority := flags.Int("priority", 500, "container request priority")
138         inputFilename := flags.String("i", "-", "input `file`")
139         outputFilename := flags.String("o", "-", "output `file`")
140         cmd.filter.Flags(flags)
141         err = flags.Parse(args)
142         if err == flag.ErrHelp {
143                 err = nil
144                 return 0
145         } else if err != nil {
146                 return 2
147         } else if flags.NArg() > 0 {
148                 err = fmt.Errorf("errant command line arguments after parsed flags: %v", flags.Args())
149                 return 2
150         }
151         cmd.output = stdout
152
153         if *pprof != "" {
154                 go func() {
155                         log.Println(http.ListenAndServe(*pprof, nil))
156                 }()
157         }
158
159         if !*runlocal {
160                 if *outputFilename != "-" {
161                         err = errors.New("cannot specify output file in container mode: not implemented")
162                         return 1
163                 }
164                 runner := arvadosContainerRunner{
165                         Name:        "lightning filter",
166                         Client:      arvados.NewClientFromEnv(),
167                         ProjectUUID: *projectUUID,
168                         RAM:         64000000000,
169                         VCPUs:       2,
170                         Priority:    *priority,
171                 }
172                 err = runner.TranslatePaths(inputFilename)
173                 if err != nil {
174                         return 1
175                 }
176                 runner.Args = []string{"filter", "-local=true",
177                         "-i", *inputFilename,
178                         "-o", "/mnt/output/library.gob",
179                         "-max-variants", fmt.Sprintf("%d", cmd.MaxVariants),
180                         "-min-coverage", fmt.Sprintf("%f", cmd.MinCoverage),
181                         "-max-tag", fmt.Sprintf("%d", cmd.MaxTag),
182                 }
183                 var output string
184                 output, err = runner.Run()
185                 if err != nil {
186                         return 1
187                 }
188                 fmt.Fprintln(stdout, output+"/library.gob")
189                 return 0
190         }
191
192         var infile io.ReadCloser
193         if *inputFilename == "-" {
194                 infile = ioutil.NopCloser(stdin)
195         } else {
196                 infile, err = os.Open(*inputFilename)
197                 if err != nil {
198                         return 1
199                 }
200                 defer infile.Close()
201         }
202         log.Print("reading")
203         cgs, err := ReadCompactGenomes(infile, strings.HasSuffix(*inputFilename, ".gz"))
204         if err != nil {
205                 return 1
206         }
207         err = infile.Close()
208         if err != nil {
209                 return 1
210         }
211         log.Printf("reading done, %d genomes", len(cgs))
212
213         log.Print("filtering")
214         ntags := 0
215         for _, cg := range cgs {
216                 if ntags < len(cg.Variants)/2 {
217                         ntags = len(cg.Variants) / 2
218                 }
219                 if cmd.MaxVariants < 0 {
220                         continue
221                 }
222                 for idx, variant := range cg.Variants {
223                         if variant > tileVariantID(cmd.MaxVariants) {
224                                 for _, cg := range cgs {
225                                         if len(cg.Variants) > idx {
226                                                 cg.Variants[idx & ^1] = 0
227                                                 cg.Variants[idx|1] = 0
228                                         }
229                                 }
230                         }
231                 }
232         }
233
234         if cmd.MaxTag >= 0 && ntags > cmd.MaxTag {
235                 ntags = cmd.MaxTag
236                 for i, cg := range cgs {
237                         if len(cg.Variants) > cmd.MaxTag*2 {
238                                 cgs[i].Variants = cg.Variants[:cmd.MaxTag*2]
239                         }
240                 }
241         }
242
243         if cmd.MinCoverage > 0 {
244                 mincov := int(cmd.MinCoverage * float64(len(cgs)*2))
245                 cov := make([]int, ntags)
246                 for _, cg := range cgs {
247                         for idx, variant := range cg.Variants {
248                                 if variant > 0 {
249                                         cov[idx>>1]++
250                                 }
251                         }
252                 }
253                 for tag, c := range cov {
254                         if c < mincov {
255                                 for _, cg := range cgs {
256                                         if len(cg.Variants) > tag*2 {
257                                                 cg.Variants[tag*2] = 0
258                                                 cg.Variants[tag*2+1] = 0
259                                         }
260                                 }
261                         }
262                 }
263         }
264
265         log.Print("filtering done")
266
267         var outfile io.WriteCloser
268         if *outputFilename == "-" {
269                 outfile = nopCloser{cmd.output}
270         } else {
271                 outfile, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
272                 if err != nil {
273                         return 1
274                 }
275                 defer outfile.Close()
276         }
277         w := bufio.NewWriter(outfile)
278         enc := gob.NewEncoder(w)
279         log.Print("writing")
280         err = enc.Encode(LibraryEntry{
281                 CompactGenomes: cgs,
282         })
283         if err != nil {
284                 return 1
285         }
286         log.Print("writing done")
287         err = w.Flush()
288         if err != nil {
289                 return 1
290         }
291         err = outfile.Close()
292         if err != nil {
293                 return 1
294         }
295         return 0
296 }