max-tile-size option.
[lightning.git] / annotate.go
1 package main
2
3 import (
4         "bufio"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "net/http"
11         _ "net/http/pprof"
12         "os"
13         "runtime"
14         "sort"
15         "strconv"
16         "strings"
17         "sync"
18
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "github.com/arvados/lightning/hgvs"
21         log "github.com/sirupsen/logrus"
22 )
23
24 type annotatecmd struct {
25         variantHash bool
26         maxTileSize int
27 }
28
29 func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
30         var err error
31         defer func() {
32                 if err != nil {
33                         fmt.Fprintf(stderr, "%s\n", err)
34                 }
35         }()
36         flags := flag.NewFlagSet("", flag.ContinueOnError)
37         flags.SetOutput(stderr)
38         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
39         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
40         projectUUID := flags.String("project", "", "project `UUID` for output data")
41         priority := flags.Int("priority", 500, "container request priority")
42         inputFilename := flags.String("i", "-", "input `file` (library)")
43         outputFilename := flags.String("o", "-", "output `file`")
44         flags.BoolVar(&cmd.variantHash, "variant-hash", false, "output variant hash instead of index")
45         flags.IntVar(&cmd.maxTileSize, "max-tile-size", 50000, "don't try to make annotations for tiles bigger than given `size`")
46         err = flags.Parse(args)
47         if err == flag.ErrHelp {
48                 err = nil
49                 return 0
50         } else if err != nil {
51                 return 2
52         }
53
54         if *pprof != "" {
55                 go func() {
56                         log.Println(http.ListenAndServe(*pprof, nil))
57                 }()
58         }
59         if !*runlocal {
60                 if *outputFilename != "-" {
61                         err = errors.New("cannot specify output file in container mode: not implemented")
62                         return 1
63                 }
64                 runner := arvadosContainerRunner{
65                         Name:        "lightning annotate",
66                         Client:      arvados.NewClientFromEnv(),
67                         ProjectUUID: *projectUUID,
68                         RAM:         80000000000,
69                         VCPUs:       16,
70                         Priority:    *priority,
71                 }
72                 err = runner.TranslatePaths(inputFilename)
73                 if err != nil {
74                         return 1
75                 }
76                 runner.Args = []string{"annotate", "-local=true", fmt.Sprintf("-variant-hash=%v", cmd.variantHash), "-max-tile-size", strconv.Itoa(cmd.maxTileSize), "-i", *inputFilename, "-o", "/mnt/output/tilevariants.tsv"}
77                 var output string
78                 output, err = runner.Run()
79                 if err != nil {
80                         return 1
81                 }
82                 fmt.Fprintln(stdout, output+"/tilevariants.tsv")
83                 return 0
84         }
85
86         var input io.ReadCloser
87         if *inputFilename == "-" {
88                 input = ioutil.NopCloser(stdin)
89         } else {
90                 input, err = os.Open(*inputFilename)
91                 if err != nil {
92                         return 1
93                 }
94                 defer input.Close()
95         }
96
97         var output io.WriteCloser
98         if *outputFilename == "-" {
99                 output = nopCloser{stdout}
100         } else {
101                 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0666)
102                 if err != nil {
103                         return 1
104                 }
105                 defer output.Close()
106         }
107         bufw := bufio.NewWriter(output)
108
109         err = cmd.exportTileDiffs(bufw, input)
110         if err != nil {
111                 return 1
112         }
113         err = bufw.Flush()
114         if err != nil {
115                 return 1
116         }
117         err = output.Close()
118         if err != nil {
119                 return 1
120         }
121         err = input.Close()
122         if err != nil {
123                 return 1
124         }
125         return 0
126 }
127
128 func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, librdr io.Reader) error {
129         var refs []CompactSequence
130         var tiles [][]TileVariant
131         var tagset [][]byte
132         var taglen int
133         err := DecodeLibrary(librdr, func(ent *LibraryEntry) error {
134                 if len(ent.TagSet) > 0 {
135                         if tagset != nil {
136                                 return errors.New("error: not implemented: input has multiple tagsets")
137                         }
138                         tagset = ent.TagSet
139                         taglen = len(tagset[0])
140                         tiles = make([][]TileVariant, len(tagset))
141                 }
142                 for _, tv := range ent.TileVariants {
143                         if tv.Tag >= tagID(len(tiles)) {
144                                 return fmt.Errorf("error: reading tile variant for tag %d but only %d tags were loaded", tv.Tag, len(tiles))
145                         }
146                         for len(tiles[tv.Tag]) <= int(tv.Variant) {
147                                 tiles[tv.Tag] = append(tiles[tv.Tag], TileVariant{})
148                         }
149                         tiles[tv.Tag][tv.Variant] = tv
150                 }
151                 for _, cs := range ent.CompactSequences {
152                         refs = append(refs, cs)
153                 }
154                 return nil
155         })
156         if err != nil {
157                 return err
158         }
159         tag2tagid := make(map[string]tagID, len(tagset))
160         for tagid, tagseq := range tagset {
161                 tag2tagid[string(tagseq)] = tagID(tagid)
162         }
163         sort.Slice(refs, func(i, j int) bool { return refs[i].Name < refs[j].Name })
164         log.Infof("len(refs) %d", len(refs))
165
166         outch := make(chan string, 1)
167         var outwg sync.WaitGroup
168         defer outwg.Wait()
169         outwg.Add(1)
170         go func() {
171                 defer outwg.Done()
172                 for s := range outch {
173                         io.WriteString(outw, s)
174                 }
175         }()
176         defer close(outch)
177
178         limiter := make(chan bool, runtime.NumCPU()+1)
179         var diffwg sync.WaitGroup
180         defer diffwg.Wait()
181
182         for _, refcs := range refs {
183                 refcs := refcs
184                 var seqnames []string
185                 for seqname := range refcs.TileSequences {
186                         seqnames = append(seqnames, seqname)
187                 }
188                 sort.Strings(seqnames)
189                 for _, seqname := range seqnames {
190                         seqname := seqname
191                         var refseq []byte
192                         // tilestart[123] is the index into refseq
193                         // where the tile for tag 123 was placed.
194                         tilestart := map[tagID]int{}
195                         tileend := map[tagID]int{}
196                         for _, libref := range refcs.TileSequences[seqname] {
197                                 if libref.Variant < 1 {
198                                         return fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refcs.Name, seqname, libref.Tag)
199                                 }
200                                 seq := tiles[libref.Tag][libref.Variant].Sequence
201                                 if len(seq) < taglen {
202                                         return fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refcs.Name, seqname, libref.Tag, libref.Variant, len(seq), taglen)
203                                 }
204                                 overlap := taglen
205                                 if len(refseq) == 0 {
206                                         overlap = 0
207                                 }
208                                 tilestart[libref.Tag] = len(refseq) - overlap
209                                 refseq = append(refseq, seq[overlap:]...)
210                                 tileend[libref.Tag] = len(refseq)
211                         }
212                         log.Infof("seq %s len(refseq) %d len(tilestart) %d", seqname, len(refseq), len(tilestart))
213                         for tag, tvs := range tiles {
214                                 tag := tagID(tag)
215                                 refstart, ok := tilestart[tag]
216                                 if !ok {
217                                         // Tag didn't place on this
218                                         // reference sequence. (It
219                                         // might place on the same
220                                         // chromosome in a genome
221                                         // anyway, but we don't output
222                                         // the annotations that would
223                                         // result.)
224                                         continue
225                                 }
226                                 for variant, tv := range tvs {
227                                         variant, tv := variant, tv
228                                         if variant == 0 {
229                                                 continue
230                                         }
231                                         if len(tv.Sequence) < taglen {
232                                                 return fmt.Errorf("tilevar %d,%d has sequence len %d < taglen %d", tag, variant, len(tv.Sequence), taglen)
233                                         }
234                                         var refpart []byte
235                                         endtag := string(tv.Sequence[len(tv.Sequence)-taglen:])
236                                         if endtagid, ok := tag2tagid[endtag]; !ok {
237                                                 // Tile variant doesn't end on a tag, so it can only place at the end of a chromosome.
238                                                 refpart = refseq[refstart:]
239                                                 log.Warnf("%x tilevar %d,%d endtag not in ref: %s", tv.Blake2b[:13], tag, variant, endtag)
240                                         } else if refendtagstart, ok := tilestart[endtagid]; !ok {
241                                                 // Ref ends a chromsome with a (possibly very large) variant of this tile, but genomes with this tile don't.
242                                                 // Give up. (TODO: something smarter)
243                                                 log.Debugf("%x not annotating tilevar %d,%d because end tag %d is not in ref", tv.Blake2b[:13], tag, variant, endtagid)
244                                                 continue
245                                         } else {
246                                                 // Non-terminal tile vs. non-terminal reference.
247                                                 refpart = refseq[refstart : refendtagstart+taglen]
248                                                 log.Tracef("\n%x tilevar %d,%d endtag %s endtagid %d refendtagstart %d", tv.Blake2b[:13], tag, variant, endtag, endtagid, refendtagstart)
249                                         }
250                                         if len(refpart) > cmd.maxTileSize {
251                                                 log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s ref len %d", tv.Blake2b[:13], tag, variant, refcs.Name, seqname, len(refpart))
252                                                 continue
253                                         }
254                                         if len(tv.Sequence) > cmd.maxTileSize {
255                                                 log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s variant len %d", tv.Blake2b[:13], tag, variant, refcs.Name, seqname, len(tv.Sequence))
256                                                 continue
257                                         }
258                                         // log.Printf("\n%x @ refstart %d \n< %s\n> %s\n", tv.Blake2b, refstart, refpart, tv.Sequence)
259
260                                         diffwg.Add(1)
261                                         limiter <- true
262                                         go func() {
263                                                 defer func() {
264                                                         <-limiter
265                                                         diffwg.Done()
266                                                 }()
267                                                 diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tv.Sequence)), 0)
268                                                 for _, diff := range diffs {
269                                                         diff.Position += refstart
270                                                         var varid string
271                                                         if cmd.variantHash {
272                                                                 varid = fmt.Sprintf("%x", tv.Blake2b)[:13]
273                                                         } else {
274                                                                 varid = strconv.Itoa(variant)
275                                                         }
276                                                         outch <- fmt.Sprintf("%d\t%s\t%s\t%s:g.%s\n", tag, varid, refcs.Name, seqname, diff.String())
277                                                 }
278                                         }()
279                                 }
280                         }
281                 }
282         }
283         return nil
284 }