Gzip gob files.
[lightning.git] / filter.go
1 package main
2
3 import (
4         "bufio"
5         "encoding/gob"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "strings"
15
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         log "github.com/sirupsen/logrus"
18 )
19
20 type filter struct {
21         MaxVariants int
22         MinCoverage float64
23         MaxTag      int
24 }
25
26 func (f *filter) Flags(flags *flag.FlagSet) {
27         flags.IntVar(&f.MaxVariants, "max-variants", -1, "drop tiles with more than `N` variants")
28         flags.Float64Var(&f.MinCoverage, "min-coverage", 0, "drop tiles with coverage less than `P` across all haplotypes (0 < P ≤ 1)")
29         flags.IntVar(&f.MaxTag, "max-tag", -1, "drop tiles with tag ID > `N`")
30 }
31
32 func (f *filter) Apply(tilelib *tileLibrary) {
33         // Zero out variants at tile positions that have more than
34         // f.MaxVariants tile variants.
35         if f.MaxVariants >= 0 {
36                 for tag, variants := range tilelib.variant {
37                         if f.MaxTag >= 0 && tag >= f.MaxTag {
38                                 break
39                         }
40                         if len(variants) <= f.MaxVariants {
41                                 continue
42                         }
43                         for _, cg := range tilelib.compactGenomes {
44                                 if len(cg) > tag*2 {
45                                         cg[tag*2] = 0
46                                         cg[tag*2+1] = 0
47                                 }
48                         }
49                 }
50         }
51
52         // Zero out variants at tile positions that have less than
53         // f.MinCoverage.
54         mincov := int(2*f.MinCoverage*float64(len(tilelib.compactGenomes)) + 1)
55 TAG:
56         for tag := 0; tag < len(tilelib.variant) && tag < f.MaxTag; tag++ {
57                 tagcov := 0
58                 for _, cg := range tilelib.compactGenomes {
59                         if cg[tag*2] > 0 {
60                                 tagcov++
61                         }
62                         if cg[tag*2+1] > 0 {
63                                 tagcov++
64                         }
65                         if tagcov >= mincov {
66                                 continue TAG
67                         }
68                 }
69                 for _, cg := range tilelib.compactGenomes {
70                         cg[tag*2] = 0
71                         cg[tag*2+1] = 0
72                 }
73         }
74
75         // Truncate genomes and tile data to f.MaxTag (TODO: truncate
76         // refseqs too)
77         if f.MaxTag >= 0 {
78                 if len(tilelib.variant) > f.MaxTag {
79                         tilelib.variant = tilelib.variant[:f.MaxTag]
80                 }
81                 for name, cg := range tilelib.compactGenomes {
82                         if len(cg) > 2*f.MaxTag {
83                                 tilelib.compactGenomes[name] = cg[:2*f.MaxTag]
84                         }
85                 }
86         }
87 }
88
89 type filtercmd struct {
90         output io.Writer
91         filter
92 }
93
94 func (cmd *filtercmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
95         var err error
96         defer func() {
97                 if err != nil {
98                         fmt.Fprintf(stderr, "%s\n", err)
99                 }
100         }()
101         flags := flag.NewFlagSet("", flag.ContinueOnError)
102         flags.SetOutput(stderr)
103         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
104         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
105         projectUUID := flags.String("project", "", "project `UUID` for output data")
106         priority := flags.Int("priority", 500, "container request priority")
107         inputFilename := flags.String("i", "-", "input `file`")
108         outputFilename := flags.String("o", "-", "output `file`")
109         cmd.filter.Flags(flags)
110         err = flags.Parse(args)
111         if err == flag.ErrHelp {
112                 err = nil
113                 return 0
114         } else if err != nil {
115                 return 2
116         }
117         cmd.output = stdout
118
119         if *pprof != "" {
120                 go func() {
121                         log.Println(http.ListenAndServe(*pprof, nil))
122                 }()
123         }
124
125         if !*runlocal {
126                 if *outputFilename != "-" {
127                         err = errors.New("cannot specify output file in container mode: not implemented")
128                         return 1
129                 }
130                 runner := arvadosContainerRunner{
131                         Name:        "lightning filter",
132                         Client:      arvados.NewClientFromEnv(),
133                         ProjectUUID: *projectUUID,
134                         RAM:         64000000000,
135                         VCPUs:       2,
136                         Priority:    *priority,
137                 }
138                 err = runner.TranslatePaths(inputFilename)
139                 if err != nil {
140                         return 1
141                 }
142                 runner.Args = []string{"filter", "-local=true",
143                         "-i", *inputFilename,
144                         "-o", "/mnt/output/library.gob",
145                         "-max-variants", fmt.Sprintf("%d", cmd.MaxVariants),
146                         "-min-coverage", fmt.Sprintf("%f", cmd.MinCoverage),
147                         "-max-tag", fmt.Sprintf("%d", cmd.MaxTag),
148                 }
149                 var output string
150                 output, err = runner.Run()
151                 if err != nil {
152                         return 1
153                 }
154                 fmt.Fprintln(stdout, output+"/library.gob")
155                 return 0
156         }
157
158         var infile io.ReadCloser
159         if *inputFilename == "-" {
160                 infile = ioutil.NopCloser(stdin)
161         } else {
162                 infile, err = os.Open(*inputFilename)
163                 if err != nil {
164                         return 1
165                 }
166                 defer infile.Close()
167         }
168         log.Print("reading")
169         cgs, err := ReadCompactGenomes(infile, strings.HasSuffix(*inputFilename, ".gz"))
170         if err != nil {
171                 return 1
172         }
173         err = infile.Close()
174         if err != nil {
175                 return 1
176         }
177         log.Printf("reading done, %d genomes", len(cgs))
178
179         log.Print("filtering")
180         ntags := 0
181         for _, cg := range cgs {
182                 if ntags < len(cg.Variants)/2 {
183                         ntags = len(cg.Variants) / 2
184                 }
185                 if cmd.MaxVariants < 0 {
186                         continue
187                 }
188                 for idx, variant := range cg.Variants {
189                         if variant > tileVariantID(cmd.MaxVariants) {
190                                 for _, cg := range cgs {
191                                         if len(cg.Variants) > idx {
192                                                 cg.Variants[idx & ^1] = 0
193                                                 cg.Variants[idx|1] = 0
194                                         }
195                                 }
196                         }
197                 }
198         }
199
200         if cmd.MaxTag >= 0 && ntags > cmd.MaxTag {
201                 ntags = cmd.MaxTag
202                 for i, cg := range cgs {
203                         if len(cg.Variants) > cmd.MaxTag*2 {
204                                 cgs[i].Variants = cg.Variants[:cmd.MaxTag*2]
205                         }
206                 }
207         }
208
209         if cmd.MinCoverage > 0 {
210                 mincov := int(cmd.MinCoverage * float64(len(cgs)*2))
211                 cov := make([]int, ntags)
212                 for _, cg := range cgs {
213                         for idx, variant := range cg.Variants {
214                                 if variant > 0 {
215                                         cov[idx>>1]++
216                                 }
217                         }
218                 }
219                 for tag, c := range cov {
220                         if c < mincov {
221                                 for _, cg := range cgs {
222                                         if len(cg.Variants) > tag*2 {
223                                                 cg.Variants[tag*2] = 0
224                                                 cg.Variants[tag*2+1] = 0
225                                         }
226                                 }
227                         }
228                 }
229         }
230
231         log.Print("filtering done")
232
233         var outfile io.WriteCloser
234         if *outputFilename == "-" {
235                 outfile = nopCloser{cmd.output}
236         } else {
237                 outfile, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
238                 if err != nil {
239                         return 1
240                 }
241                 defer outfile.Close()
242         }
243         w := bufio.NewWriter(outfile)
244         enc := gob.NewEncoder(w)
245         log.Print("writing")
246         err = enc.Encode(LibraryEntry{
247                 CompactGenomes: cgs,
248         })
249         if err != nil {
250                 return 1
251         }
252         log.Print("writing done")
253         err = w.Flush()
254         if err != nil {
255                 return 1
256         }
257         err = outfile.Close()
258         if err != nil {
259                 return 1
260         }
261         return 0
262 }