Merge branch '19566-glm'
authorTom Clegg <tom@curii.com>
Mon, 19 Dec 2022 19:25:54 +0000 (14:25 -0500)
committerTom Clegg <tom@curii.com>
Mon, 19 Dec 2022 19:25:54 +0000 (14:25 -0500)
refs #19566

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

arvados.go
glm.go
glm_test.go
manhattan.py
slice.go
slicenumpy.go

index 68f275508d86f1e8da3cfd55419abebae6e42541..77dceda46d63681dc171199ac28b0d65871cf9b2 100644 (file)
@@ -201,6 +201,7 @@ type arvadosContainerRunner struct {
        Mounts      map[string]map[string]interface{}
        Priority    int
        KeepCache   int // cache buffers per VCPU (0 for default)
+       Preemptible bool
 }
 
 func (runner *arvadosContainerRunner) Run() (string, error) {
@@ -269,7 +270,7 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e
                        "priority":            runner.Priority,
                        "state":               arvados.ContainerRequestStateCommitted,
                        "scheduling_parameters": arvados.SchedulingParameters{
-                               Preemptible: false,
+                               Preemptible: runner.Preemptible,
                                Partitions:  []string{},
                        },
                        "environment": map[string]string{
diff --git a/glm.go b/glm.go
index 6ae98df928ac02a64d5381ff0a4f5bd8bfb0bfe5..b68bdd1898419ffab3c15d5516c8259ad4646073 100644 (file)
--- a/glm.go
+++ b/glm.go
@@ -35,7 +35,7 @@ func normalize(a []float64) {
 // onehot is the observed outcome, in same order as sampleInfo, but
 // shorter because it only has entries for samples with
 // isTraining==true.
-func pvalueGLM(sampleInfo []sampleInfo, onehot []bool, nPCA int) (p float64) {
+func glmPvalueFunc(sampleInfo []sampleInfo, nPCA int) func(onehot []bool) float64 {
        pcaNames := make([]string, 0, nPCA)
        data := make([][]statmodel.Dtype, 0, nPCA)
        for pca := 0; pca < nPCA; pca++ {
@@ -50,17 +50,11 @@ func pvalueGLM(sampleInfo []sampleInfo, onehot []bool, nPCA int) (p float64) {
                pcaNames = append(pcaNames, fmt.Sprintf("pca%d", pca))
        }
 
-       variant := make([]statmodel.Dtype, 0, len(sampleInfo))
        outcome := make([]statmodel.Dtype, 0, len(sampleInfo))
        constants := make([]statmodel.Dtype, 0, len(sampleInfo))
        row := 0
        for _, si := range sampleInfo {
                if si.isTraining {
-                       if onehot[row] {
-                               variant = append(variant, 1)
-                       } else {
-                               variant = append(variant, 0)
-                       }
                        if si.isCase {
                                outcome = append(outcome, 1)
                        } else {
@@ -70,27 +64,50 @@ func pvalueGLM(sampleInfo []sampleInfo, onehot []bool, nPCA int) (p float64) {
                        row++
                }
        }
-       data = append(data, variant, outcome, constants)
-       dataset := statmodel.NewDataset(data, append(pcaNames, "variant", "outcome", "constants"))
+       data = append([][]statmodel.Dtype{outcome, constants}, data...)
+       names := append([]string{"outcome", "constants"}, pcaNames...)
+       dataset := statmodel.NewDataset(data, names)
 
-       defer func() {
-               if recover() != nil {
-                       // typically "matrix singular or near-singular with condition number +Inf"
-                       p = math.NaN()
-               }
-       }()
-       model, err := glm.NewGLM(dataset, "outcome", append([]string{"constants"}, pcaNames...), glmConfig)
+       model, err := glm.NewGLM(dataset, "outcome", names[1:], glmConfig)
        if err != nil {
-               return math.NaN()
+               log.Printf("%s", err)
+               return func([]bool) float64 { return math.NaN() }
        }
        resultCov := model.Fit()
        logCov := resultCov.LogLike()
-       model, err = glm.NewGLM(dataset, "outcome", append([]string{"constants", "variant"}, pcaNames...), glmConfig)
-       if err != nil {
-               return math.NaN()
+
+       return func(onehot []bool) (p float64) {
+               defer func() {
+                       if recover() != nil {
+                               // typically "matrix singular or near-singular with condition number +Inf"
+                               p = math.NaN()
+                       }
+               }()
+
+               variant := make([]statmodel.Dtype, 0, len(sampleInfo))
+               row := 0
+               for _, si := range sampleInfo {
+                       if si.isTraining {
+                               if onehot[row] {
+                                       variant = append(variant, 1)
+                               } else {
+                                       variant = append(variant, 0)
+                               }
+                               row++
+                       }
+               }
+
+               data := append([][]statmodel.Dtype{data[0], variant}, data[1:]...)
+               names := append([]string{"outcome", "variant"}, names[1:]...)
+               dataset := statmodel.NewDataset(data, names)
+
+               model, err := glm.NewGLM(dataset, "outcome", names[1:], glmConfig)
+               if err != nil {
+                       return math.NaN()
+               }
+               resultComp := model.Fit()
+               logComp := resultComp.LogLike()
+               dist := distuv.ChiSquared{K: 1}
+               return dist.Survival(-2 * (logCov - logComp))
        }
-       resultComp := model.Fit()
-       logComp := resultComp.LogLike()
-       dist := distuv.ChiSquared{K: 1}
-       return dist.Survival(-2 * (logCov - logComp))
 }
index 9761b7b4f0f10388adcfe7cabff32276a474f6d7..ec3c000f9594d51c60ea4fdd2783f69a072b287b 100644 (file)
@@ -174,7 +174,7 @@ func (s *glmSuite) TestPvalueRealDataVsPython(c *check.C) {
                }
        }
 
-       pGo := pvalueGLM(samples, onehot, nPCA)
+       pGo := glmPvalueFunc(samples, nPCA)(onehot)
        c.Logf("pGo = %g", pGo)
 
        var pydata bytes.Buffer
@@ -263,7 +263,7 @@ func (s *glmSuite) TestPvalue(c *check.C) {
                return
        }
 
-       c.Check(pvalueGLM(csv2test(`
+       samples, onehot, npca := csv2test(`
 # case=1, onehot=1, pca1, pca2, pca3
 0, 0, 1, 1.21, 2.37
 0, 0, 2, 1.22, 2.38
@@ -274,7 +274,22 @@ func (s *glmSuite) TestPvalue(c *check.C) {
 1, 1, 1, 1.23, 2.36
 1, 1, 2, 1.22, 2.32
 1, 1, 3, 1.21, 2.31
-`)), check.Equals, 0.002789665435066107)
+`)
+       c.Check(glmPvalueFunc(samples, npca)(onehot), check.Equals, 0.002789665435066107)
+
+       samples, onehot, npca = csv2test(`
+# case=1, onehot=1, pca1, pca2, pca3
+0, 1, 1, 1.21, 2.37
+0, 1, 2, 1.22, 2.38
+0, 1, 3, 1.23, 2.39
+0, 1, 1, 1.24, 2.33
+0, 1, 2, 1.25, 2.34
+1, 1, 3, 1.26, 2.35
+1, 1, 1, 1.23, 2.36
+1, 1, 2, 1.22, 2.32
+1, 1, 3, 1.21, 2.31
+`)
+       c.Check(math.IsNaN(glmPvalueFunc(samples, npca)(onehot)), check.Equals, true)
 }
 
 var benchSamples, benchOnehot = func() ([]sampleInfo, []bool) {
@@ -300,7 +315,7 @@ var benchSamples, benchOnehot = func() ([]sampleInfo, []bool) {
 
 func (s *glmSuite) BenchmarkPvalue(c *check.C) {
        for i := 0; i < c.N; i++ {
-               p := pvalueGLM(benchSamples, benchOnehot, len(benchSamples[0].pcaComponents))
+               p := glmPvalueFunc(benchSamples, len(benchSamples[0].pcaComponents))(benchOnehot)
                c.Check(p, check.Equals, 0.0)
        }
 }
index 4736b8a5d9a44bc37df06a9bbfe53b84a21d19c1..95689d972af021d3b662d4262259b2e7fd9612ce 100644 (file)
@@ -36,11 +36,22 @@ for dirent in os.scandir(input_path):
                     tilepos[int(annotation[0])] = (annotation[4], int(annotation[5]))
 
 series = {"#CHROM": [], "POS": [], "P": []}
-for tag, chrpos in sorted(tilepos.items(), key=lambda item: (item[1][0].lstrip('chr').zfill(2), item[1][1])):
+for tag, chrpos in sorted(tilepos.items(), key=lambda item: (item[1][0][-1] > '9', item[1][0].lstrip('chr').zfill(2), item[1][1])):
     for p in pvalue.get(tag, []):
         series["#CHROM"].append(chrpos[0])
         series["POS"].append(chrpos[1])
         series["P"].append(p)
 
-qmplot.manhattanplot(data=pandas.DataFrame(series))
-matplotlib.pyplot.savefig(output_path)
+qmplot.manhattanplot(data=pandas.DataFrame(series),
+                     suggestiveline=2e-10,  # Turn off suggestiveline
+                     genomewideline=2e-11,  # Turn off genomewidel
+                     sign_line_cols=["#D62728", "#2CA02C"],
+                     marker=".",
+                     alpha = 0.6,
+                     hline_kws={"linestyle": "--", "lw": 1.3},
+                     title="Tile Variant Manhattan Plot",
+                     # xtick_label_set=xtick,
+                     xlabel="Chromosome",
+                     ylabel=r"$-log_{10}{(P)}$",
+                     xticklabel_kws={"rotation": "vertical"})
+matplotlib.pyplot.savefig(output_path, bbox_inches="tight")
index a4899836be66510ac116aa65c90b168033898149..057ac5fa9d57e093ef07de404cf5e147cc5230dc 100644 (file)
--- a/slice.go
+++ b/slice.go
@@ -40,6 +40,7 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
        runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
        projectUUID := flags.String("project", "", "project `UUID` for output data")
        priority := flags.Int("priority", 500, "container request priority")
+       preemptible := flags.Bool("preemptible", true, "request preemptible instance")
        outputDir := flags.String("output-dir", "./out", "output `directory`")
        tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)")
        err = flags.Parse(args)
@@ -71,6 +72,7 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std
                        Priority:    *priority,
                        KeepCache:   2,
                        APIAccess:   true,
+                       Preemptible: *preemptible,
                }
                for i := range inputDirs {
                        err = runner.TranslatePaths(&inputDirs[i])
index fb3ae95187c5289e4a525caf9fc58e410838f666..bd0fc171f9613c74ce44539947bd970c677f3de2 100644 (file)
@@ -8,6 +8,7 @@ import (
        "bufio"
        "bytes"
        "encoding/gob"
+       "encoding/json"
        "errors"
        "flag"
        "fmt"
@@ -52,6 +53,8 @@ type sliceNumpy struct {
        samples         []sampleInfo
        trainingSet     []int // samples index => training set index, or -1 if not in training set
        trainingSetSize int
+       pvalue          func(onehot []bool) float64
+       pvalueCallCount int64
 }
 
 func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
@@ -72,6 +75,7 @@ func (cmd *sliceNumpy) run(prog string, args []string, stdin io.Reader, stdout,
        arvadosVCPUs := flags.Int("arvados-vcpus", 96, "number of VCPUs to request for arvados container")
        projectUUID := flags.String("project", "", "project `UUID` for output data")
        priority := flags.Int("priority", 500, "container request priority")
+       preemptible := flags.Bool("preemptible", true, "request preemptible instance")
        inputDir := flags.String("input-dir", "./in", "input `directory`")
        outputDir := flags.String("output-dir", "./out", "output `directory`")
        ref := flags.String("ref", "", "reference name (if blank, choose last one that appears in input)")
@@ -123,6 +127,7 @@ func (cmd *sliceNumpy) run(prog string, args []string, stdin io.Reader, stdout,
                        Priority:    *priority,
                        KeepCache:   2,
                        APIAccess:   true,
+                       Preemptible: *preemptible,
                }
                err = runner.TranslatePaths(inputDir, regionsFilename, samplesFilename)
                if err != nil {
@@ -187,6 +192,21 @@ func (cmd *sliceNumpy) run(prog string, args []string, stdin io.Reader, stdout,
                if err != nil {
                        return err
                }
+               if len(cmd.samples[0].pcaComponents) > 0 {
+                       cmd.pvalue = glmPvalueFunc(cmd.samples, cmd.pcaComponents)
+                       // Unfortunately, statsmodel/glm lib logs
+                       // stuff to os.Stdout when it panics on an
+                       // unsolvable problem. We recover() from the
+                       // panic in glm.go, but we also need to
+                       // commandeer os.Stdout to avoid producing
+                       // large quantities of logs.
+                       stdoutWas := os.Stdout
+                       defer func() { os.Stdout = stdoutWas }()
+                       os.Stdout, err = os.Open(os.DevNull)
+                       if err != nil {
+                               return err
+                       }
+               }
        } else if *caseControlOnly {
                return fmt.Errorf("-case-control-only does not make sense without -samples")
        }
@@ -280,6 +300,11 @@ func (cmd *sliceNumpy) run(prog string, args []string, stdin io.Reader, stdout,
                                cmd.trainingSet[i] = -1
                        }
                }
+               if cmd.pvalue == nil {
+                       cmd.pvalue = func(onehot []bool) float64 {
+                               return pvalue(onehot, cmd.chi2Cases)
+                       }
+               }
        }
        if cmd.filter.MinCoverage == 1 {
                // In the generic formula below, floating point
@@ -1174,6 +1199,17 @@ func (cmd *sliceNumpy) run(prog string, args []string, stdin io.Reader, stdout,
                        if err != nil {
                                return err
                        }
+                       fnm = fmt.Sprintf("%s/stats.json", *outputDir)
+                       j, err := json.Marshal(map[string]interface{}{
+                               "pvalueCallCount": cmd.pvalueCallCount,
+                       })
+                       if err != nil {
+                               return err
+                       }
+                       err = os.WriteFile(fnm, j, 0777)
+                       if err != nil {
+                               return err
+                       }
                }
                if *onlyPCA {
                        cols := 0
@@ -1602,12 +1638,8 @@ func (cmd *sliceNumpy) tv2homhet(cgs map[string]CompactGenome, maxv tileVariantI
                if col < 4 && !cmd.includeVariant1 {
                        continue
                }
-               var p float64
-               if len(cmd.samples[0].pcaComponents) > 0 {
-                       p = pvalueGLM(cmd.samples, obs[col], cmd.pcaComponents)
-               } else {
-                       p = pvalue(obs[col], cmd.chi2Cases)
-               }
+               atomic.AddInt64(&cmd.pvalueCallCount, 1)
+               p := cmd.pvalue(obs[col])
                if cmd.chi2PValue < 1 && !(p < cmd.chi2PValue) {
                        continue
                }