Write annotations along with numpy.
[lightning.git] / annotate.go
1 package main
2
3 import (
4         "bufio"
5         "context"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "runtime"
15         "sort"
16         "strconv"
17         "strings"
18         "sync"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "github.com/arvados/lightning/hgvs"
22         log "github.com/sirupsen/logrus"
23 )
24
25 type annotatecmd struct {
26         variantHash bool
27         maxTileSize int
28 }
29
30 func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
31         var err error
32         defer func() {
33                 if err != nil {
34                         fmt.Fprintf(stderr, "%s\n", err)
35                 }
36         }()
37         flags := flag.NewFlagSet("", flag.ContinueOnError)
38         flags.SetOutput(stderr)
39         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
40         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
41         projectUUID := flags.String("project", "", "project `UUID` for output data")
42         priority := flags.Int("priority", 500, "container request priority")
43         inputFilename := flags.String("i", "-", "input `file` (library)")
44         outputFilename := flags.String("o", "-", "output `file`")
45         flags.BoolVar(&cmd.variantHash, "variant-hash", false, "output variant hash instead of index")
46         flags.IntVar(&cmd.maxTileSize, "max-tile-size", 50000, "don't try to make annotations for tiles bigger than given `size`")
47         err = flags.Parse(args)
48         if err == flag.ErrHelp {
49                 err = nil
50                 return 0
51         } else if err != nil {
52                 return 2
53         }
54
55         if *pprof != "" {
56                 go func() {
57                         log.Println(http.ListenAndServe(*pprof, nil))
58                 }()
59         }
60         if !*runlocal {
61                 if *outputFilename != "-" {
62                         err = errors.New("cannot specify output file in container mode: not implemented")
63                         return 1
64                 }
65                 runner := arvadosContainerRunner{
66                         Name:        "lightning annotate",
67                         Client:      arvados.NewClientFromEnv(),
68                         ProjectUUID: *projectUUID,
69                         RAM:         80000000000,
70                         VCPUs:       16,
71                         Priority:    *priority,
72                 }
73                 err = runner.TranslatePaths(inputFilename)
74                 if err != nil {
75                         return 1
76                 }
77                 runner.Args = []string{"annotate", "-local=true", fmt.Sprintf("-variant-hash=%v", cmd.variantHash), "-max-tile-size", strconv.Itoa(cmd.maxTileSize), "-i", *inputFilename, "-o", "/mnt/output/tilevariants.tsv"}
78                 var output string
79                 output, err = runner.Run()
80                 if err != nil {
81                         return 1
82                 }
83                 fmt.Fprintln(stdout, output+"/tilevariants.tsv")
84                 return 0
85         }
86
87         var input io.ReadCloser
88         if *inputFilename == "-" {
89                 input = ioutil.NopCloser(stdin)
90         } else {
91                 input, err = os.Open(*inputFilename)
92                 if err != nil {
93                         return 1
94                 }
95                 defer input.Close()
96         }
97
98         var output io.WriteCloser
99         if *outputFilename == "-" {
100                 output = nopCloser{stdout}
101         } else {
102                 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0666)
103                 if err != nil {
104                         return 1
105                 }
106                 defer output.Close()
107         }
108         bufw := bufio.NewWriter(output)
109
110         tilelib := &tileLibrary{
111                 includeNoCalls:      true,
112                 retainTileSequences: true,
113         }
114         err = tilelib.LoadGob(context.Background(), input, nil)
115         if err != nil {
116                 return 1
117         }
118         err = cmd.exportTileDiffs(bufw, tilelib)
119         if err != nil {
120                 return 1
121         }
122         err = bufw.Flush()
123         if err != nil {
124                 return 1
125         }
126         err = output.Close()
127         if err != nil {
128                 return 1
129         }
130         err = input.Close()
131         if err != nil {
132                 return 1
133         }
134         return 0
135 }
136
137 func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, tilelib *tileLibrary) error {
138         tagset := tilelib.taglib.Tags()
139         if len(tagset) == 0 {
140                 return errors.New("cannot annotate library without tags")
141         }
142         taglen := len(tagset[0])
143         var refs []string
144         for name := range tilelib.refseqs {
145                 refs = append(refs, name)
146         }
147         tag2tagid := make(map[string]tagID, len(tagset))
148         for tagid, tagseq := range tagset {
149                 tag2tagid[string(tagseq)] = tagID(tagid)
150         }
151         sort.Strings(refs)
152         log.Infof("len(refs) %d", len(refs))
153
154         outch := make(chan string, 1)
155         var outwg sync.WaitGroup
156         defer outwg.Wait()
157         outwg.Add(1)
158         go func() {
159                 defer outwg.Done()
160                 for s := range outch {
161                         io.WriteString(outw, s)
162                 }
163         }()
164         defer close(outch)
165
166         limiter := make(chan bool, runtime.NumCPU()+1)
167         var diffwg sync.WaitGroup
168         defer diffwg.Wait()
169
170         for _, refname := range refs {
171                 refname := refname
172                 refcs := tilelib.refseqs[refname]
173                 var seqnames []string
174                 for seqname := range refcs {
175                         seqnames = append(seqnames, seqname)
176                 }
177                 sort.Strings(seqnames)
178                 for _, seqname := range seqnames {
179                         seqname := seqname
180                         var refseq []byte
181                         // tilestart[123] is the index into refseq
182                         // where the tile for tag 123 was placed.
183                         tilestart := map[tagID]int{}
184                         tileend := map[tagID]int{}
185                         for _, libref := range refcs[seqname] {
186                                 if libref.Variant < 1 {
187                                         return fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refname, seqname, libref.Tag)
188                                 }
189                                 seq := tilelib.TileVariantSequence(libref)
190                                 if len(seq) < taglen {
191                                         return fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refname, seqname, libref.Tag, libref.Variant, len(seq), taglen)
192                                 }
193                                 overlap := taglen
194                                 if len(refseq) == 0 {
195                                         overlap = 0
196                                 }
197                                 tilestart[libref.Tag] = len(refseq) - overlap
198                                 refseq = append(refseq, seq[overlap:]...)
199                                 tileend[libref.Tag] = len(refseq)
200                         }
201                         log.Infof("seq %s len(refseq) %d len(tilestart) %d", seqname, len(refseq), len(tilestart))
202                         for tag, tvs := range tilelib.variant {
203                                 tag := tagID(tag)
204                                 refstart, ok := tilestart[tag]
205                                 if !ok {
206                                         // Tag didn't place on this
207                                         // reference sequence. (It
208                                         // might place on the same
209                                         // chromosome in a genome
210                                         // anyway, but we don't output
211                                         // the annotations that would
212                                         // result.)
213                                         continue
214                                 }
215                                 for variant := 1; variant <= len(tvs); variant++ {
216                                         variant, hash := tileVariantID(variant), tvs[variant-1]
217                                         tileseq := tilelib.TileVariantSequence(tileLibRef{Tag: tag, Variant: variant})
218                                         if len(tileseq) < taglen {
219                                                 return fmt.Errorf("tilevar %d,%d has sequence len %d < taglen %d", tag, variant, len(tileseq), taglen)
220                                         }
221                                         var refpart []byte
222                                         endtag := string(tileseq[len(tileseq)-taglen:])
223                                         if endtagid, ok := tag2tagid[endtag]; !ok {
224                                                 // Tile variant doesn't end on a tag, so it can only place at the end of a chromosome.
225                                                 refpart = refseq[refstart:]
226                                                 log.Warnf("%x tilevar %d,%d endtag not in ref: %s", hash[:13], tag, variant, endtag)
227                                         } else if refendtagstart, ok := tilestart[endtagid]; !ok {
228                                                 // Ref ends a chromsome with a (possibly very large) variant of this tile, but genomes with this tile don't.
229                                                 // Give up. (TODO: something smarter)
230                                                 log.Debugf("%x not annotating tilevar %d,%d because end tag %d is not in ref", hash[:13], tag, variant, endtagid)
231                                                 continue
232                                         } else {
233                                                 // Non-terminal tile vs. non-terminal reference.
234                                                 refpart = refseq[refstart : refendtagstart+taglen]
235                                                 log.Tracef("\n%x tilevar %d,%d endtag %s endtagid %d refendtagstart %d", hash[:13], tag, variant, endtag, endtagid, refendtagstart)
236                                         }
237                                         if len(refpart) > cmd.maxTileSize {
238                                                 log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s pos %d ref len %d", hash[:13], tag, variant, refname, seqname, refstart, len(refpart))
239                                                 continue
240                                         }
241                                         if len(tileseq) > cmd.maxTileSize {
242                                                 log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s variant len %d", hash[:13], tag, variant, refname, seqname, len(tileseq))
243                                                 continue
244                                         }
245                                         // log.Printf("\n%x @ refstart %d \n< %s\n> %s\n", tv.Blake2b, refstart, refpart, tileseq)
246
247                                         diffwg.Add(1)
248                                         limiter <- true
249                                         go func() {
250                                                 defer func() {
251                                                         <-limiter
252                                                         diffwg.Done()
253                                                 }()
254                                                 diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tileseq)), 0)
255                                                 for _, diff := range diffs {
256                                                         diff.Position += refstart
257                                                         var varid string
258                                                         if cmd.variantHash {
259                                                                 varid = fmt.Sprintf("%x", hash)[:13]
260                                                         } else {
261                                                                 varid = fmt.Sprintf("%d", variant)
262                                                         }
263                                                         outch <- fmt.Sprintf("%d\t%s\t%s\t%s:g.%s\n", tag, varid, refname, seqname, diff.String())
264                                                 }
265                                         }()
266                                 }
267                         }
268                 }
269         }
270         return nil
271 }