Container request priority flag.
[lightning.git] / filter.go
1 package main
2
3 import (
4         "bufio"
5         "encoding/gob"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14
15         "git.arvados.org/arvados.git/sdk/go/arvados"
16         log "github.com/sirupsen/logrus"
17 )
18
19 type filterer struct {
20         output io.Writer
21 }
22
23 func (cmd *filterer) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
24         var err error
25         defer func() {
26                 if err != nil {
27                         fmt.Fprintf(stderr, "%s\n", err)
28                 }
29         }()
30         flags := flag.NewFlagSet("", flag.ContinueOnError)
31         flags.SetOutput(stderr)
32         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
33         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
34         projectUUID := flags.String("project", "", "project `UUID` for output data")
35         priority := flags.Int("priority", 500, "container request priority")
36         inputFilename := flags.String("i", "-", "input `file`")
37         outputFilename := flags.String("o", "-", "output `file`")
38         maxvariants := flags.Int("max-variants", -1, "drop tiles with more than `N` variants")
39         mincoverage := flags.Float64("min-coverage", 1, "drop tiles with coverage less than `P` across all haplotypes (0 < P ≤ 1)")
40         maxtag := flags.Int("max-tag", -1, "drop tiles with tag ID > `N`")
41         err = flags.Parse(args)
42         if err == flag.ErrHelp {
43                 err = nil
44                 return 0
45         } else if err != nil {
46                 return 2
47         }
48         cmd.output = stdout
49
50         if *pprof != "" {
51                 go func() {
52                         log.Println(http.ListenAndServe(*pprof, nil))
53                 }()
54         }
55
56         if !*runlocal {
57                 if *outputFilename != "-" {
58                         err = errors.New("cannot specify output file in container mode: not implemented")
59                         return 1
60                 }
61                 runner := arvadosContainerRunner{
62                         Name:        "lightning filter",
63                         Client:      arvados.NewClientFromEnv(),
64                         ProjectUUID: *projectUUID,
65                         RAM:         64000000000,
66                         VCPUs:       2,
67                         Priority:    *priority,
68                 }
69                 err = runner.TranslatePaths(inputFilename)
70                 if err != nil {
71                         return 1
72                 }
73                 runner.Args = []string{"filter", "-local=true",
74                         "-i", *inputFilename,
75                         "-o", "/mnt/output/library.gob",
76                         "-max-variants", fmt.Sprintf("%d", *maxvariants),
77                         "-min-coverage", fmt.Sprintf("%f", *mincoverage),
78                         "-max-tag", fmt.Sprintf("%d", *maxtag),
79                 }
80                 var output string
81                 output, err = runner.Run()
82                 if err != nil {
83                         return 1
84                 }
85                 fmt.Fprintln(stdout, output+"/library.gob")
86                 return 0
87         }
88
89         var infile io.ReadCloser
90         if *inputFilename == "-" {
91                 infile = ioutil.NopCloser(stdin)
92         } else {
93                 infile, err = os.Open(*inputFilename)
94                 if err != nil {
95                         return 1
96                 }
97                 defer infile.Close()
98         }
99         log.Print("reading")
100         cgs, err := ReadCompactGenomes(infile)
101         if err != nil {
102                 return 1
103         }
104         err = infile.Close()
105         if err != nil {
106                 return 1
107         }
108         log.Printf("reading done, %d genomes", len(cgs))
109
110         log.Print("filtering")
111         ntags := 0
112         for _, cg := range cgs {
113                 if ntags < len(cg.Variants)/2 {
114                         ntags = len(cg.Variants) / 2
115                 }
116                 if *maxvariants < 0 {
117                         continue
118                 }
119                 maxVariantID := tileVariantID(*maxvariants)
120                 for idx, variant := range cg.Variants {
121                         if variant > maxVariantID {
122                                 for _, cg := range cgs {
123                                         if len(cg.Variants) > idx {
124                                                 cg.Variants[idx & ^1] = 0
125                                                 cg.Variants[idx|1] = 0
126                                         }
127                                 }
128                         }
129                 }
130         }
131
132         if *maxtag >= 0 && ntags > *maxtag {
133                 ntags = *maxtag
134                 for i, cg := range cgs {
135                         if len(cg.Variants) > *maxtag*2 {
136                                 cgs[i].Variants = cg.Variants[:*maxtag*2]
137                         }
138                 }
139         }
140
141         if *mincoverage < 1 {
142                 mincov := int(*mincoverage * float64(len(cgs)*2))
143                 cov := make([]int, ntags)
144                 for _, cg := range cgs {
145                         for idx, variant := range cg.Variants {
146                                 if variant > 0 {
147                                         cov[idx>>1]++
148                                 }
149                         }
150                 }
151                 for tag, c := range cov {
152                         if c < mincov {
153                                 for _, cg := range cgs {
154                                         if len(cg.Variants) > tag*2 {
155                                                 cg.Variants[tag*2] = 0
156                                                 cg.Variants[tag*2+1] = 0
157                                         }
158                                 }
159                         }
160                 }
161         }
162
163         log.Print("filtering done")
164
165         var outfile io.WriteCloser
166         if *outputFilename == "-" {
167                 outfile = nopCloser{cmd.output}
168         } else {
169                 outfile, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
170                 if err != nil {
171                         return 1
172                 }
173                 defer outfile.Close()
174         }
175         w := bufio.NewWriter(outfile)
176         enc := gob.NewEncoder(w)
177         log.Print("writing")
178         err = enc.Encode(LibraryEntry{
179                 CompactGenomes: cgs,
180         })
181         if err != nil {
182                 return 1
183         }
184         log.Print("writing done")
185         err = w.Flush()
186         if err != nil {
187                 return 1
188         }
189         err = outfile.Close()
190         if err != nil {
191                 return 1
192         }
193         return 0
194 }