Merge branch 'main' into 19868-pca-in-ml
[lightning.git] / annotate.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "context"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         _ "net/http/pprof"
17         "os"
18         "runtime"
19         "sort"
20         "strconv"
21         "strings"
22         "sync"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "github.com/arvados/lightning/hgvs"
26         log "github.com/sirupsen/logrus"
27 )
28
29 type annotatecmd struct {
30         dropTiles        []bool
31         variantHash      bool
32         maxTileSize      int
33         tag2tagid        map[string]tagID
34         reportAnnotation func(tag tagID, outcol int, variant tileVariantID, refname string, seqname string, pdi hgvs.Variant)
35 }
36
37 func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
38         var err error
39         defer func() {
40                 if err != nil {
41                         fmt.Fprintf(stderr, "%s\n", err)
42                 }
43         }()
44         flags := flag.NewFlagSet("", flag.ContinueOnError)
45         flags.SetOutput(stderr)
46         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
47         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
48         projectUUID := flags.String("project", "", "project `UUID` for output data")
49         priority := flags.Int("priority", 500, "container request priority")
50         inputFilename := flags.String("i", "-", "input `file` (library)")
51         outputFilename := flags.String("o", "-", "output `file`")
52         flags.BoolVar(&cmd.variantHash, "variant-hash", false, "output variant hash instead of index")
53         flags.IntVar(&cmd.maxTileSize, "max-tile-size", 50000, "don't try to make annotations for tiles bigger than given `size`")
54         err = flags.Parse(args)
55         if err == flag.ErrHelp {
56                 err = nil
57                 return 0
58         } else if err != nil {
59                 return 2
60         } else if flags.NArg() > 0 {
61                 err = fmt.Errorf("errant command line arguments after parsed flags: %v", flags.Args())
62                 return 2
63         }
64
65         if *pprof != "" {
66                 go func() {
67                         log.Println(http.ListenAndServe(*pprof, nil))
68                 }()
69         }
70         if !*runlocal {
71                 if *outputFilename != "-" {
72                         err = errors.New("cannot specify output file in container mode: not implemented")
73                         return 1
74                 }
75                 runner := arvadosContainerRunner{
76                         Name:        "lightning annotate",
77                         Client:      arvados.NewClientFromEnv(),
78                         ProjectUUID: *projectUUID,
79                         RAM:         80000000000,
80                         VCPUs:       16,
81                         Priority:    *priority,
82                 }
83                 err = runner.TranslatePaths(inputFilename)
84                 if err != nil {
85                         return 1
86                 }
87                 runner.Args = []string{"annotate", "-local=true", fmt.Sprintf("-variant-hash=%v", cmd.variantHash), "-max-tile-size", strconv.Itoa(cmd.maxTileSize), "-i", *inputFilename, "-o", "/mnt/output/tilevariants.csv"}
88                 var output string
89                 output, err = runner.Run()
90                 if err != nil {
91                         return 1
92                 }
93                 fmt.Fprintln(stdout, output+"/tilevariants.csv")
94                 return 0
95         }
96
97         var input io.ReadCloser
98         if *inputFilename == "-" {
99                 input = ioutil.NopCloser(stdin)
100         } else {
101                 input, err = os.Open(*inputFilename)
102                 if err != nil {
103                         return 1
104                 }
105                 defer input.Close()
106         }
107
108         var output io.WriteCloser
109         if *outputFilename == "-" {
110                 output = nopCloser{stdout}
111         } else {
112                 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0666)
113                 if err != nil {
114                         return 1
115                 }
116                 defer output.Close()
117         }
118         bufw := bufio.NewWriterSize(output, 4*1024*1024)
119
120         tilelib := &tileLibrary{
121                 retainNoCalls:       true,
122                 retainTileSequences: true,
123         }
124         err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"))
125         if err != nil {
126                 return 1
127         }
128         err = cmd.exportTileDiffs(bufw, tilelib)
129         if err != nil {
130                 return 1
131         }
132         err = bufw.Flush()
133         if err != nil {
134                 return 1
135         }
136         err = output.Close()
137         if err != nil {
138                 return 1
139         }
140         err = input.Close()
141         if err != nil {
142                 return 1
143         }
144         return 0
145 }
146
147 func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, tilelib *tileLibrary) error {
148         tagset := tilelib.taglib.Tags()
149         if len(tagset) == 0 {
150                 return errors.New("cannot annotate library without tags")
151         }
152         taglen := len(tagset[0])
153         var refs []string
154         for name := range tilelib.refseqs {
155                 refs = append(refs, name)
156         }
157         cmd.tag2tagid = make(map[string]tagID, len(tagset))
158         for tagid, tagseq := range tagset {
159                 cmd.tag2tagid[string(tagseq)] = tagID(tagid)
160         }
161         sort.Strings(refs)
162         log.Infof("len(refs) %d", len(refs))
163
164         outch := make(chan string, runtime.NumCPU()*2)
165         var outwg sync.WaitGroup
166         defer outwg.Wait()
167         outwg.Add(1)
168         go func() {
169                 defer outwg.Done()
170                 for s := range outch {
171                         io.WriteString(outw, s)
172                 }
173         }()
174         defer close(outch)
175
176         nseqs := 0
177         for _, refcs := range tilelib.refseqs {
178                 nseqs += len(refcs)
179         }
180
181         throttle := &throttle{Max: runtime.NumCPU()*2 + nseqs*2 + 1}
182         defer throttle.Wait()
183
184         for _, refname := range refs {
185                 refname := refname
186                 refcs := tilelib.refseqs[refname]
187                 var seqnames []string
188                 for seqname := range refcs {
189                         seqnames = append(seqnames, seqname)
190                 }
191                 sort.Strings(seqnames)
192                 for _, seqname := range seqnames {
193                         seqname := seqname
194                         throttle.Acquire()
195                         if throttle.Err() != nil {
196                                 break
197                         }
198                         go func() {
199                                 defer throttle.Release()
200                                 throttle.Report(cmd.annotateSequence(throttle, outch, tilelib, taglen, refname, seqname, refcs[seqname], len(refs) > 1))
201                         }()
202                 }
203         }
204         throttle.Wait()
205         return throttle.Err()
206 }
207
208 func (cmd *annotatecmd) annotateSequence(throttle *throttle, outch chan<- string, tilelib *tileLibrary, taglen int, refname, seqname string, reftiles []tileLibRef, refnamecol bool) error {
209         refnamefield := ""
210         if refnamecol {
211                 refnamefield = "," + trimFilenameForLabel(refname)
212         }
213         var refseq []byte
214         // tilestart[123] is the index into refseq
215         // where the tile for tag 123 was placed.
216         tilestart := map[tagID]int{}
217         tileend := map[tagID]int{}
218         for _, libref := range reftiles {
219                 if libref.Variant < 1 {
220                         return fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refname, seqname, libref.Tag)
221                 }
222                 seq := tilelib.TileVariantSequence(libref)
223                 if len(seq) < taglen {
224                         return fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refname, seqname, libref.Tag, libref.Variant, len(seq), taglen)
225                 }
226                 overlap := taglen
227                 if len(refseq) == 0 {
228                         overlap = 0
229                 }
230                 tilestart[libref.Tag] = len(refseq) - overlap
231                 refseq = append(refseq, seq[overlap:]...)
232                 tileend[libref.Tag] = len(refseq)
233         }
234         log.Infof("seq %s len(refseq) %d len(tilestart) %d", seqname, len(refseq), len(tilestart))
235         // outtag is tag's index in the subset of tags that aren't
236         // dropped. If there are 10M tags and half are dropped by
237         // dropTiles, tag ranges from 0 to 10M-1 and outtag ranges
238         // from 0 to 5M-1.
239         //
240         // IOW, in the matrix built by cgs2array(), {tag} is
241         // represented by columns {outtag}*2 and {outtag}*2+1.
242         outcol := -1
243         for tag, tvs := range tilelib.variant {
244                 if len(cmd.dropTiles) > tag && cmd.dropTiles[tag] {
245                         continue
246                 }
247                 tag := tagID(tag)
248                 outcol++
249                 // Must shadow outcol var to use safely in goroutine below.
250                 outcol := outcol
251                 refstart, ok := tilestart[tag]
252                 if !ok {
253                         // Tag didn't place on this reference
254                         // sequence. (It might place on the same
255                         // chromosome in a genome anyway, but we don't
256                         // output the annotations that would result.)
257                         // outch <- fmt.Sprintf("%d,%d,-1%s\n", tag, outcol, refnamefield)
258                         continue
259                 }
260                 for variant := 1; variant <= len(tvs); variant++ {
261                         variant, hash := tileVariantID(variant), tvs[variant-1]
262                         tileseq := tilelib.TileVariantSequence(tileLibRef{Tag: tag, Variant: variant})
263                         if len(tileseq) == 0 {
264                                 continue
265                         } else if len(tileseq) < taglen {
266                                 return fmt.Errorf("tilevar %d,%d has sequence len %d < taglen %d", tag, variant, len(tileseq), taglen)
267                         }
268                         var refpart []byte
269                         endtag := string(tileseq[len(tileseq)-taglen:])
270                         if endtagid, ok := cmd.tag2tagid[endtag]; !ok {
271                                 // Tile variant doesn't end on a tag, so it can only place at the end of a chromosome.
272                                 refpart = refseq[refstart:]
273                                 log.Warnf("%x tilevar %d,%d endtag not in ref: %s", hash[:13], tag, variant, endtag)
274                         } else if refendtagstart, ok := tilestart[endtagid]; !ok {
275                                 // Ref ends a chromsome with a (possibly very large) variant of this tile, but genomes with this tile don't.
276                                 // Give up. (TODO: something smarter)
277                                 log.Debugf("%x not annotating tilevar %d,%d because end tag %d is not in ref", hash[:13], tag, variant, endtagid)
278                                 continue
279                         } else {
280                                 // Non-terminal tile vs. non-terminal reference.
281                                 refpart = refseq[refstart : refendtagstart+taglen]
282                                 log.Tracef("\n%x tilevar %d,%d endtag %s endtagid %d refendtagstart %d", hash[:13], tag, variant, endtag, endtagid, refendtagstart)
283                         }
284                         if len(refpart) > cmd.maxTileSize {
285                                 log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s pos %d ref len %d", hash[:13], tag, variant, refname, seqname, refstart, len(refpart))
286                                 continue
287                         }
288                         if len(tileseq) > cmd.maxTileSize {
289                                 log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s variant len %d", hash[:13], tag, variant, refname, seqname, len(tileseq))
290                                 continue
291                         }
292                         // log.Printf("\n%x @ refstart %d \n< %s\n> %s\n", tv.Blake2b, refstart, refpart, tileseq)
293
294                         throttle.Acquire()
295                         go func() {
296                                 defer throttle.Release()
297                                 diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tileseq)), 0)
298                                 for _, diff := range diffs {
299                                         diff.Position += refstart
300                                         var varid string
301                                         if cmd.variantHash {
302                                                 varid = fmt.Sprintf("%x", hash)[:13]
303                                         } else {
304                                                 varid = fmt.Sprintf("%d", variant)
305                                         }
306                                         outch <- fmt.Sprintf("%d,%d,%s%s,%s:g.%s\n", tag, outcol, varid, refnamefield, seqname, diff.String())
307                                         if cmd.reportAnnotation != nil {
308                                                 cmd.reportAnnotation(tag, outcol, variant, refname, seqname, diff)
309                                         }
310                                 }
311                         }()
312                 }
313         }
314         return nil
315 }