b792119b20f84fd281639642a00d4d053a935d77
[lightning.git] / annotate.go
1 package lightning
2
3 import (
4         "bufio"
5         "context"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "runtime"
15         "sort"
16         "strconv"
17         "strings"
18         "sync"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "github.com/arvados/lightning/hgvs"
22         log "github.com/sirupsen/logrus"
23 )
24
25 type annotatecmd struct {
26         dropTiles   []bool
27         variantHash bool
28         maxTileSize int
29         tag2tagid   map[string]tagID
30 }
31
32 func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
33         var err error
34         defer func() {
35                 if err != nil {
36                         fmt.Fprintf(stderr, "%s\n", err)
37                 }
38         }()
39         flags := flag.NewFlagSet("", flag.ContinueOnError)
40         flags.SetOutput(stderr)
41         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
42         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
43         projectUUID := flags.String("project", "", "project `UUID` for output data")
44         priority := flags.Int("priority", 500, "container request priority")
45         inputFilename := flags.String("i", "-", "input `file` (library)")
46         outputFilename := flags.String("o", "-", "output `file`")
47         flags.BoolVar(&cmd.variantHash, "variant-hash", false, "output variant hash instead of index")
48         flags.IntVar(&cmd.maxTileSize, "max-tile-size", 50000, "don't try to make annotations for tiles bigger than given `size`")
49         err = flags.Parse(args)
50         if err == flag.ErrHelp {
51                 err = nil
52                 return 0
53         } else if err != nil {
54                 return 2
55         }
56
57         if *pprof != "" {
58                 go func() {
59                         log.Println(http.ListenAndServe(*pprof, nil))
60                 }()
61         }
62         if !*runlocal {
63                 if *outputFilename != "-" {
64                         err = errors.New("cannot specify output file in container mode: not implemented")
65                         return 1
66                 }
67                 runner := arvadosContainerRunner{
68                         Name:        "lightning annotate",
69                         Client:      arvados.NewClientFromEnv(),
70                         ProjectUUID: *projectUUID,
71                         RAM:         80000000000,
72                         VCPUs:       16,
73                         Priority:    *priority,
74                 }
75                 err = runner.TranslatePaths(inputFilename)
76                 if err != nil {
77                         return 1
78                 }
79                 runner.Args = []string{"annotate", "-local=true", fmt.Sprintf("-variant-hash=%v", cmd.variantHash), "-max-tile-size", strconv.Itoa(cmd.maxTileSize), "-i", *inputFilename, "-o", "/mnt/output/tilevariants.csv"}
80                 var output string
81                 output, err = runner.Run()
82                 if err != nil {
83                         return 1
84                 }
85                 fmt.Fprintln(stdout, output+"/tilevariants.csv")
86                 return 0
87         }
88
89         var input io.ReadCloser
90         if *inputFilename == "-" {
91                 input = ioutil.NopCloser(stdin)
92         } else {
93                 input, err = os.Open(*inputFilename)
94                 if err != nil {
95                         return 1
96                 }
97                 defer input.Close()
98         }
99
100         var output io.WriteCloser
101         if *outputFilename == "-" {
102                 output = nopCloser{stdout}
103         } else {
104                 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0666)
105                 if err != nil {
106                         return 1
107                 }
108                 defer output.Close()
109         }
110         bufw := bufio.NewWriterSize(output, 4*1024*1024)
111
112         tilelib := &tileLibrary{
113                 retainNoCalls:       true,
114                 retainTileSequences: true,
115         }
116         err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil)
117         if err != nil {
118                 return 1
119         }
120         err = cmd.exportTileDiffs(bufw, tilelib)
121         if err != nil {
122                 return 1
123         }
124         err = bufw.Flush()
125         if err != nil {
126                 return 1
127         }
128         err = output.Close()
129         if err != nil {
130                 return 1
131         }
132         err = input.Close()
133         if err != nil {
134                 return 1
135         }
136         return 0
137 }
138
139 func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, tilelib *tileLibrary) error {
140         tagset := tilelib.taglib.Tags()
141         if len(tagset) == 0 {
142                 return errors.New("cannot annotate library without tags")
143         }
144         taglen := len(tagset[0])
145         var refs []string
146         for name := range tilelib.refseqs {
147                 refs = append(refs, name)
148         }
149         cmd.tag2tagid = make(map[string]tagID, len(tagset))
150         for tagid, tagseq := range tagset {
151                 cmd.tag2tagid[string(tagseq)] = tagID(tagid)
152         }
153         sort.Strings(refs)
154         log.Infof("len(refs) %d", len(refs))
155
156         outch := make(chan string, runtime.NumCPU()*2)
157         var outwg sync.WaitGroup
158         defer outwg.Wait()
159         outwg.Add(1)
160         go func() {
161                 defer outwg.Done()
162                 for s := range outch {
163                         io.WriteString(outw, s)
164                 }
165         }()
166         defer close(outch)
167
168         nseqs := 0
169         for _, refcs := range tilelib.refseqs {
170                 nseqs += len(refcs)
171         }
172
173         throttle := &throttle{Max: runtime.NumCPU()*2 + nseqs*2 + 1}
174         defer throttle.Wait()
175
176         for _, refname := range refs {
177                 refname := refname
178                 refcs := tilelib.refseqs[refname]
179                 var seqnames []string
180                 for seqname := range refcs {
181                         seqnames = append(seqnames, seqname)
182                 }
183                 sort.Strings(seqnames)
184                 for _, seqname := range seqnames {
185                         seqname := seqname
186                         throttle.Acquire()
187                         if throttle.Err() != nil {
188                                 break
189                         }
190                         go func() {
191                                 defer throttle.Release()
192                                 throttle.Report(cmd.annotateSequence(throttle, outch, tilelib, taglen, refname, seqname, refcs[seqname], len(refs) > 1))
193                         }()
194                 }
195         }
196         throttle.Wait()
197         return throttle.Err()
198 }
199
200 func (cmd *annotatecmd) annotateSequence(throttle *throttle, outch chan<- string, tilelib *tileLibrary, taglen int, refname, seqname string, reftiles []tileLibRef, refnamecol bool) error {
201         refnamefield := ""
202         if refnamecol {
203                 refnamefield = "," + trimFilenameForLabel(refname)
204         }
205         var refseq []byte
206         // tilestart[123] is the index into refseq
207         // where the tile for tag 123 was placed.
208         tilestart := map[tagID]int{}
209         tileend := map[tagID]int{}
210         for _, libref := range reftiles {
211                 if libref.Variant < 1 {
212                         return fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refname, seqname, libref.Tag)
213                 }
214                 seq := tilelib.TileVariantSequence(libref)
215                 if len(seq) < taglen {
216                         return fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refname, seqname, libref.Tag, libref.Variant, len(seq), taglen)
217                 }
218                 overlap := taglen
219                 if len(refseq) == 0 {
220                         overlap = 0
221                 }
222                 tilestart[libref.Tag] = len(refseq) - overlap
223                 refseq = append(refseq, seq[overlap:]...)
224                 tileend[libref.Tag] = len(refseq)
225         }
226         log.Infof("seq %s len(refseq) %d len(tilestart) %d", seqname, len(refseq), len(tilestart))
227         // outtag is tag's index in the subset of tags that aren't
228         // dropped. If there are 10M tags and half are dropped by
229         // dropTiles, tag ranges from 0 to 10M-1 and outtag ranges
230         // from 0 to 5M-1.
231         //
232         // IOW, in the matrix built by cgs2array(), {tag} is
233         // represented by columns {outtag}*2 and {outtag}*2+1.
234         outcol := -1
235         for tag, tvs := range tilelib.variant {
236                 if len(cmd.dropTiles) > tag && cmd.dropTiles[tag] {
237                         continue
238                 }
239                 tag := tagID(tag)
240                 outcol++
241                 // Must shadow outcol var to use safely in goroutine below.
242                 outcol := outcol
243                 refstart, ok := tilestart[tag]
244                 if !ok {
245                         // Tag didn't place on this
246                         // reference sequence. (It
247                         // might place on the same
248                         // chromosome in a genome
249                         // anyway, but we don't output
250                         // the annotations that would
251                         // result.)
252                         // outch <- fmt.Sprintf("%d,%d,-1%s\n", tag, outcol, refnamefield)
253                         continue
254                 }
255                 for variant := 1; variant <= len(tvs); variant++ {
256                         variant, hash := tileVariantID(variant), tvs[variant-1]
257                         tileseq := tilelib.TileVariantSequence(tileLibRef{Tag: tag, Variant: variant})
258                         if len(tileseq) == 0 {
259                                 continue
260                         } else if len(tileseq) < taglen {
261                                 return fmt.Errorf("tilevar %d,%d has sequence len %d < taglen %d", tag, variant, len(tileseq), taglen)
262                         }
263                         var refpart []byte
264                         endtag := string(tileseq[len(tileseq)-taglen:])
265                         if endtagid, ok := cmd.tag2tagid[endtag]; !ok {
266                                 // Tile variant doesn't end on a tag, so it can only place at the end of a chromosome.
267                                 refpart = refseq[refstart:]
268                                 log.Warnf("%x tilevar %d,%d endtag not in ref: %s", hash[:13], tag, variant, endtag)
269                         } else if refendtagstart, ok := tilestart[endtagid]; !ok {
270                                 // Ref ends a chromsome with a (possibly very large) variant of this tile, but genomes with this tile don't.
271                                 // Give up. (TODO: something smarter)
272                                 log.Debugf("%x not annotating tilevar %d,%d because end tag %d is not in ref", hash[:13], tag, variant, endtagid)
273                                 continue
274                         } else {
275                                 // Non-terminal tile vs. non-terminal reference.
276                                 refpart = refseq[refstart : refendtagstart+taglen]
277                                 log.Tracef("\n%x tilevar %d,%d endtag %s endtagid %d refendtagstart %d", hash[:13], tag, variant, endtag, endtagid, refendtagstart)
278                         }
279                         if len(refpart) > cmd.maxTileSize {
280                                 log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s pos %d ref len %d", hash[:13], tag, variant, refname, seqname, refstart, len(refpart))
281                                 continue
282                         }
283                         if len(tileseq) > cmd.maxTileSize {
284                                 log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s variant len %d", hash[:13], tag, variant, refname, seqname, len(tileseq))
285                                 continue
286                         }
287                         // log.Printf("\n%x @ refstart %d \n< %s\n> %s\n", tv.Blake2b, refstart, refpart, tileseq)
288
289                         throttle.Acquire()
290                         go func() {
291                                 defer throttle.Release()
292                                 diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tileseq)), 0)
293                                 for _, diff := range diffs {
294                                         diff.Position += refstart
295                                         var varid string
296                                         if cmd.variantHash {
297                                                 varid = fmt.Sprintf("%x", hash)[:13]
298                                         } else {
299                                                 varid = fmt.Sprintf("%d", variant)
300                                         }
301                                         outch <- fmt.Sprintf("%d,%d,%s%s,%s:g.%s\n", tag, outcol, varid, refnamefield, seqname, diff.String())
302                                 }
303                         }()
304                 }
305         }
306         return nil
307 }