21139: Remove excess concurrency to reduce mutex/goroutine overhead.
[lightning.git] / stats.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "encoding/json"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         _ "net/http/pprof"
17         "os"
18         "strings"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         log "github.com/sirupsen/logrus"
22 )
23
24 type statscmd struct {
25         debugUnplaced bool
26 }
27
28 func (cmd *statscmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
29         var err error
30         defer func() {
31                 if err != nil {
32                         fmt.Fprintf(stderr, "%s\n", err)
33                 }
34         }()
35         flags := flag.NewFlagSet("", flag.ContinueOnError)
36         flags.SetOutput(stderr)
37         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
38         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
39         projectUUID := flags.String("project", "", "project `UUID` for output data")
40         priority := flags.Int("priority", 500, "container request priority")
41         inputFilename := flags.String("i", "-", "input `file`")
42         outputFilename := flags.String("o", "-", "output `file`")
43         flags.BoolVar(&cmd.debugUnplaced, "debug-unplaced", false, "output full list of unplaced tags")
44         err = flags.Parse(args)
45         if err == flag.ErrHelp {
46                 err = nil
47                 return 0
48         } else if err != nil {
49                 return 2
50         } else if flags.NArg() > 0 {
51                 err = fmt.Errorf("errant command line arguments after parsed flags: %v", flags.Args())
52                 return 2
53         }
54
55         if *pprof != "" {
56                 go func() {
57                         log.Println(http.ListenAndServe(*pprof, nil))
58                 }()
59         }
60
61         if !*runlocal {
62                 if *outputFilename != "-" {
63                         err = errors.New("cannot specify output file in container mode: not implemented")
64                         return 1
65                 }
66                 runner := arvadosContainerRunner{
67                         Name:        "lightning stats",
68                         Client:      arvados.NewClientFromEnv(),
69                         ProjectUUID: *projectUUID,
70                         RAM:         16000000000,
71                         VCPUs:       1,
72                         Priority:    *priority,
73                 }
74                 err = runner.TranslatePaths(inputFilename)
75                 if err != nil {
76                         return 1
77                 }
78                 runner.Args = []string{"stats", "-local=true", fmt.Sprintf("-debug-unplaced=%v", cmd.debugUnplaced), "-i", *inputFilename, "-o", "/mnt/output/stats.json"}
79                 var output string
80                 output, err = runner.Run()
81                 if err != nil {
82                         return 1
83                 }
84                 fmt.Fprintln(stdout, output+"/stats.json")
85                 return 0
86         }
87
88         var input io.ReadCloser
89         if *inputFilename == "-" {
90                 input = ioutil.NopCloser(stdin)
91         } else {
92                 input, err = os.Open(*inputFilename)
93                 if err != nil {
94                         return 1
95                 }
96                 defer input.Close()
97         }
98
99         var output io.WriteCloser
100         if *outputFilename == "-" {
101                 output = nopCloser{stdout}
102         } else {
103                 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
104                 if err != nil {
105                         return 1
106                 }
107                 defer output.Close()
108         }
109
110         bufw := bufio.NewWriter(output)
111         err = cmd.doStats(input, strings.HasSuffix(*inputFilename, ".gz"), bufw)
112         if err != nil {
113                 return 1
114         }
115         err = bufw.Flush()
116         if err != nil {
117                 return 1
118         }
119         err = output.Close()
120         if err != nil {
121                 return 1
122         }
123         return 0
124 }
125
126 func (cmd *statscmd) doStats(input io.Reader, gz bool, output io.Writer) error {
127         var ret struct {
128                 Genomes          int
129                 CalledBases      []int64
130                 Tags             int
131                 TagsPlacedNTimes []int // a[x]==y means there were y tags that placed x times
132                 TileVariants     int
133                 VariantsBySize   []int
134                 NCVariantsBySize []int
135                 UnplacedTags     []string `json:",omitempty"`
136         }
137
138         var tagSet [][]byte
139         var tagPlacements []int
140         tileVariantCalls := map[tileLibRef]int{}
141         err := DecodeLibrary(input, gz, func(ent *LibraryEntry) error {
142                 ret.Genomes += len(ent.CompactGenomes)
143                 ret.TileVariants += len(ent.TileVariants)
144                 if len(ent.TagSet) > 0 {
145                         if ret.Tags > 0 {
146                                 return errors.New("invalid input: contains multiple tagsets")
147                         }
148                         ret.Tags = len(ent.TagSet)
149                         tagSet = ent.TagSet
150                 }
151                 for _, tv := range ent.TileVariants {
152                         if need := 1 + len(tv.Sequence) - len(ret.VariantsBySize); need > 0 {
153                                 ret.VariantsBySize = append(ret.VariantsBySize, make([]int, need)...)
154                                 ret.NCVariantsBySize = append(ret.NCVariantsBySize, make([]int, need)...)
155                         }
156
157                         calls := 0
158                         hasNoCalls := false
159                         for _, b := range tv.Sequence {
160                                 if b == 'a' || b == 'c' || b == 'g' || b == 't' {
161                                         calls++
162                                 } else {
163                                         hasNoCalls = true
164                                 }
165                         }
166
167                         if hasNoCalls {
168                                 ret.NCVariantsBySize[len(tv.Sequence)]++
169                         } else {
170                                 ret.VariantsBySize[len(tv.Sequence)]++
171                         }
172
173                         tileVariantCalls[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = calls
174                 }
175                 for _, g := range ent.CompactGenomes {
176                         if need := (len(g.Variants)+1)/2 - len(tagPlacements); need > 0 {
177                                 tagPlacements = append(tagPlacements, make([]int, need)...)
178                         }
179                         calledBases := int64(0)
180                         for idx, v := range g.Variants {
181                                 if v > 0 {
182                                         tagPlacements[idx/2]++
183                                         calledBases += int64(tileVariantCalls[tileLibRef{Tag: tagID(idx / 2), Variant: v}])
184                                 }
185                         }
186                         ret.CalledBases = append(ret.CalledBases, calledBases)
187                 }
188                 return nil
189         })
190         if err != nil {
191                 return err
192         }
193         for id, p := range tagPlacements {
194                 for len(ret.TagsPlacedNTimes) <= p {
195                         ret.TagsPlacedNTimes = append(ret.TagsPlacedNTimes, 0)
196                 }
197                 ret.TagsPlacedNTimes[p]++
198                 if cmd.debugUnplaced && p == 0 {
199                         ret.UnplacedTags = append(ret.UnplacedTags, fmt.Sprintf("%d %s", id, tagSet[id]))
200                 }
201         }
202
203         return json.NewEncoder(output).Encode(ret)
204 }