Faster annotate.
[lightning.git] / annotate.go
1 package main
2
3 import (
4         "bufio"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "net/http"
11         _ "net/http/pprof"
12         "os"
13         "runtime"
14         "sort"
15         "strconv"
16         "strings"
17         "sync"
18
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "github.com/arvados/lightning/hgvs"
21         log "github.com/sirupsen/logrus"
22 )
23
24 type annotatecmd struct {
25         variantHash bool
26 }
27
28 func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
29         var err error
30         defer func() {
31                 if err != nil {
32                         fmt.Fprintf(stderr, "%s\n", err)
33                 }
34         }()
35         flags := flag.NewFlagSet("", flag.ContinueOnError)
36         flags.SetOutput(stderr)
37         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
38         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
39         projectUUID := flags.String("project", "", "project `UUID` for output data")
40         priority := flags.Int("priority", 500, "container request priority")
41         inputFilename := flags.String("i", "-", "input `file` (library)")
42         outputFilename := flags.String("o", "-", "output `file`")
43         flags.BoolVar(&cmd.variantHash, "variant-hash", false, "output variant hash instead of index")
44         err = flags.Parse(args)
45         if err == flag.ErrHelp {
46                 err = nil
47                 return 0
48         } else if err != nil {
49                 return 2
50         }
51
52         if *pprof != "" {
53                 go func() {
54                         log.Println(http.ListenAndServe(*pprof, nil))
55                 }()
56         }
57         if !*runlocal {
58                 if *outputFilename != "-" {
59                         err = errors.New("cannot specify output file in container mode: not implemented")
60                         return 1
61                 }
62                 runner := arvadosContainerRunner{
63                         Name:        "lightning annotate",
64                         Client:      arvados.NewClientFromEnv(),
65                         ProjectUUID: *projectUUID,
66                         RAM:         120000000000,
67                         VCPUs:       32,
68                         Priority:    *priority,
69                 }
70                 err = runner.TranslatePaths(inputFilename)
71                 if err != nil {
72                         return 1
73                 }
74                 runner.Args = []string{"annotate", "-local=true", fmt.Sprintf("-variant-hash=%v", cmd.variantHash), "-i", *inputFilename, "-o", "/mnt/output/tilevariants.tsv"}
75                 var output string
76                 output, err = runner.Run()
77                 if err != nil {
78                         return 1
79                 }
80                 fmt.Fprintln(stdout, output+"/tilevariants.tsv")
81                 return 0
82         }
83
84         var input io.ReadCloser
85         if *inputFilename == "-" {
86                 input = ioutil.NopCloser(stdin)
87         } else {
88                 input, err = os.Open(*inputFilename)
89                 if err != nil {
90                         return 1
91                 }
92                 defer input.Close()
93         }
94
95         var output io.WriteCloser
96         if *outputFilename == "-" {
97                 output = nopCloser{stdout}
98         } else {
99                 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0666)
100                 if err != nil {
101                         return 1
102                 }
103                 defer output.Close()
104         }
105         bufw := bufio.NewWriter(output)
106
107         err = cmd.exportTileDiffs(bufw, input)
108         if err != nil {
109                 return 1
110         }
111         err = bufw.Flush()
112         if err != nil {
113                 return 1
114         }
115         err = output.Close()
116         if err != nil {
117                 return 1
118         }
119         err = input.Close()
120         if err != nil {
121                 return 1
122         }
123         return 0
124 }
125
126 func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, librdr io.Reader) error {
127         var refs []CompactSequence
128         var tiles [][]TileVariant
129         var tagset [][]byte
130         var taglen int
131         err := DecodeLibrary(librdr, func(ent *LibraryEntry) error {
132                 if len(ent.TagSet) > 0 {
133                         if tagset != nil {
134                                 return errors.New("error: not implemented: input has multiple tagsets")
135                         }
136                         tagset = ent.TagSet
137                         taglen = len(tagset[0])
138                         tiles = make([][]TileVariant, len(tagset))
139                 }
140                 for _, tv := range ent.TileVariants {
141                         if tv.Tag >= tagID(len(tiles)) {
142                                 return fmt.Errorf("error: reading tile variant for tag %d but only %d tags were loaded", tv.Tag, len(tiles))
143                         }
144                         for len(tiles[tv.Tag]) <= int(tv.Variant) {
145                                 tiles[tv.Tag] = append(tiles[tv.Tag], TileVariant{})
146                         }
147                         tiles[tv.Tag][tv.Variant] = tv
148                 }
149                 for _, cs := range ent.CompactSequences {
150                         refs = append(refs, cs)
151                 }
152                 return nil
153         })
154         if err != nil {
155                 return err
156         }
157         tag2tagid := make(map[string]tagID, len(tagset))
158         for tagid, tagseq := range tagset {
159                 tag2tagid[string(tagseq)] = tagID(tagid)
160         }
161         sort.Slice(refs, func(i, j int) bool { return refs[i].Name < refs[j].Name })
162         log.Infof("len(refs) %d", len(refs))
163
164         outch := make(chan string, 1)
165         var outwg sync.WaitGroup
166         defer outwg.Wait()
167         outwg.Add(1)
168         go func() {
169                 defer outwg.Done()
170                 for s := range outch {
171                         io.WriteString(outw, s)
172                 }
173         }()
174         defer close(outch)
175
176         limiter := make(chan bool, runtime.NumCPU()+1)
177         var diffwg sync.WaitGroup
178         defer diffwg.Wait()
179
180         for _, refcs := range refs {
181                 refcs := refcs
182                 var seqnames []string
183                 for seqname := range refcs.TileSequences {
184                         seqnames = append(seqnames, seqname)
185                 }
186                 sort.Strings(seqnames)
187                 for _, seqname := range seqnames {
188                         seqname := seqname
189                         var refseq []byte
190                         // tilestart[123] is the index into refseq
191                         // where the tile for tag 123 was placed.
192                         tilestart := map[tagID]int{}
193                         tileend := map[tagID]int{}
194                         for _, libref := range refcs.TileSequences[seqname] {
195                                 if libref.Variant < 1 {
196                                         return fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refcs.Name, seqname, libref.Tag)
197                                 }
198                                 seq := tiles[libref.Tag][libref.Variant].Sequence
199                                 if len(seq) < taglen {
200                                         return fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refcs.Name, seqname, libref.Tag, libref.Variant, len(seq), taglen)
201                                 }
202                                 overlap := taglen
203                                 if len(refseq) == 0 {
204                                         overlap = 0
205                                 }
206                                 tilestart[libref.Tag] = len(refseq) - overlap
207                                 refseq = append(refseq, seq[overlap:]...)
208                                 tileend[libref.Tag] = len(refseq)
209                         }
210                         log.Infof("seq %s len(refseq) %d len(tilestart) %d", seqname, len(refseq), len(tilestart))
211                         for tag, tvs := range tiles {
212                                 tag := tagID(tag)
213                                 refstart, ok := tilestart[tag]
214                                 if !ok {
215                                         // Tag didn't place on this
216                                         // reference sequence. (It
217                                         // might place on the same
218                                         // chromosome in a genome
219                                         // anyway, but we don't output
220                                         // the annotations that would
221                                         // result.)
222                                         continue
223                                 }
224                                 for variant, tv := range tvs {
225                                         variant, tv := variant, tv
226                                         if variant == 0 {
227                                                 continue
228                                         }
229                                         if len(tv.Sequence) < taglen {
230                                                 return fmt.Errorf("tilevar %d,%d has sequence len %d < taglen %d", tag, variant, len(tv.Sequence), taglen)
231                                         }
232                                         var refpart []byte
233                                         endtag := string(tv.Sequence[len(tv.Sequence)-taglen:])
234                                         if endtagid, ok := tag2tagid[endtag]; !ok {
235                                                 // Tile variant doesn't end on a tag, so it can only place at the end of a chromosome.
236                                                 refpart = refseq[refstart:]
237                                                 log.Warnf("%x tilevar %d,%d endtag not in ref: %s", tv.Blake2b[:13], tag, variant, endtag)
238                                         } else if refendtagstart, ok := tilestart[endtagid]; !ok {
239                                                 // Ref ends a chromsome with a (possibly very large) variant of this tile, but genomes with this tile don't.
240                                                 // Give up. (TODO: something smarter)
241                                                 log.Debugf("%x not annotating tilevar %d,%d because end tag %d is not in ref", tv.Blake2b[:13], tag, variant, endtagid)
242                                                 continue
243                                         } else {
244                                                 // Non-terminal tile vs. non-terminal reference.
245                                                 refpart = refseq[refstart : refendtagstart+taglen]
246                                                 log.Tracef("\n%x tilevar %d,%d endtag %s endtagid %d refendtagstart %d", tv.Blake2b[:13], tag, variant, endtag, endtagid, refendtagstart)
247                                         }
248                                         if len(refpart) > 10000 {
249                                                 log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s ref len %d", tv.Blake2b[:13], tag, variant, refcs.Name, seqname, len(refpart))
250                                                 continue
251                                         }
252                                         if len(tv.Sequence) > 10000 {
253                                                 log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s variant len %d", tv.Blake2b[:13], tag, variant, refcs.Name, seqname, len(tv.Sequence))
254                                                 continue
255                                         }
256                                         // log.Printf("\n%x @ refstart %d \n< %s\n> %s\n", tv.Blake2b, refstart, refpart, tv.Sequence)
257
258                                         diffwg.Add(1)
259                                         limiter <- true
260                                         go func() {
261                                                 defer func() {
262                                                         <-limiter
263                                                         diffwg.Done()
264                                                 }()
265                                                 diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tv.Sequence)), 0)
266                                                 for _, diff := range diffs {
267                                                         diff.Position += refstart
268                                                         var varid string
269                                                         if cmd.variantHash {
270                                                                 varid = fmt.Sprintf("%x", tv.Blake2b)[:13]
271                                                         } else {
272                                                                 varid = strconv.Itoa(variant)
273                                                         }
274                                                         outch <- fmt.Sprintf("%d\t%s\t%s\t%s:g.%s\n", tag, varid, refcs.Name, seqname, diff.String())
275                                                 }
276                                         }()
277                                 }
278                         }
279                 }
280         }
281         return nil
282 }