Export bed file.
[lightning.git] / export.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "errors"
8         "flag"
9         "fmt"
10         "io"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "sort"
15         "strings"
16         "sync"
17         "time"
18
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "github.com/arvados/lightning/hgvs"
21         log "github.com/sirupsen/logrus"
22 )
23
24 type outputFormat struct {
25         Print   func(out io.Writer, seqname string, varslice []hgvs.Variant)
26         PadLeft bool
27 }
28
29 var (
30         outputFormats = map[string]outputFormat{
31                 "hgvs": outputFormatHGVS,
32                 "vcf":  outputFormatVCF,
33         }
34         outputFormatHGVS = outputFormat{Print: printHGVS}
35         outputFormatVCF  = outputFormat{Print: printVCF, PadLeft: true}
36 )
37
38 type exporter struct {
39         outputFormat outputFormat
40 }
41
42 func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
43         var err error
44         defer func() {
45                 if err != nil {
46                         fmt.Fprintf(stderr, "%s\n", err)
47                 }
48         }()
49         flags := flag.NewFlagSet("", flag.ContinueOnError)
50         flags.SetOutput(stderr)
51         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
52         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
53         projectUUID := flags.String("project", "", "project `UUID` for output data")
54         priority := flags.Int("priority", 500, "container request priority")
55         refname := flags.String("ref", "", "reference genome `name`")
56         inputFilename := flags.String("i", "-", "input `file` (library)")
57         outputFilename := flags.String("o", "-", "output `file`")
58         outputFormatStr := flags.String("output-format", "hgvs", "output `format`: hgvs or vcf")
59         outputBed := flags.String("output-bed", "", "also output bed `file`")
60         pick := flags.String("pick", "", "`name` of single genome to export")
61         err = flags.Parse(args)
62         if err == flag.ErrHelp {
63                 err = nil
64                 return 0
65         } else if err != nil {
66                 return 2
67         }
68
69         if f, ok := outputFormats[*outputFormatStr]; !ok {
70                 err = fmt.Errorf("invalid output format %q", *outputFormatStr)
71                 return 2
72         } else {
73                 cmd.outputFormat = f
74         }
75
76         if *pprof != "" {
77                 go func() {
78                         log.Println(http.ListenAndServe(*pprof, nil))
79                 }()
80         }
81
82         if !*runlocal {
83                 if *outputFilename != "-" {
84                         err = errors.New("cannot specify output file in container mode: not implemented")
85                         return 1
86                 }
87                 runner := arvadosContainerRunner{
88                         Name:        "lightning export",
89                         Client:      arvados.NewClientFromEnv(),
90                         ProjectUUID: *projectUUID,
91                         RAM:         128000000000,
92                         VCPUs:       2,
93                         Priority:    *priority,
94                 }
95                 err = runner.TranslatePaths(inputFilename)
96                 if err != nil {
97                         return 1
98                 }
99                 if *outputBed != "" {
100                         if strings.Contains(*outputBed, "/") {
101                                 err = fmt.Errorf("cannot use -output-bed filename %q containing '/' char", *outputBed)
102                                 return 1
103                         }
104                         *outputBed = "/mnt/output/" + *outputBed
105                 }
106                 runner.Args = []string{"export", "-local=true", "-pick", *pick, "-ref", *refname, "-output-format", *outputFormatStr, "-output-bed", *outputBed, "-i", *inputFilename, "-o", "/mnt/output/export.csv"}
107                 var output string
108                 output, err = runner.Run()
109                 if err != nil {
110                         return 1
111                 }
112                 fmt.Fprintln(stdout, output+"/export.csv")
113                 return 0
114         }
115
116         input, err := os.Open(*inputFilename)
117         if err != nil {
118                 return 1
119         }
120         defer input.Close()
121
122         // Error out early if seeking doesn't work on the input file.
123         _, err = input.Seek(0, io.SeekEnd)
124         if err != nil {
125                 return 1
126         }
127         _, err = input.Seek(0, io.SeekStart)
128         if err != nil {
129                 return 1
130         }
131
132         var mtx sync.Mutex
133         var cgs []CompactGenome
134         var tilelib tileLibrary
135         err = tilelib.LoadGob(context.Background(), input, func(cg CompactGenome) {
136                 if *pick != "" && *pick != cg.Name {
137                         return
138                 }
139                 log.Debugf("export: pick %q", cg.Name)
140                 mtx.Lock()
141                 defer mtx.Unlock()
142                 cgs = append(cgs, cg)
143         })
144         if err != nil {
145                 return 1
146         }
147         sort.Slice(cgs, func(i, j int) bool { return cgs[i].Name < cgs[j].Name })
148         log.Printf("export: pick %q => %d genomes", *pick, len(cgs))
149
150         refseq, ok := tilelib.refseqs[*refname]
151         if !ok {
152                 err = fmt.Errorf("reference name %q not found in input; have %v", *refname, func() (names []string) {
153                         for name := range tilelib.refseqs {
154                                 names = append(names, name)
155                         }
156                         return
157                 }())
158                 return 1
159         }
160
161         _, err = input.Seek(0, io.SeekStart)
162         if err != nil {
163                 return 1
164         }
165
166         var output io.WriteCloser
167         if *outputFilename == "-" {
168                 output = nopCloser{stdout}
169         } else {
170                 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0666)
171                 if err != nil {
172                         return 1
173                 }
174                 defer output.Close()
175         }
176         bufw := bufio.NewWriter(output)
177
178         var bedout *os.File
179         var bedbufw *bufio.Writer
180         if *outputBed != "" {
181                 bedout, err = os.OpenFile(*outputBed, os.O_CREATE|os.O_WRONLY, 0666)
182                 if err != nil {
183                         return 1
184                 }
185                 defer bedout.Close()
186                 bedbufw = bufio.NewWriter(bedout)
187         }
188
189         err = cmd.export(bufw, bedout, input, tilelib.taglib.keylen, refseq, cgs)
190         if err != nil {
191                 return 1
192         }
193         err = bufw.Flush()
194         if err != nil {
195                 return 1
196         }
197         err = output.Close()
198         if err != nil {
199                 return 1
200         }
201         if bedout != nil {
202                 err = bedbufw.Flush()
203                 if err != nil {
204                         return 1
205                 }
206                 err = bedout.Close()
207                 if err != nil {
208                         return 1
209                 }
210         }
211         err = input.Close()
212         if err != nil {
213                 return 1
214         }
215         return 0
216 }
217
218 func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, taglen int, refseq map[string][]tileLibRef, cgs []CompactGenome) error {
219         need := map[tileLibRef]bool{}
220         var seqnames []string
221         for seqname, librefs := range refseq {
222                 seqnames = append(seqnames, seqname)
223                 for _, libref := range librefs {
224                         if libref.Variant != 0 {
225                                 need[libref] = true
226                         }
227                 }
228         }
229         sort.Strings(seqnames)
230
231         for _, cg := range cgs {
232                 for i, variant := range cg.Variants {
233                         if variant == 0 {
234                                 continue
235                         }
236                         libref := tileLibRef{Tag: tagID(i / 2), Variant: variant}
237                         need[libref] = true
238                 }
239         }
240
241         log.Infof("export: loading %d tile variants", len(need))
242         tileVariant := map[tileLibRef]TileVariant{}
243         err := DecodeLibrary(librdr, func(ent *LibraryEntry) error {
244                 for _, tv := range ent.TileVariants {
245                         libref := tileLibRef{Tag: tv.Tag, Variant: tv.Variant}
246                         if need[libref] {
247                                 tileVariant[libref] = tv
248                         }
249                 }
250                 return nil
251         })
252         if err != nil {
253                 return err
254         }
255
256         log.Infof("export: loaded %d tile variants", len(tileVariant))
257         var missing []tileLibRef
258         for libref := range need {
259                 if _, ok := tileVariant[libref]; !ok {
260                         missing = append(missing, libref)
261                 }
262         }
263         if len(missing) > 0 {
264                 if limit := 100; len(missing) > limit {
265                         log.Warnf("first %d missing tiles: %v", limit, missing[:limit])
266                 } else {
267                         log.Warnf("missing tiles: %v", missing)
268                 }
269                 return fmt.Errorf("%d needed tiles are missing from library", len(missing))
270         }
271
272         refpos := 0
273         for _, seqname := range seqnames {
274                 variantAt := map[int][]hgvs.Variant{} // variantAt[chromOffset][genomeIndex*2+phase]
275                 for refstep, libref := range refseq[seqname] {
276                         reftile := tileVariant[libref]
277                         coverage := int64(0) // number of ref bases that are called in genomes -- max is len(reftile.Sequence)*len(cgs)*2
278                         for cgidx, cg := range cgs {
279                                 for phase := 0; phase < 2; phase++ {
280                                         if len(cg.Variants) <= int(libref.Tag)*2+phase {
281                                                 continue
282                                         }
283                                         variant := cg.Variants[int(libref.Tag)*2+phase]
284                                         if variant == 0 {
285                                                 continue
286                                         }
287                                         genometile := tileVariant[tileLibRef{Tag: libref.Tag, Variant: variant}]
288                                         if variant == libref.Variant {
289                                                 continue
290                                         }
291                                         refSequence := reftile.Sequence
292                                         // If needed, extend the
293                                         // reference sequence up to
294                                         // the tag at the end of the
295                                         // genometile sequence.
296                                         refstepend := refstep + 1
297                                         for refstepend < len(refseq[seqname]) && len(refSequence) >= taglen && !bytes.EqualFold(refSequence[len(refSequence)-taglen:], genometile.Sequence[len(genometile.Sequence)-taglen:]) {
298                                                 if &refSequence[0] == &reftile.Sequence[0] {
299                                                         refSequence = append([]byte(nil), refSequence...)
300                                                 }
301                                                 refSequence = append(refSequence, tileVariant[refseq[seqname][refstepend]].Sequence...)
302                                                 refstepend++
303                                         }
304                                         // (TODO: handle no-calls)
305                                         vars, _ := hgvs.Diff(strings.ToUpper(string(refSequence)), strings.ToUpper(string(genometile.Sequence)), time.Second)
306                                         for _, v := range vars {
307                                                 if cmd.outputFormat.PadLeft {
308                                                         v = v.PadLeft()
309                                                 }
310                                                 v.Position += refpos
311                                                 log.Debugf("%s seq %s phase %d tag %d tile diff %s\n", cg.Name, seqname, phase, libref.Tag, v.String())
312                                                 varslice := variantAt[v.Position]
313                                                 if varslice == nil {
314                                                         varslice = make([]hgvs.Variant, len(cgs)*2)
315                                                         variantAt[v.Position] = varslice
316                                                 }
317                                                 varslice[cgidx*2+phase] = v
318                                         }
319                                         coverage += int64(len(reftile.Sequence))
320                                 }
321                         }
322                         refpos += len(reftile.Sequence) - taglen
323
324                         // Flush entries from variantAt that are
325                         // behind refpos. Flush all entries if this is
326                         // the last reftile of the path/chromosome.
327                         var flushpos []int
328                         lastrefstep := refstep == len(refseq[seqname])-1
329                         for pos := range variantAt {
330                                 if lastrefstep || pos <= refpos {
331                                         flushpos = append(flushpos, pos)
332                                 }
333                         }
334                         sort.Slice(flushpos, func(i, j int) bool { return flushpos[i] < flushpos[j] })
335                         for _, pos := range flushpos {
336                                 varslice := variantAt[pos]
337                                 delete(variantAt, pos)
338                                 for i := range varslice {
339                                         if varslice[i].Position == 0 {
340                                                 varslice[i].Position = pos
341                                         }
342                                 }
343                                 cmd.outputFormat.Print(out, seqname, varslice)
344                         }
345                         if bedout != nil && len(reftile.Sequence) > 0 {
346                                 tilestart := refpos - len(reftile.Sequence) + taglen
347                                 tileend := refpos
348                                 if !lastrefstep {
349                                         tileend += taglen
350                                 }
351                                 thickstart := tilestart + taglen
352                                 if refstep == 0 {
353                                         thickstart = 0
354                                 }
355                                 thickend := refpos
356                                 // coverage score, 0 to 1000
357                                 score := 1000 * coverage / int64(len(reftile.Sequence)) / int64(len(cgs)) / 2
358                                 fmt.Fprintf(bedout, "%s %d %d %d %d . %d %d\n",
359                                         seqname, tilestart, tileend,
360                                         libref.Tag,
361                                         score,
362                                         thickstart, thickend)
363                         }
364                 }
365         }
366         return nil
367 }
368
369 func printVCF(out io.Writer, seqname string, varslice []hgvs.Variant) {
370         refs := map[string]map[string]int{}
371         for _, v := range varslice {
372                 if v.Ref == "" && v.New == "" {
373                         continue
374                 }
375                 alts := refs[v.Ref]
376                 if alts == nil {
377                         alts = map[string]int{}
378                         refs[v.Ref] = alts
379                 }
380                 alts[v.New] = 0
381         }
382         for ref, alts := range refs {
383                 var altslice []string
384                 for alt := range alts {
385                         altslice = append(altslice, alt)
386                 }
387                 sort.Strings(altslice)
388                 for i, a := range altslice {
389                         alts[a] = i + 1
390                 }
391                 fmt.Fprintf(out, "%s\t%d\t%s\t%s", seqname, varslice[0].Position, ref, strings.Join(altslice, ","))
392                 for i := 0; i < len(varslice)/2; i++ {
393                         v1, v2 := varslice[i*2], varslice[i*2+1]
394                         a1, a2 := alts[v1.New], alts[v2.New]
395                         if v1.Ref != ref {
396                                 a1 = 0
397                         }
398                         if v2.Ref != ref {
399                                 a2 = 0
400                         }
401                         fmt.Fprintf(out, "\t%d/%d", a1, a2)
402                 }
403                 out.Write([]byte{'\n'})
404         }
405 }
406
407 func printHGVS(out io.Writer, seqname string, varslice []hgvs.Variant) {
408         for i := 0; i < len(varslice)/2; i++ {
409                 if i > 0 {
410                         out.Write([]byte{'\t'})
411                 }
412                 var1, var2 := varslice[i*2], varslice[i*2+1]
413                 if var1 == var2 {
414                         if var1.Ref == var1.New {
415                                 out.Write([]byte{'.'})
416                         } else {
417                                 fmt.Fprintf(out, "%s:g.%s", seqname, var1.String())
418                         }
419                 } else {
420                         fmt.Fprintf(out, "%s:g.[%s];[%s]", seqname, var1.String(), var2.String())
421                 }
422         }
423         out.Write([]byte{'\n'})
424 }