Gzip gob files.
[lightning.git] / stats.go
1 package main
2
3 import (
4         "bufio"
5         "encoding/json"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "strings"
15
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         log "github.com/sirupsen/logrus"
18 )
19
20 type statscmd struct {
21         debugUnplaced bool
22 }
23
24 func (cmd *statscmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
25         var err error
26         defer func() {
27                 if err != nil {
28                         fmt.Fprintf(stderr, "%s\n", err)
29                 }
30         }()
31         flags := flag.NewFlagSet("", flag.ContinueOnError)
32         flags.SetOutput(stderr)
33         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
34         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
35         projectUUID := flags.String("project", "", "project `UUID` for output data")
36         priority := flags.Int("priority", 500, "container request priority")
37         inputFilename := flags.String("i", "-", "input `file`")
38         outputFilename := flags.String("o", "-", "output `file`")
39         flags.BoolVar(&cmd.debugUnplaced, "debug-unplaced", false, "output full list of unplaced tags")
40         err = flags.Parse(args)
41         if err == flag.ErrHelp {
42                 err = nil
43                 return 0
44         } else if err != nil {
45                 return 2
46         }
47
48         if *pprof != "" {
49                 go func() {
50                         log.Println(http.ListenAndServe(*pprof, nil))
51                 }()
52         }
53
54         if !*runlocal {
55                 if *outputFilename != "-" {
56                         err = errors.New("cannot specify output file in container mode: not implemented")
57                         return 1
58                 }
59                 runner := arvadosContainerRunner{
60                         Name:        "lightning stats",
61                         Client:      arvados.NewClientFromEnv(),
62                         ProjectUUID: *projectUUID,
63                         RAM:         16000000000,
64                         VCPUs:       1,
65                         Priority:    *priority,
66                 }
67                 err = runner.TranslatePaths(inputFilename)
68                 if err != nil {
69                         return 1
70                 }
71                 runner.Args = []string{"stats", "-local=true", fmt.Sprintf("-debug-unplaced=%v", cmd.debugUnplaced), "-i", *inputFilename, "-o", "/mnt/output/stats.json"}
72                 var output string
73                 output, err = runner.Run()
74                 if err != nil {
75                         return 1
76                 }
77                 fmt.Fprintln(stdout, output+"/stats.json")
78                 return 0
79         }
80
81         var input io.ReadCloser
82         if *inputFilename == "-" {
83                 input = ioutil.NopCloser(stdin)
84         } else {
85                 input, err = os.Open(*inputFilename)
86                 if err != nil {
87                         return 1
88                 }
89                 defer input.Close()
90         }
91
92         var output io.WriteCloser
93         if *outputFilename == "-" {
94                 output = nopCloser{stdout}
95         } else {
96                 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
97                 if err != nil {
98                         return 1
99                 }
100                 defer output.Close()
101         }
102
103         bufw := bufio.NewWriter(output)
104         err = cmd.doStats(input, strings.HasSuffix(*inputFilename, ".gz"), bufw)
105         if err != nil {
106                 return 1
107         }
108         err = bufw.Flush()
109         if err != nil {
110                 return 1
111         }
112         err = output.Close()
113         if err != nil {
114                 return 1
115         }
116         return 0
117 }
118
119 func (cmd *statscmd) doStats(input io.Reader, gz bool, output io.Writer) error {
120         var ret struct {
121                 Genomes          int
122                 CalledBases      []int64
123                 Tags             int
124                 TagsPlacedNTimes []int // a[x]==y means there were y tags that placed x times
125                 TileVariants     int
126                 VariantsBySize   []int
127                 NCVariantsBySize []int
128                 UnplacedTags     []string `json:",omitempty"`
129         }
130
131         var tagSet [][]byte
132         var tagPlacements []int
133         tileVariantCalls := map[tileLibRef]int{}
134         err := DecodeLibrary(input, gz, func(ent *LibraryEntry) error {
135                 ret.Genomes += len(ent.CompactGenomes)
136                 ret.TileVariants += len(ent.TileVariants)
137                 if len(ent.TagSet) > 0 {
138                         if ret.Tags > 0 {
139                                 return errors.New("invalid input: contains multiple tagsets")
140                         }
141                         ret.Tags = len(ent.TagSet)
142                         tagSet = ent.TagSet
143                 }
144                 for _, tv := range ent.TileVariants {
145                         if need := 1 + len(tv.Sequence) - len(ret.VariantsBySize); need > 0 {
146                                 ret.VariantsBySize = append(ret.VariantsBySize, make([]int, need)...)
147                                 ret.NCVariantsBySize = append(ret.NCVariantsBySize, make([]int, need)...)
148                         }
149
150                         calls := 0
151                         hasNoCalls := false
152                         for _, b := range tv.Sequence {
153                                 if b == 'a' || b == 'c' || b == 'g' || b == 't' {
154                                         calls++
155                                 } else {
156                                         hasNoCalls = true
157                                 }
158                         }
159
160                         if hasNoCalls {
161                                 ret.NCVariantsBySize[len(tv.Sequence)]++
162                         } else {
163                                 ret.VariantsBySize[len(tv.Sequence)]++
164                         }
165
166                         tileVariantCalls[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = calls
167                 }
168                 for _, g := range ent.CompactGenomes {
169                         if need := (len(g.Variants)+1)/2 - len(tagPlacements); need > 0 {
170                                 tagPlacements = append(tagPlacements, make([]int, need)...)
171                         }
172                         calledBases := int64(0)
173                         for idx, v := range g.Variants {
174                                 if v > 0 {
175                                         tagPlacements[idx/2]++
176                                         calledBases += int64(tileVariantCalls[tileLibRef{Tag: tagID(idx / 2), Variant: v}])
177                                 }
178                         }
179                         ret.CalledBases = append(ret.CalledBases, calledBases)
180                 }
181                 return nil
182         })
183         if err != nil {
184                 return err
185         }
186         for id, p := range tagPlacements {
187                 for len(ret.TagsPlacedNTimes) <= p {
188                         ret.TagsPlacedNTimes = append(ret.TagsPlacedNTimes, 0)
189                 }
190                 ret.TagsPlacedNTimes[p]++
191                 if cmd.debugUnplaced && p == 0 {
192                         ret.UnplacedTags = append(ret.UnplacedTags, fmt.Sprintf("%d %s", id, tagSet[id]))
193                 }
194         }
195
196         return json.NewEncoder(output).Encode(ret)
197 }