Fix some tests.
[lightning.git] / export.go
index 5b0910054cec3a4d952871c997987fb93ed1d123..39af631e9a4e0b552f2e507f7c0fe37ffb17bed4 100644 (file)
--- a/export.go
+++ b/export.go
@@ -1,3 +1,7 @@
+// Copyright (C) The Lightning Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package lightning
 
 import (
@@ -35,14 +39,15 @@ type tvVariant struct {
 type outputFormat interface {
        Filename() string
        PadLeft() bool
-       Head(out io.Writer, cgs []CompactGenome) error
+       Head(out io.Writer, cgs []CompactGenome, cases []bool, p float64) error
        Print(out io.Writer, seqname string, varslice []tvVariant) error
        Finish(outdir string, out io.Writer, seqname string) error
+       MaxGoroutines() int
 }
 
 var outputFormats = map[string]func() outputFormat{
        "hgvs-numpy": func() outputFormat {
-               return &formatHGVSNumpy{alleles: map[string][][]bool{}, variants: map[string][]hgvs.Variant{}}
+               return &formatHGVSNumpy{alleles: map[string][][]int8{}}
        },
        "hgvs-onehot": func() outputFormat { return formatHGVSOneHot{} },
        "hgvs":        func() outputFormat { return formatHGVS{} },
@@ -55,6 +60,9 @@ type exporter struct {
        outputPerChrom bool
        compress       bool
        maxTileSize    int
+       filter         filter
+       maxPValue      float64
+       cases          []bool
 }
 
 func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
@@ -73,6 +81,8 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
        priority := flags.Int("priority", 500, "container request priority")
        refname := flags.String("ref", "", "reference genome `name`")
        inputDir := flags.String("input-dir", ".", "input `directory`")
+       cases := flags.String("cases", "", "file indicating which genomes are positive cases (for computing p-values)")
+       flags.Float64Var(&cmd.maxPValue, "p-value", 1, "do chi square test and omit columns with p-value above this threshold")
        outputDir := flags.String("output-dir", ".", "output `directory`")
        outputFormatStr := flags.String("output-format", "hgvs", "output `format`: hgvs, pvcf, or vcf")
        outputBed := flags.String("output-bed", "", "also output bed `file`")
@@ -80,15 +90,15 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
        flags.BoolVar(&cmd.compress, "z", false, "write gzip-compressed output files")
        labelsFilename := flags.String("output-labels", "", "also output genome labels csv `file`")
        flags.IntVar(&cmd.maxTileSize, "max-tile-size", 50000, "don't try to make annotations for tiles bigger than given `size`")
+       cmd.filter.Flags(flags)
        err = flags.Parse(args)
        if err == flag.ErrHelp {
                err = nil
                return 0
        } else if err != nil {
                return 2
-       }
-       if flag.NArg() > 0 {
-               err = fmt.Errorf("extra unparsed command line arguments: %q", flag.Args())
+       } else if flags.NArg() > 0 {
+               err = fmt.Errorf("errant command line arguments after parsed flags: %v", flags.Args())
                return 2
        }
 
@@ -117,12 +127,12 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
                        Name:        "lightning export",
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: *projectUUID,
-                       RAM:         700000000000,
+                       RAM:         750000000000,
                        VCPUs:       96,
                        Priority:    *priority,
                        APIAccess:   true,
                }
-               err = runner.TranslatePaths(inputDir)
+               err = runner.TranslatePaths(inputDir, cases)
                if err != nil {
                        return 1
                }
@@ -137,6 +147,8 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
                        "-pprof", ":6000",
                        "-pprof-dir", "/mnt/output",
                        "-ref", *refname,
+                       "-cases", *cases,
+                       "-p-value", fmt.Sprintf("%f", cmd.maxPValue),
                        "-output-format", *outputFormatStr,
                        "-output-bed", *outputBed,
                        "-output-labels", "/mnt/output/labels.csv",
@@ -146,6 +158,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
                        "-output-dir", "/mnt/output",
                        "-z=" + fmt.Sprintf("%v", cmd.compress),
                }
+               runner.Args = append(runner.Args, cmd.filter.Args()...)
                var output string
                output, err = runner.Run()
                if err != nil {
@@ -161,7 +174,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
                retainTileSequences: true,
                compactGenomes:      map[string][]tileVariantID{},
        }
-       err = tilelib.LoadDir(context.Background(), *inputDir, nil)
+       err = tilelib.LoadDir(context.Background(), *inputDir)
        if err != nil {
                return 1
        }
@@ -177,6 +190,9 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
                return 1
        }
 
+       log.Infof("filtering: %+v", cmd.filter)
+       cmd.filter.Apply(tilelib)
+
        names := cgnames(tilelib)
        for _, name := range names {
                cgs = append(cgs, CompactGenome{Name: name, Variants: tilelib.compactGenomes[name]})
@@ -203,6 +219,44 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
                }
        }
 
+       cmd.cases = make([]bool, len(names))
+       if *cases != "" {
+               log.Infof("reading cases file: %s", *cases)
+               var f io.ReadCloser
+               f, err = open(*cases)
+               if err != nil {
+                       return 1
+               }
+               defer f.Close()
+               var buf []byte
+               buf, err = io.ReadAll(f)
+               if err != nil {
+                       return 1
+               }
+               for _, pattern := range bytes.Split(buf, []byte("\n")) {
+                       if len(pattern) == 0 {
+                               continue
+                       }
+                       pattern := string(pattern)
+                       idx := -1
+                       for i, name := range names {
+                               if !strings.Contains(name, pattern) {
+                                       continue
+                               } else if idx >= 0 {
+                                       err = fmt.Errorf("pattern %q in cases file matches multiple genome IDs: %q, %q", pattern, names[idx], name)
+                                       return 1
+                               } else {
+                                       idx = i
+                               }
+                       }
+                       if idx < 0 {
+                               log.Warnf("pattern %q in cases file does not match any genome IDs", pattern)
+                               continue
+                       }
+                       cmd.cases[idx] = true
+               }
+       }
+
        var bedout io.Writer
        var bedfile *os.File
        var bedbufw *bufio.Writer
@@ -298,7 +352,7 @@ func (cmd *exporter) export(outdir string, bedout io.Writer, tilelib *tileLibrar
                                defer z.Close()
                                outw[i] = z
                        }
-                       err = cmd.outputFormat.Head(outw[i], cgs)
+                       err = cmd.outputFormat.Head(outw[i], cgs, cmd.cases, cmd.maxPValue)
                        if err != nil {
                                return err
                        }
@@ -320,7 +374,7 @@ func (cmd *exporter) export(outdir string, bedout io.Writer, tilelib *tileLibrar
                        defer z.Close()
                        out = z
                }
-               cmd.outputFormat.Head(out, cgs)
+               cmd.outputFormat.Head(out, cgs, cmd.cases, cmd.maxPValue)
                merge(out, outw, "output")
        }
        if bedout != nil {
@@ -328,6 +382,9 @@ func (cmd *exporter) export(outdir string, bedout io.Writer, tilelib *tileLibrar
        }
 
        throttle := throttle{Max: runtime.NumCPU()}
+       if max := cmd.outputFormat.MaxGoroutines(); max > 0 {
+               throttle.Max = max
+       }
        log.Infof("assembling %d sequences in %d goroutines", len(seqnames), throttle.Max)
        for seqidx, seqname := range seqnames {
                seqidx, seqname := seqidx, seqname
@@ -386,15 +443,14 @@ func eachVariant(bedw io.Writer, taglen int, seqname string, reftiles []tileLibR
                tagcoverage := 0 // number of times the start tag was found in genomes -- max is len(cgs)*2
                for cgidx, cg := range cgs {
                        for phase := 0; phase < 2; phase++ {
-                               if len(cg.Variants) <= int(libref.Tag)*2+phase {
-                                       continue
+                               var variant tileVariantID
+                               if i := int(libref.Tag)*2 + phase; len(cg.Variants) > i {
+                                       variant = cg.Variants[i]
                                }
-                               variant := cg.Variants[int(libref.Tag)*2+phase]
-                               if variant == 0 {
-                                       continue
+                               if variant > 0 {
+                                       tagcoverage++
                                }
-                               tagcoverage++
-                               if variant == libref.Variant {
+                               if variant == libref.Variant || variant == 0 {
                                        continue
                                }
                                glibref := tileLibRef{Tag: libref.Tag, Variant: variant}
@@ -467,9 +523,29 @@ func eachVariant(bedw io.Writer, taglen int, seqname string, reftiles []tileLibR
                for i, pos := range flushpos {
                        varslice := variantAt[pos]
                        delete(variantAt, pos)
+                       // Check for uninitialized (zero-value)
+                       // elements in varslice
                        for i := range varslice {
-                               if varslice[i].Position == 0 {
-                                       varslice[i].Position = pos
+                               if varslice[i].Position != 0 {
+                                       // Not a zero-value element
+                                       continue
+                               }
+                               // Set the position so
+                               // varslice[*].Position are all equal
+                               varslice[i].Position = pos
+                               // This could be either =ref or a
+                               // missing/low-quality tile. Figure
+                               // out which.
+                               vidx := int(libref.Tag)*2 + i%2
+                               if vidx >= len(cgs[i/2].Variants) {
+                                       // Missing tile.
+                                       varslice[i].New = "-"
+                                       continue
+                               }
+                               v := cgs[i/2].Variants[vidx]
+                               if v < 1 || len(tilelib.TileVariantSequence(tileLibRef{Tag: libref.Tag, Variant: v})) == 0 {
+                                       // Missing/low-quality tile.
+                                       varslice[i].New = "-" // fasta "gap of indeterminate length"
                                }
                        }
                        flushvariants[i] = varslice
@@ -512,6 +588,11 @@ func bucketVarsliceByRef(varslice []tvVariant) map[string]map[string]int {
        byref := map[string]map[string]int{}
        for _, v := range varslice {
                if v.Ref == "" && v.New == "" {
+                       // =ref
+                       continue
+               }
+               if v.New == "-" {
+                       // no-call
                        continue
                }
                alts := byref[v.Ref]
@@ -526,10 +607,11 @@ func bucketVarsliceByRef(varslice []tvVariant) map[string]map[string]int {
 
 type formatVCF struct{}
 
+func (formatVCF) MaxGoroutines() int                     { return 0 }
 func (formatVCF) Filename() string                       { return "out.vcf" }
 func (formatVCF) PadLeft() bool                          { return true }
 func (formatVCF) Finish(string, io.Writer, string) error { return nil }
-func (formatVCF) Head(out io.Writer, cgs []CompactGenome) error {
+func (formatVCF) Head(out io.Writer, cgs []CompactGenome, cases []bool, p float64) error {
        _, err := fmt.Fprint(out, "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\n")
        return err
 }
@@ -558,10 +640,11 @@ func (formatVCF) Print(out io.Writer, seqname string, varslice []tvVariant) erro
 
 type formatPVCF struct{}
 
+func (formatPVCF) MaxGoroutines() int                     { return 0 }
 func (formatPVCF) Filename() string                       { return "out.vcf" }
 func (formatPVCF) PadLeft() bool                          { return true }
 func (formatPVCF) Finish(string, io.Writer, string) error { return nil }
-func (formatPVCF) Head(out io.Writer, cgs []CompactGenome) error {
+func (formatPVCF) Head(out io.Writer, cgs []CompactGenome, cases []bool, p float64) error {
        fmt.Fprintln(out, `##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">`)
        fmt.Fprintf(out, "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT")
        for _, cg := range cgs {
@@ -612,16 +695,24 @@ func (formatPVCF) Print(out io.Writer, seqname string, varslice []tvVariant) err
 
 type formatHGVS struct{}
 
-func (formatHGVS) Filename() string                              { return "out.tsv" }
-func (formatHGVS) PadLeft() bool                                 { return false }
-func (formatHGVS) Head(out io.Writer, cgs []CompactGenome) error { return nil }
-func (formatHGVS) Finish(string, io.Writer, string) error        { return nil }
+func (formatHGVS) MaxGoroutines() int                                                     { return 0 }
+func (formatHGVS) Filename() string                                                       { return "out.tsv" }
+func (formatHGVS) PadLeft() bool                                                          { return false }
+func (formatHGVS) Head(out io.Writer, cgs []CompactGenome, cases []bool, p float64) error { return nil }
+func (formatHGVS) Finish(string, io.Writer, string) error                                 { return nil }
 func (formatHGVS) Print(out io.Writer, seqname string, varslice []tvVariant) error {
        for i := 0; i < len(varslice)/2; i++ {
                if i > 0 {
                        out.Write([]byte{'\t'})
                }
                var1, var2 := varslice[i*2], varslice[i*2+1]
+               if var1.New == "-" || var2.New == "-" {
+                       _, err := out.Write([]byte{'N'})
+                       if err != nil {
+                               return err
+                       }
+                       continue
+               }
                if var1.Variant == var2.Variant {
                        if var1.Ref == var1.New {
                                _, err := out.Write([]byte{'.'})
@@ -647,10 +738,13 @@ func (formatHGVS) Print(out io.Writer, seqname string, varslice []tvVariant) err
 
 type formatHGVSOneHot struct{}
 
-func (formatHGVSOneHot) Filename() string                              { return "out.tsv" }
-func (formatHGVSOneHot) PadLeft() bool                                 { return false }
-func (formatHGVSOneHot) Head(out io.Writer, cgs []CompactGenome) error { return nil }
-func (formatHGVSOneHot) Finish(string, io.Writer, string) error        { return nil }
+func (formatHGVSOneHot) MaxGoroutines() int { return 0 }
+func (formatHGVSOneHot) Filename() string   { return "out.tsv" }
+func (formatHGVSOneHot) PadLeft() bool      { return false }
+func (formatHGVSOneHot) Head(out io.Writer, cgs []CompactGenome, cases []bool, p float64) error {
+       return nil
+}
+func (formatHGVSOneHot) Finish(string, io.Writer, string) error { return nil }
 func (formatHGVSOneHot) Print(out io.Writer, seqname string, varslice []tvVariant) error {
        vars := map[hgvs.Variant]bool{}
        for _, v := range varslice {
@@ -667,6 +761,9 @@ func (formatHGVSOneHot) Print(out io.Writer, seqname string, varslice []tvVarian
        sort.Slice(sorted, func(a, b int) bool { return hgvs.Less(sorted[a], sorted[b]) })
 
        for _, v := range sorted {
+               if v.New == "-" {
+                       continue
+               }
                fmt.Fprintf(out, "%s.%s", seqname, v.String())
                for i := 0; i < len(varslice); i += 2 {
                        if varslice[i].Variant == v || varslice[i+1].Variant == v {
@@ -685,14 +782,21 @@ func (formatHGVSOneHot) Print(out io.Writer, seqname string, varslice []tvVarian
 
 type formatHGVSNumpy struct {
        sync.Mutex
-       variants map[string][]hgvs.Variant // variants[seqname][variantidx]
-       alleles  map[string][][]bool       // alleles[seqname][variantidx][genomeidx*2+phase]
+       writelock sync.Mutex
+       alleles   map[string][][]int8 // alleles[seqname][variantidx][genomeidx*2+phase]
+       cases     []bool
+       maxPValue float64
 }
 
-func (*formatHGVSNumpy) Filename() string                              { return "matrix.npy" }
-func (*formatHGVSNumpy) PadLeft() bool                                 { return false }
-func (*formatHGVSNumpy) Head(out io.Writer, cgs []CompactGenome) error { return nil }
-func (f *formatHGVSNumpy) Print(_ io.Writer, seqname string, varslice []tvVariant) error {
+func (*formatHGVSNumpy) MaxGoroutines() int { return 4 }
+func (*formatHGVSNumpy) Filename() string   { return "annotations.csv" }
+func (*formatHGVSNumpy) PadLeft() bool      { return false }
+func (f *formatHGVSNumpy) Head(out io.Writer, cgs []CompactGenome, cases []bool, p float64) error {
+       f.cases = cases
+       f.maxPValue = p
+       return nil
+}
+func (f *formatHGVSNumpy) Print(outw io.Writer, seqname string, varslice []tvVariant) error {
        // sort variants to ensure output is deterministic
        sorted := make([]hgvs.Variant, 0, len(varslice))
        for _, v := range varslice {
@@ -701,37 +805,56 @@ func (f *formatHGVSNumpy) Print(_ io.Writer, seqname string, varslice []tvVarian
        sort.Slice(sorted, func(a, b int) bool { return hgvs.Less(sorted[a], sorted[b]) })
 
        f.Lock()
-       defer f.Unlock()
-
-       seqvariants := f.variants[seqname]
        seqalleles := f.alleles[seqname]
+       f.Unlock()
 
-       // append a row to seqvariants and seqalleles for each unique
-       // non-ref variant in varslice.
+       chi2x := make([]bool, 0, len(varslice))
+       chi2y := make([]bool, 0, len(varslice))
+
+       // append a row to seqalleles for each unique non-ref variant
+       // in varslice.
        var previous hgvs.Variant
        for _, v := range sorted {
-               if previous == v || v.Ref == v.New {
+               if previous == v || v.Ref == v.New || v.New == "-" {
                        continue
                }
                previous = v
-               newrow := make([]bool, len(varslice))
+               chi2x, chi2y := chi2x, chi2y
+               newrow := make([]int8, len(varslice))
                for i, allele := range varslice {
                        if allele.Variant == v {
-                               newrow[i] = true
+                               newrow[i] = 1
+                               chi2x = append(chi2x, true)
+                               chi2y = append(chi2y, f.cases[i/2])
+                       } else if allele.Variant.New == "-" {
+                               newrow[i] = -1
+                       } else {
+                               chi2x = append(chi2x, false)
+                               chi2y = append(chi2y, f.cases[i/2])
                        }
                }
+               if f.maxPValue < 1 && pvalue(chi2x, chi2y) > f.maxPValue {
+                       continue
+               }
                seqalleles = append(seqalleles, newrow)
-               seqvariants = append(seqvariants, v)
+               _, err := fmt.Fprintf(outw, "%d,%q\n", len(seqalleles)-1, seqname+"."+v.String())
+               if err != nil {
+                       return err
+               }
        }
-       f.variants[seqname] = seqvariants
+
+       f.Lock()
        f.alleles[seqname] = seqalleles
+       f.Unlock()
        return nil
 }
-func (f *formatHGVSNumpy) Finish(outdir string, outw io.Writer, seqname string) error {
+func (f *formatHGVSNumpy) Finish(outdir string, _ io.Writer, seqname string) error {
        // Write seqname's data to a .npy matrix with one row per
        // genome and 2 columns per variant.
-       seqvariants := f.variants[seqname]
+       f.Lock()
        seqalleles := f.alleles[seqname]
+       delete(f.alleles, seqname)
+       f.Unlock()
        if len(seqalleles) == 0 {
                return nil
        }
@@ -743,16 +866,25 @@ func (f *formatHGVSNumpy) Finish(outdir string, outw io.Writer, seqname string)
        for varidx, alleles := range seqalleles {
                for g := 0; g < len(alleles)/2; g++ {
                        aa, ab := alleles[g*2], alleles[g*2+1]
-                       if aa && ab {
+                       if aa < 0 || ab < 0 {
+                               // no-call
+                               out[g*cols+varidx*2] = -1
+                               out[g*cols+varidx*2+1] = -1
+                       } else if aa > 0 && ab > 0 {
                                // hom
                                out[g*cols+varidx*2] = 1
-                       } else if aa || ab {
+                       } else if aa > 0 || ab > 0 {
                                // het
                                out[g*cols+varidx*2+1] = 1
                        }
                }
        }
-       bufw := bufio.NewWriter(outw)
+       outf, err := os.OpenFile(outdir+"/matrix."+seqname+".npy", os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0777)
+       if err != nil {
+               return err
+       }
+       defer outf.Close()
+       bufw := bufio.NewWriter(outf)
        npw, err := gonpy.NewWriter(nopCloser{bufw})
        if err != nil {
                return err
@@ -763,25 +895,16 @@ func (f *formatHGVSNumpy) Finish(outdir string, outw io.Writer, seqname string)
                "cols":    cols,
        }).Info("writing numpy")
        npw.Shape = []int{rows, cols}
+       f.writelock.Lock() // serialize because WriteInt8 uses lots of memory
        npw.WriteInt8(out)
+       f.writelock.Unlock()
        err = bufw.Flush()
        if err != nil {
                return err
        }
-
-       // Write annotations
-       csv, err := os.OpenFile(outdir+"/annotations."+seqname+".csv", os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0777)
+       err = outf.Close()
        if err != nil {
                return err
        }
-       defer csv.Close()
-       for i, v := range seqvariants {
-               fmt.Fprintf(csv, "%d,%q\n", i, seqname+"."+v.String())
-       }
-       err = csv.Close()
-       if err != nil {
-               return err
-       }
-
        return nil
 }