Gzip gob files.
[lightning.git] / annotate.go
1 package main
2
3 import (
4         "bufio"
5         "context"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "runtime"
15         "sort"
16         "strconv"
17         "strings"
18         "sync"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "github.com/arvados/lightning/hgvs"
22         log "github.com/sirupsen/logrus"
23 )
24
25 type annotatecmd struct {
26         variantHash bool
27         maxTileSize int
28         tag2tagid   map[string]tagID
29 }
30
31 func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
32         var err error
33         defer func() {
34                 if err != nil {
35                         fmt.Fprintf(stderr, "%s\n", err)
36                 }
37         }()
38         flags := flag.NewFlagSet("", flag.ContinueOnError)
39         flags.SetOutput(stderr)
40         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
41         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
42         projectUUID := flags.String("project", "", "project `UUID` for output data")
43         priority := flags.Int("priority", 500, "container request priority")
44         inputFilename := flags.String("i", "-", "input `file` (library)")
45         outputFilename := flags.String("o", "-", "output `file`")
46         flags.BoolVar(&cmd.variantHash, "variant-hash", false, "output variant hash instead of index")
47         flags.IntVar(&cmd.maxTileSize, "max-tile-size", 50000, "don't try to make annotations for tiles bigger than given `size`")
48         err = flags.Parse(args)
49         if err == flag.ErrHelp {
50                 err = nil
51                 return 0
52         } else if err != nil {
53                 return 2
54         }
55
56         if *pprof != "" {
57                 go func() {
58                         log.Println(http.ListenAndServe(*pprof, nil))
59                 }()
60         }
61         if !*runlocal {
62                 if *outputFilename != "-" {
63                         err = errors.New("cannot specify output file in container mode: not implemented")
64                         return 1
65                 }
66                 runner := arvadosContainerRunner{
67                         Name:        "lightning annotate",
68                         Client:      arvados.NewClientFromEnv(),
69                         ProjectUUID: *projectUUID,
70                         RAM:         80000000000,
71                         VCPUs:       16,
72                         Priority:    *priority,
73                 }
74                 err = runner.TranslatePaths(inputFilename)
75                 if err != nil {
76                         return 1
77                 }
78                 runner.Args = []string{"annotate", "-local=true", fmt.Sprintf("-variant-hash=%v", cmd.variantHash), "-max-tile-size", strconv.Itoa(cmd.maxTileSize), "-i", *inputFilename, "-o", "/mnt/output/tilevariants.tsv"}
79                 var output string
80                 output, err = runner.Run()
81                 if err != nil {
82                         return 1
83                 }
84                 fmt.Fprintln(stdout, output+"/tilevariants.tsv")
85                 return 0
86         }
87
88         var input io.ReadCloser
89         if *inputFilename == "-" {
90                 input = ioutil.NopCloser(stdin)
91         } else {
92                 input, err = os.Open(*inputFilename)
93                 if err != nil {
94                         return 1
95                 }
96                 defer input.Close()
97         }
98
99         var output io.WriteCloser
100         if *outputFilename == "-" {
101                 output = nopCloser{stdout}
102         } else {
103                 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0666)
104                 if err != nil {
105                         return 1
106                 }
107                 defer output.Close()
108         }
109         bufw := bufio.NewWriter(output)
110
111         tilelib := &tileLibrary{
112                 retainNoCalls:       true,
113                 retainTileSequences: true,
114         }
115         err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil)
116         if err != nil {
117                 return 1
118         }
119         err = cmd.exportTileDiffs(bufw, tilelib)
120         if err != nil {
121                 return 1
122         }
123         err = bufw.Flush()
124         if err != nil {
125                 return 1
126         }
127         err = output.Close()
128         if err != nil {
129                 return 1
130         }
131         err = input.Close()
132         if err != nil {
133                 return 1
134         }
135         return 0
136 }
137
138 func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, tilelib *tileLibrary) error {
139         tagset := tilelib.taglib.Tags()
140         if len(tagset) == 0 {
141                 return errors.New("cannot annotate library without tags")
142         }
143         taglen := len(tagset[0])
144         var refs []string
145         for name := range tilelib.refseqs {
146                 refs = append(refs, name)
147         }
148         cmd.tag2tagid = make(map[string]tagID, len(tagset))
149         for tagid, tagseq := range tagset {
150                 cmd.tag2tagid[string(tagseq)] = tagID(tagid)
151         }
152         sort.Strings(refs)
153         log.Infof("len(refs) %d", len(refs))
154
155         outch := make(chan string, runtime.NumCPU()*2)
156         var outwg sync.WaitGroup
157         defer outwg.Wait()
158         outwg.Add(1)
159         go func() {
160                 defer outwg.Done()
161                 for s := range outch {
162                         io.WriteString(outw, s)
163                 }
164         }()
165         defer close(outch)
166
167         nseqs := 0
168         for _, refcs := range tilelib.refseqs {
169                 nseqs += len(refcs)
170         }
171
172         throttle := &throttle{Max: runtime.NumCPU()*2 + nseqs*2 + 1}
173         defer throttle.Wait()
174
175         for _, refname := range refs {
176                 refname := refname
177                 refcs := tilelib.refseqs[refname]
178                 var seqnames []string
179                 for seqname := range refcs {
180                         seqnames = append(seqnames, seqname)
181                 }
182                 sort.Strings(seqnames)
183                 for _, seqname := range seqnames {
184                         seqname := seqname
185                         throttle.Acquire()
186                         go func() {
187                                 defer throttle.Release()
188                                 throttle.Report(cmd.annotateSequence(throttle, outch, tilelib, taglen, refname, seqname, refcs[seqname], len(refs) > 1))
189                         }()
190                 }
191         }
192         throttle.Wait()
193         return throttle.Err()
194 }
195
196 func (cmd *annotatecmd) annotateSequence(throttle *throttle, outch chan<- string, tilelib *tileLibrary, taglen int, refname, seqname string, reftiles []tileLibRef, refnamecol bool) error {
197         var refseq []byte
198         // tilestart[123] is the index into refseq
199         // where the tile for tag 123 was placed.
200         tilestart := map[tagID]int{}
201         tileend := map[tagID]int{}
202         for _, libref := range reftiles {
203                 if libref.Variant < 1 {
204                         return fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refname, seqname, libref.Tag)
205                 }
206                 seq := tilelib.TileVariantSequence(libref)
207                 if len(seq) < taglen {
208                         return fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refname, seqname, libref.Tag, libref.Variant, len(seq), taglen)
209                 }
210                 overlap := taglen
211                 if len(refseq) == 0 {
212                         overlap = 0
213                 }
214                 tilestart[libref.Tag] = len(refseq) - overlap
215                 refseq = append(refseq, seq[overlap:]...)
216                 tileend[libref.Tag] = len(refseq)
217         }
218         log.Infof("seq %s len(refseq) %d len(tilestart) %d", seqname, len(refseq), len(tilestart))
219         for tag, tvs := range tilelib.variant {
220                 tag := tagID(tag)
221                 refstart, ok := tilestart[tag]
222                 if !ok {
223                         // Tag didn't place on this
224                         // reference sequence. (It
225                         // might place on the same
226                         // chromosome in a genome
227                         // anyway, but we don't output
228                         // the annotations that would
229                         // result.)
230                         continue
231                 }
232                 for variant := 1; variant <= len(tvs); variant++ {
233                         variant, hash := tileVariantID(variant), tvs[variant-1]
234                         tileseq := tilelib.TileVariantSequence(tileLibRef{Tag: tag, Variant: variant})
235                         if len(tileseq) == 0 {
236                                 continue
237                         } else if len(tileseq) < taglen {
238                                 return fmt.Errorf("tilevar %d,%d has sequence len %d < taglen %d", tag, variant, len(tileseq), taglen)
239                         }
240                         var refpart []byte
241                         endtag := string(tileseq[len(tileseq)-taglen:])
242                         if endtagid, ok := cmd.tag2tagid[endtag]; !ok {
243                                 // Tile variant doesn't end on a tag, so it can only place at the end of a chromosome.
244                                 refpart = refseq[refstart:]
245                                 log.Warnf("%x tilevar %d,%d endtag not in ref: %s", hash[:13], tag, variant, endtag)
246                         } else if refendtagstart, ok := tilestart[endtagid]; !ok {
247                                 // Ref ends a chromsome with a (possibly very large) variant of this tile, but genomes with this tile don't.
248                                 // Give up. (TODO: something smarter)
249                                 log.Debugf("%x not annotating tilevar %d,%d because end tag %d is not in ref", hash[:13], tag, variant, endtagid)
250                                 continue
251                         } else {
252                                 // Non-terminal tile vs. non-terminal reference.
253                                 refpart = refseq[refstart : refendtagstart+taglen]
254                                 log.Tracef("\n%x tilevar %d,%d endtag %s endtagid %d refendtagstart %d", hash[:13], tag, variant, endtag, endtagid, refendtagstart)
255                         }
256                         if len(refpart) > cmd.maxTileSize {
257                                 log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s pos %d ref len %d", hash[:13], tag, variant, refname, seqname, refstart, len(refpart))
258                                 continue
259                         }
260                         if len(tileseq) > cmd.maxTileSize {
261                                 log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s variant len %d", hash[:13], tag, variant, refname, seqname, len(tileseq))
262                                 continue
263                         }
264                         // log.Printf("\n%x @ refstart %d \n< %s\n> %s\n", tv.Blake2b, refstart, refpart, tileseq)
265
266                         throttle.Acquire()
267                         go func() {
268                                 defer throttle.Release()
269                                 diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tileseq)), 0)
270                                 for _, diff := range diffs {
271                                         diff.Position += refstart
272                                         var varid string
273                                         if cmd.variantHash {
274                                                 varid = fmt.Sprintf("%x", hash)[:13]
275                                         } else {
276                                                 varid = fmt.Sprintf("%d", variant)
277                                         }
278                                         refnamefield := ""
279                                         if refnamecol {
280                                                 refnamefield = "\t" + refname
281                                         }
282                                         outch <- fmt.Sprintf("%d\t%s%s\t%s:g.%s\n", tag, varid, refnamefield, seqname, diff.String())
283                                 }
284                         }()
285                 }
286         }
287         return nil
288 }