Export VCF-ish.
[lightning.git] / export.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "errors"
8         "flag"
9         "fmt"
10         "io"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "sort"
15         "strings"
16         "sync"
17         "time"
18
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "github.com/arvados/lightning/hgvs"
21         log "github.com/sirupsen/logrus"
22 )
23
24 type outputFormat struct {
25         Print   func(out io.Writer, seqname string, varslice []hgvs.Variant)
26         PadLeft bool
27 }
28
29 var (
30         outputFormats = map[string]outputFormat{
31                 "hgvs": outputFormatHGVS,
32                 "vcf":  outputFormatVCF,
33         }
34         outputFormatHGVS = outputFormat{Print: printHGVS}
35         outputFormatVCF  = outputFormat{Print: printVCF, PadLeft: true}
36 )
37
38 type exporter struct {
39         outputFormat outputFormat
40 }
41
42 func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
43         var err error
44         defer func() {
45                 if err != nil {
46                         fmt.Fprintf(stderr, "%s\n", err)
47                 }
48         }()
49         flags := flag.NewFlagSet("", flag.ContinueOnError)
50         flags.SetOutput(stderr)
51         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
52         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
53         projectUUID := flags.String("project", "", "project `UUID` for output data")
54         priority := flags.Int("priority", 500, "container request priority")
55         refname := flags.String("ref", "", "reference genome `name`")
56         inputFilename := flags.String("i", "-", "input `file` (library)")
57         outputFilename := flags.String("o", "-", "output `file`")
58         outputFormatStr := flags.String("output-format", "hgvs", "output `format`: hgvs or vcf")
59         pick := flags.String("pick", "", "`name` of single genome to export")
60         err = flags.Parse(args)
61         if err == flag.ErrHelp {
62                 err = nil
63                 return 0
64         } else if err != nil {
65                 return 2
66         }
67
68         if f, ok := outputFormats[*outputFormatStr]; !ok {
69                 err = fmt.Errorf("invalid output format %q", *outputFormatStr)
70                 return 2
71         } else {
72                 cmd.outputFormat = f
73         }
74
75         if *pprof != "" {
76                 go func() {
77                         log.Println(http.ListenAndServe(*pprof, nil))
78                 }()
79         }
80
81         if !*runlocal {
82                 if *outputFilename != "-" {
83                         err = errors.New("cannot specify output file in container mode: not implemented")
84                         return 1
85                 }
86                 runner := arvadosContainerRunner{
87                         Name:        "lightning export",
88                         Client:      arvados.NewClientFromEnv(),
89                         ProjectUUID: *projectUUID,
90                         RAM:         128000000000,
91                         VCPUs:       2,
92                         Priority:    *priority,
93                 }
94                 err = runner.TranslatePaths(inputFilename)
95                 if err != nil {
96                         return 1
97                 }
98                 runner.Args = []string{"export", "-local=true", "-pick", *pick, "-ref", *refname, "-output-format", *outputFormatStr, "-i", *inputFilename, "-o", "/mnt/output/export.csv"}
99                 var output string
100                 output, err = runner.Run()
101                 if err != nil {
102                         return 1
103                 }
104                 fmt.Fprintln(stdout, output+"/export.csv")
105                 return 0
106         }
107
108         input, err := os.Open(*inputFilename)
109         if err != nil {
110                 return 1
111         }
112         defer input.Close()
113
114         // Error out early if seeking doesn't work on the input file.
115         _, err = input.Seek(0, io.SeekEnd)
116         if err != nil {
117                 return 1
118         }
119         _, err = input.Seek(0, io.SeekStart)
120         if err != nil {
121                 return 1
122         }
123
124         var mtx sync.Mutex
125         var cgs []CompactGenome
126         var tilelib tileLibrary
127         err = tilelib.LoadGob(context.Background(), input, func(cg CompactGenome) {
128                 if *pick != "" && *pick != cg.Name {
129                         return
130                 }
131                 log.Debugf("export: pick %q", cg.Name)
132                 mtx.Lock()
133                 defer mtx.Unlock()
134                 cgs = append(cgs, cg)
135         })
136         if err != nil {
137                 return 1
138         }
139         sort.Slice(cgs, func(i, j int) bool { return cgs[i].Name < cgs[j].Name })
140         log.Printf("export: pick %q => %d genomes", *pick, len(cgs))
141
142         refseq, ok := tilelib.refseqs[*refname]
143         if !ok {
144                 err = fmt.Errorf("reference name %q not found in input; have %v", *refname, func() (names []string) {
145                         for name := range tilelib.refseqs {
146                                 names = append(names, name)
147                         }
148                         return
149                 }())
150                 return 1
151         }
152
153         _, err = input.Seek(0, io.SeekStart)
154         if err != nil {
155                 return 1
156         }
157
158         var output io.WriteCloser
159         if *outputFilename == "-" {
160                 output = nopCloser{stdout}
161         } else {
162                 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
163                 if err != nil {
164                         return 1
165                 }
166                 defer output.Close()
167         }
168         bufw := bufio.NewWriter(output)
169         err = cmd.export(bufw, input, tilelib.taglib.keylen, refseq, cgs)
170         if err != nil {
171                 return 1
172         }
173         err = bufw.Flush()
174         if err != nil {
175                 return 1
176         }
177         err = output.Close()
178         if err != nil {
179                 return 1
180         }
181         err = input.Close()
182         if err != nil {
183                 return 1
184         }
185         return 0
186 }
187
188 func (cmd *exporter) export(out io.Writer, librdr io.Reader, taglen int, refseq map[string][]tileLibRef, cgs []CompactGenome) error {
189         need := map[tileLibRef]bool{}
190         var seqnames []string
191         for seqname, librefs := range refseq {
192                 seqnames = append(seqnames, seqname)
193                 for _, libref := range librefs {
194                         if libref.Variant != 0 {
195                                 need[libref] = true
196                         }
197                 }
198         }
199         sort.Strings(seqnames)
200
201         for _, cg := range cgs {
202                 for i, variant := range cg.Variants {
203                         if variant == 0 {
204                                 continue
205                         }
206                         libref := tileLibRef{Tag: tagID(i / 2), Variant: variant}
207                         need[libref] = true
208                 }
209         }
210
211         log.Infof("export: loading %d tile variants", len(need))
212         tileVariant := map[tileLibRef]TileVariant{}
213         err := DecodeLibrary(librdr, func(ent *LibraryEntry) error {
214                 for _, tv := range ent.TileVariants {
215                         libref := tileLibRef{Tag: tv.Tag, Variant: tv.Variant}
216                         if need[libref] {
217                                 tileVariant[libref] = tv
218                         }
219                 }
220                 return nil
221         })
222         if err != nil {
223                 return err
224         }
225
226         log.Infof("export: loaded %d tile variants", len(tileVariant))
227         var missing []tileLibRef
228         for libref := range need {
229                 if _, ok := tileVariant[libref]; !ok {
230                         missing = append(missing, libref)
231                 }
232         }
233         if len(missing) > 0 {
234                 if limit := 100; len(missing) > limit {
235                         log.Warnf("first %d missing tiles: %v", limit, missing[:limit])
236                 } else {
237                         log.Warnf("missing tiles: %v", missing)
238                 }
239                 return fmt.Errorf("%d needed tiles are missing from library", len(missing))
240         }
241
242         refpos := 0
243         for _, seqname := range seqnames {
244                 variantAt := map[int][]hgvs.Variant{} // variantAt[chromOffset][genomeIndex*2+phase]
245                 for refstep, libref := range refseq[seqname] {
246                         reftile := tileVariant[libref]
247                         for cgidx, cg := range cgs {
248                                 for phase := 0; phase < 2; phase++ {
249                                         if len(cg.Variants) <= int(libref.Tag)*2+phase {
250                                                 continue
251                                         }
252                                         variant := cg.Variants[int(libref.Tag)*2+phase]
253                                         if variant == 0 {
254                                                 continue
255                                         }
256                                         genometile := tileVariant[tileLibRef{Tag: libref.Tag, Variant: variant}]
257                                         if variant == libref.Variant {
258                                                 continue
259                                         }
260                                         refSequence := reftile.Sequence
261                                         // If needed, extend the
262                                         // reference sequence up to
263                                         // the tag at the end of the
264                                         // genometile sequence.
265                                         refstepend := refstep + 1
266                                         for refstepend < len(refseq[seqname]) && len(refSequence) >= taglen && !bytes.EqualFold(refSequence[len(refSequence)-taglen:], genometile.Sequence[len(genometile.Sequence)-taglen:]) {
267                                                 if &refSequence[0] == &reftile.Sequence[0] {
268                                                         refSequence = append([]byte(nil), refSequence...)
269                                                 }
270                                                 refSequence = append(refSequence, tileVariant[refseq[seqname][refstepend]].Sequence...)
271                                                 refstepend++
272                                         }
273                                         vars, _ := hgvs.Diff(strings.ToUpper(string(refSequence)), strings.ToUpper(string(genometile.Sequence)), time.Second)
274                                         for _, v := range vars {
275                                                 if cmd.outputFormat.PadLeft {
276                                                         v = v.PadLeft()
277                                                 }
278                                                 v.Position += refpos
279                                                 log.Debugf("%s seq %s phase %d tag %d tile diff %s\n", cg.Name, seqname, phase, libref.Tag, v.String())
280                                                 varslice := variantAt[v.Position]
281                                                 if varslice == nil {
282                                                         varslice = make([]hgvs.Variant, len(cgs)*2)
283                                                         variantAt[v.Position] = varslice
284                                                 }
285                                                 varslice[cgidx*2+phase] = v
286                                         }
287                                 }
288                         }
289                         refpos += len(reftile.Sequence) - taglen
290
291                         // Flush entries from variantAt that are
292                         // behind refpos. Flush all entries if this is
293                         // the last reftile of the path/chromosome.
294                         var flushpos []int
295                         lastrefstep := refstep == len(refseq[seqname])-1
296                         for pos := range variantAt {
297                                 if lastrefstep || pos <= refpos {
298                                         flushpos = append(flushpos, pos)
299                                 }
300                         }
301                         sort.Slice(flushpos, func(i, j int) bool { return flushpos[i] < flushpos[j] })
302                         for _, pos := range flushpos {
303                                 varslice := variantAt[pos]
304                                 delete(variantAt, pos)
305                                 for i := range varslice {
306                                         if varslice[i].Position == 0 {
307                                                 varslice[i].Position = pos
308                                         }
309                                 }
310                                 cmd.outputFormat.Print(out, seqname, varslice)
311                         }
312                 }
313         }
314         return nil
315 }
316
317 func printVCF(out io.Writer, seqname string, varslice []hgvs.Variant) {
318         refs := map[string]map[string]int{}
319         for _, v := range varslice {
320                 if v.Ref == "" && v.New == "" {
321                         continue
322                 }
323                 alts := refs[v.Ref]
324                 if alts == nil {
325                         alts = map[string]int{}
326                         refs[v.Ref] = alts
327                 }
328                 alts[v.New] = 0
329         }
330         for ref, alts := range refs {
331                 var altslice []string
332                 for alt := range alts {
333                         altslice = append(altslice, alt)
334                 }
335                 sort.Strings(altslice)
336                 for i, a := range altslice {
337                         alts[a] = i + 1
338                 }
339                 fmt.Fprintf(out, "%s\t%d\t%s\t%s", seqname, varslice[0].Position, ref, strings.Join(altslice, ","))
340                 for i := 0; i < len(varslice)/2; i++ {
341                         v1, v2 := varslice[i*2], varslice[i*2+1]
342                         a1, a2 := alts[v1.New], alts[v2.New]
343                         if v1.Ref != ref {
344                                 a1 = 0
345                         }
346                         if v2.Ref != ref {
347                                 a2 = 0
348                         }
349                         fmt.Fprintf(out, "\t%d/%d", a1, a2)
350                 }
351                 out.Write([]byte{'\n'})
352         }
353 }
354
355 func printHGVS(out io.Writer, seqname string, varslice []hgvs.Variant) {
356         for i := 0; i < len(varslice)/2; i++ {
357                 if i > 0 {
358                         out.Write([]byte{'\t'})
359                 }
360                 var1, var2 := varslice[i*2], varslice[i*2+1]
361                 if var1 == var2 {
362                         if var1.Ref == var1.New {
363                                 out.Write([]byte{'.'})
364                         } else {
365                                 fmt.Fprintf(out, "%s:g.%s", seqname, var1.String())
366                         }
367                 } else {
368                         fmt.Fprintf(out, "%s:g.[%s];[%s]", seqname, var1.String(), var2.String())
369                 }
370         }
371         out.Write([]byte{'\n'})
372 }