9d43abea608d98d9be8aec53934f2383c86fb0b1
[lightning.git] / filter.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "encoding/gob"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         _ "net/http/pprof"
17         "os"
18         "strings"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         log "github.com/sirupsen/logrus"
22 )
23
24 type filter struct {
25         MaxVariants int
26         MinCoverage float64
27         MaxTag      int
28 }
29
30 func (f *filter) Flags(flags *flag.FlagSet) {
31         flags.IntVar(&f.MaxVariants, "max-variants", -1, "drop tiles with more than `N` variants")
32         flags.Float64Var(&f.MinCoverage, "min-coverage", 0, "drop tiles with coverage less than `P` across all haplotypes (0 < P ≤ 1)")
33         flags.IntVar(&f.MaxTag, "max-tag", -1, "drop tiles with tag ID > `N`")
34 }
35
36 func (f *filter) Args() []string {
37         return []string{
38                 fmt.Sprintf("-max-variants=%d", f.MaxVariants),
39                 fmt.Sprintf("-min-coverage=%f", f.MinCoverage),
40                 fmt.Sprintf("-max-tag=%d", f.MaxTag),
41         }
42 }
43
44 func (f *filter) Apply(tilelib *tileLibrary) {
45         // Zero out variants at tile positions that have more than
46         // f.MaxVariants tile variants.
47         if f.MaxVariants >= 0 {
48                 for tag, variants := range tilelib.variant {
49                         if f.MaxTag >= 0 && tag >= f.MaxTag {
50                                 break
51                         }
52                         if len(variants) <= f.MaxVariants {
53                                 continue
54                         }
55                         for _, cg := range tilelib.compactGenomes {
56                                 if len(cg) > tag*2 {
57                                         cg[tag*2] = 0
58                                         cg[tag*2+1] = 0
59                                 }
60                         }
61                 }
62         }
63
64         // Zero out variants at tile positions that have less than
65         // f.MinCoverage.
66         mincov := int(2*f.MinCoverage*float64(len(tilelib.compactGenomes)) + 1)
67 TAG:
68         for tag := 0; tag < len(tilelib.variant) && (tag < f.MaxTag || f.MaxTag < 0); tag++ {
69                 tagcov := 0
70                 for _, cg := range tilelib.compactGenomes {
71                         if len(cg) < tag*2+2 {
72                                 continue
73                         }
74                         if cg[tag*2] > 0 {
75                                 tagcov++
76                         }
77                         if cg[tag*2+1] > 0 {
78                                 tagcov++
79                         }
80                         if tagcov >= mincov {
81                                 continue TAG
82                         }
83                 }
84                 for _, cg := range tilelib.compactGenomes {
85                         if len(cg) > tag*2 {
86                                 cg[tag*2] = 0
87                                 cg[tag*2+1] = 0
88                         }
89                 }
90         }
91
92         // Truncate genomes and tile data to f.MaxTag (TODO: truncate
93         // refseqs too)
94         if f.MaxTag >= 0 {
95                 if len(tilelib.variant) > f.MaxTag {
96                         tilelib.variant = tilelib.variant[:f.MaxTag]
97                 }
98                 for name, cg := range tilelib.compactGenomes {
99                         if len(cg) > 2*f.MaxTag {
100                                 tilelib.compactGenomes[name] = cg[:2*f.MaxTag]
101                         }
102                 }
103         }
104 }
105
106 type filtercmd struct {
107         output io.Writer
108         filter
109 }
110
111 func (cmd *filtercmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
112         var err error
113         defer func() {
114                 if err != nil {
115                         fmt.Fprintf(stderr, "%s\n", err)
116                 }
117         }()
118         flags := flag.NewFlagSet("", flag.ContinueOnError)
119         flags.SetOutput(stderr)
120         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
121         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
122         projectUUID := flags.String("project", "", "project `UUID` for output data")
123         priority := flags.Int("priority", 500, "container request priority")
124         inputFilename := flags.String("i", "-", "input `file`")
125         outputFilename := flags.String("o", "-", "output `file`")
126         cmd.filter.Flags(flags)
127         err = flags.Parse(args)
128         if err == flag.ErrHelp {
129                 err = nil
130                 return 0
131         } else if err != nil {
132                 return 2
133         }
134         cmd.output = stdout
135
136         if *pprof != "" {
137                 go func() {
138                         log.Println(http.ListenAndServe(*pprof, nil))
139                 }()
140         }
141
142         if !*runlocal {
143                 if *outputFilename != "-" {
144                         err = errors.New("cannot specify output file in container mode: not implemented")
145                         return 1
146                 }
147                 runner := arvadosContainerRunner{
148                         Name:        "lightning filter",
149                         Client:      arvados.NewClientFromEnv(),
150                         ProjectUUID: *projectUUID,
151                         RAM:         64000000000,
152                         VCPUs:       2,
153                         Priority:    *priority,
154                 }
155                 err = runner.TranslatePaths(inputFilename)
156                 if err != nil {
157                         return 1
158                 }
159                 runner.Args = []string{"filter", "-local=true",
160                         "-i", *inputFilename,
161                         "-o", "/mnt/output/library.gob",
162                         "-max-variants", fmt.Sprintf("%d", cmd.MaxVariants),
163                         "-min-coverage", fmt.Sprintf("%f", cmd.MinCoverage),
164                         "-max-tag", fmt.Sprintf("%d", cmd.MaxTag),
165                 }
166                 var output string
167                 output, err = runner.Run()
168                 if err != nil {
169                         return 1
170                 }
171                 fmt.Fprintln(stdout, output+"/library.gob")
172                 return 0
173         }
174
175         var infile io.ReadCloser
176         if *inputFilename == "-" {
177                 infile = ioutil.NopCloser(stdin)
178         } else {
179                 infile, err = os.Open(*inputFilename)
180                 if err != nil {
181                         return 1
182                 }
183                 defer infile.Close()
184         }
185         log.Print("reading")
186         cgs, err := ReadCompactGenomes(infile, strings.HasSuffix(*inputFilename, ".gz"))
187         if err != nil {
188                 return 1
189         }
190         err = infile.Close()
191         if err != nil {
192                 return 1
193         }
194         log.Printf("reading done, %d genomes", len(cgs))
195
196         log.Print("filtering")
197         ntags := 0
198         for _, cg := range cgs {
199                 if ntags < len(cg.Variants)/2 {
200                         ntags = len(cg.Variants) / 2
201                 }
202                 if cmd.MaxVariants < 0 {
203                         continue
204                 }
205                 for idx, variant := range cg.Variants {
206                         if variant > tileVariantID(cmd.MaxVariants) {
207                                 for _, cg := range cgs {
208                                         if len(cg.Variants) > idx {
209                                                 cg.Variants[idx & ^1] = 0
210                                                 cg.Variants[idx|1] = 0
211                                         }
212                                 }
213                         }
214                 }
215         }
216
217         if cmd.MaxTag >= 0 && ntags > cmd.MaxTag {
218                 ntags = cmd.MaxTag
219                 for i, cg := range cgs {
220                         if len(cg.Variants) > cmd.MaxTag*2 {
221                                 cgs[i].Variants = cg.Variants[:cmd.MaxTag*2]
222                         }
223                 }
224         }
225
226         if cmd.MinCoverage > 0 {
227                 mincov := int(cmd.MinCoverage * float64(len(cgs)*2))
228                 cov := make([]int, ntags)
229                 for _, cg := range cgs {
230                         for idx, variant := range cg.Variants {
231                                 if variant > 0 {
232                                         cov[idx>>1]++
233                                 }
234                         }
235                 }
236                 for tag, c := range cov {
237                         if c < mincov {
238                                 for _, cg := range cgs {
239                                         if len(cg.Variants) > tag*2 {
240                                                 cg.Variants[tag*2] = 0
241                                                 cg.Variants[tag*2+1] = 0
242                                         }
243                                 }
244                         }
245                 }
246         }
247
248         log.Print("filtering done")
249
250         var outfile io.WriteCloser
251         if *outputFilename == "-" {
252                 outfile = nopCloser{cmd.output}
253         } else {
254                 outfile, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
255                 if err != nil {
256                         return 1
257                 }
258                 defer outfile.Close()
259         }
260         w := bufio.NewWriter(outfile)
261         enc := gob.NewEncoder(w)
262         log.Print("writing")
263         err = enc.Encode(LibraryEntry{
264                 CompactGenomes: cgs,
265         })
266         if err != nil {
267                 return 1
268         }
269         log.Print("writing done")
270         err = w.Flush()
271         if err != nil {
272                 return 1
273         }
274         err = outfile.Close()
275         if err != nil {
276                 return 1
277         }
278         return 0
279 }