Improve concurrency in export-diff.
[lightning.git] / filter.go
1 package lightning
2
3 import (
4         "bufio"
5         "encoding/gob"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "strings"
15
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         log "github.com/sirupsen/logrus"
18 )
19
20 type filter struct {
21         MaxVariants int
22         MinCoverage float64
23         MaxTag      int
24 }
25
26 func (f *filter) Flags(flags *flag.FlagSet) {
27         flags.IntVar(&f.MaxVariants, "max-variants", -1, "drop tiles with more than `N` variants")
28         flags.Float64Var(&f.MinCoverage, "min-coverage", 0, "drop tiles with coverage less than `P` across all haplotypes (0 < P ≤ 1)")
29         flags.IntVar(&f.MaxTag, "max-tag", -1, "drop tiles with tag ID > `N`")
30 }
31
32 func (f *filter) Args() []string {
33         return []string{
34                 fmt.Sprintf("-max-variants=%d", f.MaxVariants),
35                 fmt.Sprintf("-min-coverage=%f", f.MinCoverage),
36                 fmt.Sprintf("-max-tag=%d", f.MaxTag),
37         }
38 }
39
40 func (f *filter) Apply(tilelib *tileLibrary) {
41         // Zero out variants at tile positions that have more than
42         // f.MaxVariants tile variants.
43         if f.MaxVariants >= 0 {
44                 for tag, variants := range tilelib.variant {
45                         if f.MaxTag >= 0 && tag >= f.MaxTag {
46                                 break
47                         }
48                         if len(variants) <= f.MaxVariants {
49                                 continue
50                         }
51                         for _, cg := range tilelib.compactGenomes {
52                                 if len(cg) > tag*2 {
53                                         cg[tag*2] = 0
54                                         cg[tag*2+1] = 0
55                                 }
56                         }
57                 }
58         }
59
60         // Zero out variants at tile positions that have less than
61         // f.MinCoverage.
62         mincov := int(2*f.MinCoverage*float64(len(tilelib.compactGenomes)) + 1)
63 TAG:
64         for tag := 0; tag < len(tilelib.variant) && tag < f.MaxTag; tag++ {
65                 tagcov := 0
66                 for _, cg := range tilelib.compactGenomes {
67                         if cg[tag*2] > 0 {
68                                 tagcov++
69                         }
70                         if cg[tag*2+1] > 0 {
71                                 tagcov++
72                         }
73                         if tagcov >= mincov {
74                                 continue TAG
75                         }
76                 }
77                 for _, cg := range tilelib.compactGenomes {
78                         cg[tag*2] = 0
79                         cg[tag*2+1] = 0
80                 }
81         }
82
83         // Truncate genomes and tile data to f.MaxTag (TODO: truncate
84         // refseqs too)
85         if f.MaxTag >= 0 {
86                 if len(tilelib.variant) > f.MaxTag {
87                         tilelib.variant = tilelib.variant[:f.MaxTag]
88                 }
89                 for name, cg := range tilelib.compactGenomes {
90                         if len(cg) > 2*f.MaxTag {
91                                 tilelib.compactGenomes[name] = cg[:2*f.MaxTag]
92                         }
93                 }
94         }
95 }
96
97 type filtercmd struct {
98         output io.Writer
99         filter
100 }
101
102 func (cmd *filtercmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
103         var err error
104         defer func() {
105                 if err != nil {
106                         fmt.Fprintf(stderr, "%s\n", err)
107                 }
108         }()
109         flags := flag.NewFlagSet("", flag.ContinueOnError)
110         flags.SetOutput(stderr)
111         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
112         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
113         projectUUID := flags.String("project", "", "project `UUID` for output data")
114         priority := flags.Int("priority", 500, "container request priority")
115         inputFilename := flags.String("i", "-", "input `file`")
116         outputFilename := flags.String("o", "-", "output `file`")
117         cmd.filter.Flags(flags)
118         err = flags.Parse(args)
119         if err == flag.ErrHelp {
120                 err = nil
121                 return 0
122         } else if err != nil {
123                 return 2
124         }
125         cmd.output = stdout
126
127         if *pprof != "" {
128                 go func() {
129                         log.Println(http.ListenAndServe(*pprof, nil))
130                 }()
131         }
132
133         if !*runlocal {
134                 if *outputFilename != "-" {
135                         err = errors.New("cannot specify output file in container mode: not implemented")
136                         return 1
137                 }
138                 runner := arvadosContainerRunner{
139                         Name:        "lightning filter",
140                         Client:      arvados.NewClientFromEnv(),
141                         ProjectUUID: *projectUUID,
142                         RAM:         64000000000,
143                         VCPUs:       2,
144                         Priority:    *priority,
145                 }
146                 err = runner.TranslatePaths(inputFilename)
147                 if err != nil {
148                         return 1
149                 }
150                 runner.Args = []string{"filter", "-local=true",
151                         "-i", *inputFilename,
152                         "-o", "/mnt/output/library.gob",
153                         "-max-variants", fmt.Sprintf("%d", cmd.MaxVariants),
154                         "-min-coverage", fmt.Sprintf("%f", cmd.MinCoverage),
155                         "-max-tag", fmt.Sprintf("%d", cmd.MaxTag),
156                 }
157                 var output string
158                 output, err = runner.Run()
159                 if err != nil {
160                         return 1
161                 }
162                 fmt.Fprintln(stdout, output+"/library.gob")
163                 return 0
164         }
165
166         var infile io.ReadCloser
167         if *inputFilename == "-" {
168                 infile = ioutil.NopCloser(stdin)
169         } else {
170                 infile, err = os.Open(*inputFilename)
171                 if err != nil {
172                         return 1
173                 }
174                 defer infile.Close()
175         }
176         log.Print("reading")
177         cgs, err := ReadCompactGenomes(infile, strings.HasSuffix(*inputFilename, ".gz"))
178         if err != nil {
179                 return 1
180         }
181         err = infile.Close()
182         if err != nil {
183                 return 1
184         }
185         log.Printf("reading done, %d genomes", len(cgs))
186
187         log.Print("filtering")
188         ntags := 0
189         for _, cg := range cgs {
190                 if ntags < len(cg.Variants)/2 {
191                         ntags = len(cg.Variants) / 2
192                 }
193                 if cmd.MaxVariants < 0 {
194                         continue
195                 }
196                 for idx, variant := range cg.Variants {
197                         if variant > tileVariantID(cmd.MaxVariants) {
198                                 for _, cg := range cgs {
199                                         if len(cg.Variants) > idx {
200                                                 cg.Variants[idx & ^1] = 0
201                                                 cg.Variants[idx|1] = 0
202                                         }
203                                 }
204                         }
205                 }
206         }
207
208         if cmd.MaxTag >= 0 && ntags > cmd.MaxTag {
209                 ntags = cmd.MaxTag
210                 for i, cg := range cgs {
211                         if len(cg.Variants) > cmd.MaxTag*2 {
212                                 cgs[i].Variants = cg.Variants[:cmd.MaxTag*2]
213                         }
214                 }
215         }
216
217         if cmd.MinCoverage > 0 {
218                 mincov := int(cmd.MinCoverage * float64(len(cgs)*2))
219                 cov := make([]int, ntags)
220                 for _, cg := range cgs {
221                         for idx, variant := range cg.Variants {
222                                 if variant > 0 {
223                                         cov[idx>>1]++
224                                 }
225                         }
226                 }
227                 for tag, c := range cov {
228                         if c < mincov {
229                                 for _, cg := range cgs {
230                                         if len(cg.Variants) > tag*2 {
231                                                 cg.Variants[tag*2] = 0
232                                                 cg.Variants[tag*2+1] = 0
233                                         }
234                                 }
235                         }
236                 }
237         }
238
239         log.Print("filtering done")
240
241         var outfile io.WriteCloser
242         if *outputFilename == "-" {
243                 outfile = nopCloser{cmd.output}
244         } else {
245                 outfile, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
246                 if err != nil {
247                         return 1
248                 }
249                 defer outfile.Close()
250         }
251         w := bufio.NewWriter(outfile)
252         enc := gob.NewEncoder(w)
253         log.Print("writing")
254         err = enc.Encode(LibraryEntry{
255                 CompactGenomes: cgs,
256         })
257         if err != nil {
258                 return 1
259         }
260         log.Print("writing done")
261         err = w.Flush()
262         if err != nil {
263                 return 1
264         }
265         err = outfile.Close()
266         if err != nil {
267                 return 1
268         }
269         return 0
270 }