Save runtime profile data periodically.
[lightning.git] / export.go
1 package lightning
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "errors"
8         "flag"
9         "fmt"
10         "io"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "path"
15         "sort"
16         "strings"
17         "sync"
18         "time"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "github.com/arvados/lightning/hgvs"
22         log "github.com/sirupsen/logrus"
23 )
24
25 type outputFormat struct {
26         Print   func(out io.Writer, seqname string, varslice []hgvs.Variant)
27         PadLeft bool
28 }
29
30 var (
31         outputFormats = map[string]outputFormat{
32                 "hgvs-onehot": outputFormatHGVSOneHot,
33                 "hgvs":        outputFormatHGVS,
34                 "vcf":         outputFormatVCF,
35         }
36         outputFormatHGVS       = outputFormat{Print: printHGVS}
37         outputFormatHGVSOneHot = outputFormat{Print: printHGVSOneHot}
38         outputFormatVCF        = outputFormat{Print: printVCF, PadLeft: true}
39 )
40
41 type exporter struct {
42         outputFormat outputFormat
43 }
44
45 func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
46         var err error
47         defer func() {
48                 if err != nil {
49                         fmt.Fprintf(stderr, "%s\n", err)
50                 }
51         }()
52         flags := flag.NewFlagSet("", flag.ContinueOnError)
53         flags.SetOutput(stderr)
54         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
55         pprofdir := flags.String("pprof-dir", "", "write Go profile data to `directory` periodically")
56         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
57         projectUUID := flags.String("project", "", "project `UUID` for output data")
58         priority := flags.Int("priority", 500, "container request priority")
59         refname := flags.String("ref", "", "reference genome `name`")
60         inputFilename := flags.String("i", "-", "input `file` (library)")
61         outputFilename := flags.String("o", "-", "output `file`")
62         outputFormatStr := flags.String("output-format", "hgvs", "output `format`: hgvs or vcf")
63         outputBed := flags.String("output-bed", "", "also output bed `file`")
64         labelsFilename := flags.String("output-labels", "", "also output genome labels csv `file`")
65         err = flags.Parse(args)
66         if err == flag.ErrHelp {
67                 err = nil
68                 return 0
69         } else if err != nil {
70                 return 2
71         }
72
73         if f, ok := outputFormats[*outputFormatStr]; !ok {
74                 err = fmt.Errorf("invalid output format %q", *outputFormatStr)
75                 return 2
76         } else {
77                 cmd.outputFormat = f
78         }
79
80         if *pprof != "" {
81                 go func() {
82                         log.Println(http.ListenAndServe(*pprof, nil))
83                 }()
84         }
85         if *pprofdir != "" {
86                 go writeProfilesPeriodically(*pprofdir)
87         }
88
89         if !*runlocal {
90                 if *outputFilename != "-" {
91                         err = errors.New("cannot specify output file in container mode: not implemented")
92                         return 1
93                 }
94                 runner := arvadosContainerRunner{
95                         Name:        "lightning export",
96                         Client:      arvados.NewClientFromEnv(),
97                         ProjectUUID: *projectUUID,
98                         RAM:         1600000000000,
99                         VCPUs:       64,
100                         Priority:    *priority,
101                 }
102                 err = runner.TranslatePaths(inputFilename)
103                 if err != nil {
104                         return 1
105                 }
106                 if *outputBed != "" {
107                         if strings.Contains(*outputBed, "/") {
108                                 err = fmt.Errorf("cannot use -output-bed filename %q containing '/' char", *outputBed)
109                                 return 1
110                         }
111                         *outputBed = "/mnt/output/" + *outputBed
112                 }
113                 runner.Args = []string{"export", "-local=true",
114                         "-pprof", ":6000",
115                         "-pprof-dir", "/mnt/output",
116                         "-ref", *refname,
117                         "-output-format", *outputFormatStr,
118                         "-output-bed", *outputBed,
119                         "-output-labels", "/mnt/output/labels.csv",
120                         "-i", *inputFilename,
121                         "-o", "/mnt/output/export.csv",
122                 }
123                 var output string
124                 output, err = runner.Run()
125                 if err != nil {
126                         return 1
127                 }
128                 fmt.Fprintln(stdout, output+"/export.csv")
129                 return 0
130         }
131
132         in, err := open(*inputFilename)
133         if err != nil {
134                 return 1
135         }
136         defer in.Close()
137         input, ok := in.(io.ReadSeeker)
138         if !ok {
139                 err = fmt.Errorf("%s: %T cannot seek", *inputFilename, in)
140                 return 1
141         }
142
143         // Error out early if seeking doesn't work on the input file.
144         _, err = input.Seek(0, io.SeekEnd)
145         if err != nil {
146                 return 1
147         }
148         _, err = input.Seek(0, io.SeekStart)
149         if err != nil {
150                 return 1
151         }
152
153         var cgs []CompactGenome
154         tilelib := &tileLibrary{
155                 retainNoCalls:  true,
156                 compactGenomes: map[string][]tileVariantID{},
157         }
158         err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil)
159         if err != nil {
160                 return 1
161         }
162
163         refseq, ok := tilelib.refseqs[*refname]
164         if !ok {
165                 err = fmt.Errorf("reference name %q not found in input; have %v", *refname, func() (names []string) {
166                         for name := range tilelib.refseqs {
167                                 names = append(names, name)
168                         }
169                         return
170                 }())
171                 return 1
172         }
173
174         names := cgnames(tilelib)
175         for _, name := range names {
176                 cgs = append(cgs, CompactGenome{Name: name, Variants: tilelib.compactGenomes[name]})
177         }
178         if *labelsFilename != "" {
179                 log.Infof("writing labels to %s", *labelsFilename)
180                 var f *os.File
181                 f, err = os.OpenFile(*labelsFilename, os.O_CREATE|os.O_WRONLY, 0777)
182                 if err != nil {
183                         return 1
184                 }
185                 defer f.Close()
186                 _, outBasename := path.Split(*outputFilename)
187                 for i, name := range names {
188                         _, err = fmt.Fprintf(f, "%d,%q,%q\n", i, trimFilenameForLabel(name), outBasename)
189                         if err != nil {
190                                 err = fmt.Errorf("write %s: %w", *labelsFilename, err)
191                                 return 1
192                         }
193                 }
194                 err = f.Close()
195                 if err != nil {
196                         err = fmt.Errorf("close %s: %w", *labelsFilename, err)
197                         return 1
198                 }
199         }
200
201         _, err = input.Seek(0, io.SeekStart)
202         if err != nil {
203                 return 1
204         }
205
206         var output io.WriteCloser
207         if *outputFilename == "-" {
208                 output = nopCloser{stdout}
209         } else {
210                 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0666)
211                 if err != nil {
212                         return 1
213                 }
214                 defer output.Close()
215         }
216         bufw := bufio.NewWriter(output)
217
218         var bedout *os.File
219         var bedbufw *bufio.Writer
220         if *outputBed != "" {
221                 bedout, err = os.OpenFile(*outputBed, os.O_CREATE|os.O_WRONLY, 0666)
222                 if err != nil {
223                         return 1
224                 }
225                 defer bedout.Close()
226                 bedbufw = bufio.NewWriter(bedout)
227         }
228
229         err = cmd.export(bufw, bedout, input, strings.HasSuffix(*inputFilename, ".gz"), tilelib, refseq, cgs)
230         if err != nil {
231                 return 1
232         }
233         err = bufw.Flush()
234         if err != nil {
235                 return 1
236         }
237         err = output.Close()
238         if err != nil {
239                 return 1
240         }
241         if bedout != nil {
242                 err = bedbufw.Flush()
243                 if err != nil {
244                         return 1
245                 }
246                 err = bedout.Close()
247                 if err != nil {
248                         return 1
249                 }
250         }
251         err = in.Close()
252         if err != nil {
253                 return 1
254         }
255         return 0
256 }
257
258 func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, gz bool, tilelib *tileLibrary, refseq map[string][]tileLibRef, cgs []CompactGenome) error {
259         need := map[tileLibRef]bool{}
260         var seqnames []string
261         for seqname, librefs := range refseq {
262                 seqnames = append(seqnames, seqname)
263                 for _, libref := range librefs {
264                         if libref.Variant != 0 {
265                                 need[libref] = true
266                         }
267                 }
268         }
269         sort.Strings(seqnames)
270
271         for _, cg := range cgs {
272                 for i, variant := range cg.Variants {
273                         if variant == 0 {
274                                 continue
275                         }
276                         libref := tileLibRef{Tag: tagID(i / 2), Variant: variant}
277                         need[libref] = true
278                 }
279         }
280
281         log.Infof("export: loading %d tile variants", len(need))
282         tileVariant := map[tileLibRef]TileVariant{}
283         err := DecodeLibrary(librdr, gz, func(ent *LibraryEntry) error {
284                 for _, tv := range ent.TileVariants {
285                         libref := tilelib.getRef(tv.Tag, tv.Sequence)
286                         if need[libref] {
287                                 tileVariant[libref] = tv
288                         }
289                 }
290                 return nil
291         })
292         if err != nil {
293                 return err
294         }
295
296         log.Infof("export: loaded %d tile variants", len(tileVariant))
297         var missing []tileLibRef
298         for libref := range need {
299                 if _, ok := tileVariant[libref]; !ok {
300                         missing = append(missing, libref)
301                 }
302         }
303         if len(missing) > 0 {
304                 if limit := 100; len(missing) > limit {
305                         log.Warnf("first %d missing tiles: %v", limit, missing[:limit])
306                 } else {
307                         log.Warnf("missing tiles: %v", missing)
308                 }
309                 return fmt.Errorf("%d needed tiles are missing from library", len(missing))
310         }
311
312         outw := make([]io.WriteCloser, len(seqnames))
313         bedw := make([]io.WriteCloser, len(seqnames))
314
315         var merges sync.WaitGroup
316         merge := func(dst io.Writer, src []io.WriteCloser, label string) {
317                 var mtx sync.Mutex
318                 for i, seqname := range seqnames {
319                         pr, pw := io.Pipe()
320                         src[i] = pw
321                         merges.Add(1)
322                         seqname := seqname
323                         go func() {
324                                 defer merges.Done()
325                                 log.Infof("writing %s %s", seqname, label)
326                                 scanner := bufio.NewScanner(pr)
327                                 for scanner.Scan() {
328                                         mtx.Lock()
329                                         dst.Write(scanner.Bytes())
330                                         dst.Write([]byte{'\n'})
331                                         mtx.Unlock()
332                                 }
333                                 log.Infof("writing %s %s done", seqname, label)
334                         }()
335                 }
336         }
337         merge(out, outw, "output")
338         if bedout != nil {
339                 merge(bedout, bedw, "bed")
340         }
341
342         throttle := throttle{Max: 8}
343         log.Infof("assembling %d sequences in %d goroutines", len(seqnames), throttle.Max)
344         for seqidx, seqname := range seqnames {
345                 seqidx, seqname := seqidx, seqname
346                 outw := outw[seqidx]
347                 bedw := bedw[seqidx]
348                 throttle.Acquire()
349                 go func() {
350                         defer throttle.Release()
351                         if bedw != nil {
352                                 defer bedw.Close()
353                         }
354                         defer outw.Close()
355                         outwb := bufio.NewWriter(outw)
356                         defer outwb.Flush()
357                         cmd.exportSeq(outwb, bedw, tilelib.taglib.keylen, seqname, refseq[seqname], tileVariant, cgs)
358                 }()
359         }
360
361         merges.Wait()
362         return nil
363 }
364
365 // Align genome tiles to reference tiles, write diffs to outw, and (if
366 // bedw is not nil) write tile coverage to bedw.
367 func (cmd *exporter) exportSeq(outw, bedw io.Writer, taglen int, seqname string, reftiles []tileLibRef, tileVariant map[tileLibRef]TileVariant, cgs []CompactGenome) {
368         refpos := 0
369         variantAt := map[int][]hgvs.Variant{} // variantAt[chromOffset][genomeIndex*2+phase]
370         for refstep, libref := range reftiles {
371                 reftile := tileVariant[libref]
372                 tagcoverage := 0 // number of times the start tag was found in genomes -- max is len(cgs)*2
373                 for cgidx, cg := range cgs {
374                         for phase := 0; phase < 2; phase++ {
375                                 if len(cg.Variants) <= int(libref.Tag)*2+phase {
376                                         continue
377                                 }
378                                 variant := cg.Variants[int(libref.Tag)*2+phase]
379                                 if variant == 0 {
380                                         continue
381                                 }
382                                 tagcoverage++
383                                 if variant == libref.Variant {
384                                         continue
385                                 }
386                                 genometile := tileVariant[tileLibRef{Tag: libref.Tag, Variant: variant}]
387                                 if len(genometile.Sequence) == 0 {
388                                         // Hash is known but sequence
389                                         // is not, e.g., retainNoCalls
390                                         // was false during import
391                                         continue
392                                 }
393                                 refSequence := reftile.Sequence
394                                 // If needed, extend the reference
395                                 // sequence up to the tag at the end
396                                 // of the genometile sequence.
397                                 refstepend := refstep + 1
398                                 for refstepend < len(reftiles) && len(refSequence) >= taglen && !bytes.EqualFold(refSequence[len(refSequence)-taglen:], genometile.Sequence[len(genometile.Sequence)-taglen:]) {
399                                         if &refSequence[0] == &reftile.Sequence[0] {
400                                                 refSequence = append([]byte(nil), refSequence...)
401                                         }
402                                         refSequence = append(refSequence, tileVariant[reftiles[refstepend]].Sequence...)
403                                         refstepend++
404                                 }
405                                 // (TODO: handle no-calls)
406                                 vars, _ := hgvs.Diff(strings.ToUpper(string(refSequence)), strings.ToUpper(string(genometile.Sequence)), time.Second)
407                                 for _, v := range vars {
408                                         if cmd.outputFormat.PadLeft {
409                                                 v = v.PadLeft()
410                                         }
411                                         v.Position += refpos
412                                         varslice := variantAt[v.Position]
413                                         if varslice == nil {
414                                                 varslice = make([]hgvs.Variant, len(cgs)*2)
415                                                 variantAt[v.Position] = varslice
416                                         }
417                                         varslice[cgidx*2+phase] = v
418                                 }
419                         }
420                 }
421                 refpos += len(reftile.Sequence) - taglen
422
423                 // Flush entries from variantAt that are behind
424                 // refpos. Flush all entries if this is the last
425                 // reftile of the path/chromosome.
426                 var flushpos []int
427                 lastrefstep := refstep == len(reftiles)-1
428                 for pos := range variantAt {
429                         if lastrefstep || pos <= refpos {
430                                 flushpos = append(flushpos, pos)
431                         }
432                 }
433                 sort.Slice(flushpos, func(i, j int) bool { return flushpos[i] < flushpos[j] })
434                 for _, pos := range flushpos {
435                         varslice := variantAt[pos]
436                         delete(variantAt, pos)
437                         for i := range varslice {
438                                 if varslice[i].Position == 0 {
439                                         varslice[i].Position = pos
440                                 }
441                         }
442                         cmd.outputFormat.Print(outw, seqname, varslice)
443                 }
444                 if bedw != nil && len(reftile.Sequence) > 0 {
445                         tilestart := refpos - len(reftile.Sequence) + taglen
446                         tileend := refpos
447                         if !lastrefstep {
448                                 tileend += taglen
449                         }
450                         thickstart := tilestart + taglen
451                         if refstep == 0 {
452                                 thickstart = 0
453                         }
454                         thickend := refpos
455
456                         // coverage score, 0 to 1000
457                         score := 1000
458                         if len(cgs) > 0 {
459                                 score = 1000 * tagcoverage / len(cgs) / 2
460                         }
461
462                         fmt.Fprintf(bedw, "%s %d %d %d %d . %d %d\n",
463                                 seqname, tilestart, tileend,
464                                 libref.Tag,
465                                 score,
466                                 thickstart, thickend)
467                 }
468         }
469 }
470
471 func printVCF(out io.Writer, seqname string, varslice []hgvs.Variant) {
472         refs := map[string]map[string]int{}
473         for _, v := range varslice {
474                 if v.Ref == "" && v.New == "" {
475                         continue
476                 }
477                 alts := refs[v.Ref]
478                 if alts == nil {
479                         alts = map[string]int{}
480                         refs[v.Ref] = alts
481                 }
482                 alts[v.New] = 0
483         }
484         for ref, alts := range refs {
485                 var altslice []string
486                 for alt := range alts {
487                         altslice = append(altslice, alt)
488                 }
489                 sort.Strings(altslice)
490                 for i, a := range altslice {
491                         alts[a] = i + 1
492                 }
493                 fmt.Fprintf(out, "%s\t%d\t%s\t%s", seqname, varslice[0].Position, ref, strings.Join(altslice, ","))
494                 for i := 0; i < len(varslice); i += 2 {
495                         v1, v2 := varslice[i], varslice[i+1]
496                         a1, a2 := alts[v1.New], alts[v2.New]
497                         if v1.Ref != ref {
498                                 a1 = 0
499                         }
500                         if v2.Ref != ref {
501                                 a2 = 0
502                         }
503                         fmt.Fprintf(out, "\t%d/%d", a1, a2)
504                 }
505                 out.Write([]byte{'\n'})
506         }
507 }
508
509 func printHGVS(out io.Writer, seqname string, varslice []hgvs.Variant) {
510         for i := 0; i < len(varslice)/2; i++ {
511                 if i > 0 {
512                         out.Write([]byte{'\t'})
513                 }
514                 var1, var2 := varslice[i*2], varslice[i*2+1]
515                 if var1 == var2 {
516                         if var1.Ref == var1.New {
517                                 out.Write([]byte{'.'})
518                         } else {
519                                 fmt.Fprintf(out, "%s:g.%s", seqname, var1.String())
520                         }
521                 } else {
522                         fmt.Fprintf(out, "%s:g.[%s];[%s]", seqname, var1.String(), var2.String())
523                 }
524         }
525         out.Write([]byte{'\n'})
526 }
527
528 func printHGVSOneHot(out io.Writer, seqname string, varslice []hgvs.Variant) {
529         vars := map[hgvs.Variant]bool{}
530         for _, v := range varslice {
531                 if v.Ref != v.New {
532                         vars[v] = true
533                 }
534         }
535
536         // sort variants to ensure output is deterministic
537         sorted := make([]hgvs.Variant, 0, len(vars))
538         for v := range vars {
539                 sorted = append(sorted, v)
540         }
541         sort.Slice(sorted, func(a, b int) bool { return hgvs.Less(sorted[a], sorted[b]) })
542
543         for _, v := range sorted {
544                 fmt.Fprintf(out, "%s.%s", seqname, v.String())
545                 for i := 0; i < len(varslice); i += 2 {
546                         if varslice[i] == v || varslice[i+1] == v {
547                                 out.Write([]byte("\t1"))
548                         } else {
549                                 out.Write([]byte("\t0"))
550                         }
551                 }
552                 out.Write([]byte{'\n'})
553         }
554 }