Fix log message.
[lightning.git] / slicenumpy.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "bytes"
10         "encoding/gob"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "math"
16         "net/http"
17         _ "net/http/pprof"
18         "os"
19         "regexp"
20         "runtime"
21         "runtime/debug"
22         "sort"
23         "strconv"
24         "strings"
25         "sync/atomic"
26         "unsafe"
27
28         "git.arvados.org/arvados.git/sdk/go/arvados"
29         "github.com/arvados/lightning/hgvs"
30         "github.com/kshedden/gonpy"
31         log "github.com/sirupsen/logrus"
32         "golang.org/x/crypto/blake2b"
33 )
34
35 type sliceNumpy struct {
36         filter                filter
37         threads               int
38         chi2CaseControlColumn string
39         chi2CaseControlFile   string
40         chi2Cases             []bool
41         chi2PValue            float64
42         minCoverage           int
43         cgnames               []string
44 }
45
46 func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
47         var err error
48         defer func() {
49                 if err != nil {
50                         fmt.Fprintf(stderr, "%s\n", err)
51                 }
52         }()
53         flags := flag.NewFlagSet("", flag.ContinueOnError)
54         flags.SetOutput(stderr)
55         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
56         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
57         projectUUID := flags.String("project", "", "project `UUID` for output data")
58         priority := flags.Int("priority", 500, "container request priority")
59         inputDir := flags.String("input-dir", "./in", "input `directory`")
60         outputDir := flags.String("output-dir", "./out", "output `directory`")
61         ref := flags.String("ref", "", "reference name (if blank, choose last one that appears in input)")
62         regionsFilename := flags.String("regions", "", "only output columns/annotations that intersect regions in specified bed `file`")
63         expandRegions := flags.Int("expand-regions", 0, "expand specified regions by `N` base pairs on each side`")
64         mergeOutput := flags.Bool("merge-output", false, "merge output into one matrix.npy and one matrix.annotations.csv")
65         hgvsSingle := flags.Bool("single-hgvs-matrix", false, "also generate hgvs-based matrix")
66         hgvsChunked := flags.Bool("chunked-hgvs-matrix", false, "also generate hgvs-based matrix per chromosome")
67         onehotSingle := flags.Bool("single-onehot", false, "generate one-hot tile-based matrix")
68         onehotChunked := flags.Bool("chunked-onehot", false, "generate one-hot tile-based matrix per input chunk")
69         flags.IntVar(&cmd.threads, "threads", 16, "number of memory-hungry assembly threads")
70         flags.StringVar(&cmd.chi2CaseControlFile, "chi2-case-control-file", "", "tsv file or directory indicating cases and controls for Χ² test (if directory, all .tsv files will be read)")
71         flags.StringVar(&cmd.chi2CaseControlColumn, "chi2-case-control-column", "", "name of case/control column in case-control files for Χ² test (value must be 0 for control, 1 for case)")
72         flags.Float64Var(&cmd.chi2PValue, "chi2-p-value", 1, "do Χ² test and omit columns with p-value above this threshold")
73         cmd.filter.Flags(flags)
74         err = flags.Parse(args)
75         if err == flag.ErrHelp {
76                 err = nil
77                 return 0
78         } else if err != nil {
79                 return 2
80         }
81
82         if *pprof != "" {
83                 go func() {
84                         log.Println(http.ListenAndServe(*pprof, nil))
85                 }()
86         }
87
88         if cmd.chi2PValue != 1 && (cmd.chi2CaseControlFile == "" || cmd.chi2CaseControlColumn == "") {
89                 log.Errorf("cannot use provided -chi2-p-value=%f because -chi2-case-control-file= or -chi2-case-control-column= value is empty", cmd.chi2PValue)
90                 return 2
91         }
92
93         if !*runlocal {
94                 runner := arvadosContainerRunner{
95                         Name:        "lightning slice-numpy",
96                         Client:      arvados.NewClientFromEnv(),
97                         ProjectUUID: *projectUUID,
98                         RAM:         750000000000,
99                         VCPUs:       96,
100                         Priority:    *priority,
101                         KeepCache:   2,
102                         APIAccess:   true,
103                 }
104                 err = runner.TranslatePaths(inputDir, regionsFilename, &cmd.chi2CaseControlFile)
105                 if err != nil {
106                         return 1
107                 }
108                 runner.Args = []string{"slice-numpy", "-local=true",
109                         "-pprof=:6060",
110                         "-input-dir=" + *inputDir,
111                         "-output-dir=/mnt/output",
112                         "-threads=" + fmt.Sprintf("%d", cmd.threads),
113                         "-regions=" + *regionsFilename,
114                         "-expand-regions=" + fmt.Sprintf("%d", *expandRegions),
115                         "-merge-output=" + fmt.Sprintf("%v", *mergeOutput),
116                         "-single-hgvs-matrix=" + fmt.Sprintf("%v", *hgvsSingle),
117                         "-chunked-hgvs-matrix=" + fmt.Sprintf("%v", *hgvsChunked),
118                         "-single-onehot=" + fmt.Sprintf("%v", *onehotSingle),
119                         "-chunked-onehot=" + fmt.Sprintf("%v", *onehotChunked),
120                         "-chi2-case-control-file=" + cmd.chi2CaseControlFile,
121                         "-chi2-case-control-column=" + cmd.chi2CaseControlColumn,
122                         "-chi2-p-value=" + fmt.Sprintf("%f", cmd.chi2PValue),
123                 }
124                 runner.Args = append(runner.Args, cmd.filter.Args()...)
125                 var output string
126                 output, err = runner.Run()
127                 if err != nil {
128                         return 1
129                 }
130                 fmt.Fprintln(stdout, output)
131                 return 0
132         }
133
134         infiles, err := allFiles(*inputDir, matchGobFile)
135         if err != nil {
136                 return 1
137         }
138         if len(infiles) == 0 {
139                 err = fmt.Errorf("no input files found in %s", *inputDir)
140                 return 1
141         }
142         sort.Strings(infiles)
143
144         var refseq map[string][]tileLibRef
145         var reftiledata = make(map[tileLibRef][]byte, 11000000)
146         in0, err := open(infiles[0])
147         if err != nil {
148                 return 1
149         }
150
151         matchGenome, err := regexp.Compile(cmd.filter.MatchGenome)
152         if err != nil {
153                 err = fmt.Errorf("-match-genome: invalid regexp: %q", cmd.filter.MatchGenome)
154                 return 1
155         }
156
157         cmd.cgnames = nil
158         var tagset [][]byte
159         DecodeLibrary(in0, strings.HasSuffix(infiles[0], ".gz"), func(ent *LibraryEntry) error {
160                 if len(ent.TagSet) > 0 {
161                         tagset = ent.TagSet
162                 }
163                 for _, cseq := range ent.CompactSequences {
164                         if cseq.Name == *ref || *ref == "" {
165                                 refseq = cseq.TileSequences
166                         }
167                 }
168                 for _, cg := range ent.CompactGenomes {
169                         if matchGenome.MatchString(cg.Name) {
170                                 cmd.cgnames = append(cmd.cgnames, cg.Name)
171                         }
172                 }
173                 for _, tv := range ent.TileVariants {
174                         if tv.Ref {
175                                 reftiledata[tileLibRef{tv.Tag, tv.Variant}] = tv.Sequence
176                         }
177                 }
178                 return nil
179         })
180         if err != nil {
181                 return 1
182         }
183         in0.Close()
184         if refseq == nil {
185                 err = fmt.Errorf("%s: reference sequence not found", infiles[0])
186                 return 1
187         }
188         if len(tagset) == 0 {
189                 err = fmt.Errorf("tagset not found")
190                 return 1
191         }
192
193         taglib := &tagLibrary{}
194         err = taglib.setTags(tagset)
195         if err != nil {
196                 return 1
197         }
198         taglen := taglib.TagLen()
199
200         if len(cmd.cgnames) == 0 {
201                 err = fmt.Errorf("no genomes found matching regexp %q", cmd.filter.MatchGenome)
202                 return 1
203         }
204         sort.Strings(cmd.cgnames)
205         err = cmd.useCaseControlFiles()
206         if err != nil {
207                 return 1
208         }
209         cmd.minCoverage = int(math.Ceil(cmd.filter.MinCoverage * float64(len(cmd.cgnames))))
210
211         {
212                 labelsFilename := *outputDir + "/samples.csv"
213                 log.Infof("writing labels to %s", labelsFilename)
214                 var f *os.File
215                 f, err = os.Create(labelsFilename)
216                 if err != nil {
217                         return 1
218                 }
219                 defer f.Close()
220                 for i, name := range cmd.cgnames {
221                         cc := 0
222                         if cmd.chi2Cases != nil && cmd.chi2Cases[i] {
223                                 cc = 1
224                         }
225                         _, err = fmt.Fprintf(f, "%d,%q,%d\n", i, trimFilenameForLabel(name), cc)
226                         if err != nil {
227                                 err = fmt.Errorf("write %s: %w", labelsFilename, err)
228                                 return 1
229                         }
230                 }
231                 err = f.Close()
232                 if err != nil {
233                         err = fmt.Errorf("close %s: %w", labelsFilename, err)
234                         return 1
235                 }
236         }
237
238         log.Info("indexing reference tiles")
239         type reftileinfo struct {
240                 variant  tileVariantID
241                 seqname  string // chr1
242                 pos      int    // distance from start of chromosome to starttag
243                 tiledata []byte // acgtggcaa...
244         }
245         isdup := map[tagID]bool{}
246         reftile := map[tagID]*reftileinfo{}
247         for seqname, cseq := range refseq {
248                 pos := 0
249                 for _, libref := range cseq {
250                         tiledata := reftiledata[libref]
251                         if len(tiledata) == 0 {
252                                 err = fmt.Errorf("missing tiledata for tag %d variant %d in %s in ref", libref.Tag, libref.Variant, seqname)
253                                 return 1
254                         }
255                         foundthistag := false
256                         taglib.FindAll(tiledata[:len(tiledata)-1], func(tagid tagID, offset, _ int) {
257                                 if !foundthistag && tagid == libref.Tag {
258                                         foundthistag = true
259                                         return
260                                 }
261                                 if dupref, ok := reftile[tagid]; ok {
262                                         log.Printf("dropping reference tile %+v from %s @ %d, tag not unique, also found inside %+v from %s @ %d", tileLibRef{Tag: tagid, Variant: dupref.variant}, dupref.seqname, dupref.pos, libref, seqname, pos+offset+1)
263                                         delete(reftile, tagid)
264                                 } else {
265                                         log.Printf("found tag %d at offset %d inside tile variant %+v on %s @ %d", tagid, offset, libref, seqname, pos+offset+1)
266                                 }
267                                 isdup[tagid] = true
268                         })
269                         if isdup[libref.Tag] {
270                                 log.Printf("dropping reference tile %+v from %s @ %d, tag not unique", libref, seqname, pos)
271                         } else if reftile[libref.Tag] != nil {
272                                 log.Printf("dropping reference tile %+v from %s @ %d, tag not unique", tileLibRef{Tag: libref.Tag, Variant: reftile[libref.Tag].variant}, reftile[libref.Tag].seqname, reftile[libref.Tag].pos)
273                                 delete(reftile, libref.Tag)
274                                 log.Printf("dropping reference tile %+v from %s @ %d, tag not unique", libref, seqname, pos)
275                                 isdup[libref.Tag] = true
276                         } else {
277                                 reftile[libref.Tag] = &reftileinfo{
278                                         seqname:  seqname,
279                                         variant:  libref.Variant,
280                                         tiledata: tiledata,
281                                         pos:      pos,
282                                 }
283                         }
284                         pos += len(tiledata) - taglen
285                 }
286                 log.Printf("... %s done, len %d", seqname, pos+taglen)
287         }
288
289         var mask *mask
290         if *regionsFilename != "" {
291                 log.Printf("loading regions from %s", *regionsFilename)
292                 mask, err = makeMask(*regionsFilename, *expandRegions)
293                 if err != nil {
294                         return 1
295                 }
296                 log.Printf("before applying mask, len(reftile) == %d", len(reftile))
297                 log.Printf("deleting reftile entries for regions outside %d intervals", mask.Len())
298                 for tag, rt := range reftile {
299                         if !mask.Check(strings.TrimPrefix(rt.seqname, "chr"), rt.pos, rt.pos+len(rt.tiledata)) {
300                                 delete(reftile, tag)
301                         }
302                 }
303                 log.Printf("after applying mask, len(reftile) == %d", len(reftile))
304         }
305
306         type hgvsColSet map[hgvs.Variant][2][]int8
307         encodeHGVS := throttle{Max: len(refseq)}
308         encodeHGVSTodo := map[string]chan hgvsColSet{}
309         tmpHGVSCols := map[string]*os.File{}
310         if *hgvsChunked {
311                 for seqname := range refseq {
312                         var f *os.File
313                         f, err = os.Create(*outputDir + "/tmp." + seqname + ".gob")
314                         if err != nil {
315                                 return 1
316                         }
317                         defer os.Remove(f.Name())
318                         bufw := bufio.NewWriterSize(f, 1<<24)
319                         enc := gob.NewEncoder(bufw)
320                         tmpHGVSCols[seqname] = f
321                         todo := make(chan hgvsColSet, 128)
322                         encodeHGVSTodo[seqname] = todo
323                         encodeHGVS.Go(func() error {
324                                 for colset := range todo {
325                                         err := enc.Encode(colset)
326                                         if err != nil {
327                                                 encodeHGVS.Report(err)
328                                                 for range todo {
329                                                 }
330                                                 return err
331                                         }
332                                 }
333                                 return bufw.Flush()
334                         })
335                 }
336         }
337
338         var toMerge [][]int16
339         if *mergeOutput || *hgvsSingle {
340                 toMerge = make([][]int16, len(infiles))
341         }
342         var onehotIndirect [][2][]uint32 // [chunkIndex][axis][index]
343         var onehotXrefs [][]onehotXref
344         if *onehotSingle {
345                 onehotIndirect = make([][2][]uint32, len(infiles))
346                 onehotXrefs = make([][]onehotXref, len(infiles))
347         }
348
349         throttleMem := throttle{Max: cmd.threads} // TODO: estimate using mem and data size
350         throttleNumpyMem := throttle{Max: cmd.threads/2 + 1}
351         log.Info("generating annotations and numpy matrix for each slice")
352         var done int64
353         for infileIdx, infile := range infiles {
354                 infileIdx, infile := infileIdx, infile
355                 throttleMem.Go(func() error {
356                         seq := make(map[tagID][]TileVariant, 50000)
357                         cgs := make(map[string]CompactGenome, len(cmd.cgnames))
358                         f, err := open(infile)
359                         if err != nil {
360                                 return err
361                         }
362                         defer f.Close()
363                         log.Infof("%04d: reading %s", infileIdx, infile)
364                         err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
365                                 for _, tv := range ent.TileVariants {
366                                         if tv.Ref {
367                                                 continue
368                                         }
369                                         if mask != nil && reftile[tv.Tag] == nil {
370                                                 // Don't waste
371                                                 // time/memory on
372                                                 // masked-out tiles.
373                                                 continue
374                                         }
375                                         variants := seq[tv.Tag]
376                                         if len(variants) == 0 {
377                                                 variants = make([]TileVariant, 100)
378                                         }
379                                         for len(variants) <= int(tv.Variant) {
380                                                 variants = append(variants, TileVariant{})
381                                         }
382                                         variants[int(tv.Variant)] = tv
383                                         seq[tv.Tag] = variants
384                                 }
385                                 for _, cg := range ent.CompactGenomes {
386                                         if !matchGenome.MatchString(cg.Name) {
387                                                 continue
388                                         }
389                                         // pad to full slice size
390                                         // to avoid out-of-bounds
391                                         // checks later
392                                         if sliceSize := 2 * int(cg.EndTag-cg.StartTag); len(cg.Variants) < sliceSize {
393                                                 cg.Variants = append(cg.Variants, make([]tileVariantID, sliceSize-len(cg.Variants))...)
394                                         }
395                                         cgs[cg.Name] = cg
396                                 }
397                                 return nil
398                         })
399                         if err != nil {
400                                 return err
401                         }
402                         tagstart := cgs[cmd.cgnames[0]].StartTag
403                         tagend := cgs[cmd.cgnames[0]].EndTag
404
405                         // TODO: filters
406
407                         log.Infof("%04d: renumber/dedup variants for tags %d-%d", infileIdx, tagstart, tagend)
408                         variantRemap := make([][]tileVariantID, tagend-tagstart)
409                         throttleCPU := throttle{Max: runtime.GOMAXPROCS(0)}
410                         for tag, variants := range seq {
411                                 tag, variants := tag, variants
412                                 throttleCPU.Acquire()
413                                 go func() {
414                                         defer throttleCPU.Release()
415                                         count := make(map[[blake2b.Size256]byte]int, len(variants))
416
417                                         rt := reftile[tag]
418                                         if rt != nil {
419                                                 count[blake2b.Sum256(rt.tiledata)] = 0
420                                         }
421
422                                         for _, cg := range cgs {
423                                                 idx := int(tag-tagstart) * 2
424                                                 for allele := 0; allele < 2; allele++ {
425                                                         v := cg.Variants[idx+allele]
426                                                         if v > 0 && len(variants[v].Sequence) > 0 {
427                                                                 count[variants[v].Blake2b]++
428                                                         }
429                                                 }
430                                         }
431                                         // hash[i] will be the hash of
432                                         // the variant(s) that should
433                                         // be at rank i (0-based).
434                                         hash := make([][blake2b.Size256]byte, 0, len(count))
435                                         for b := range count {
436                                                 hash = append(hash, b)
437                                         }
438                                         sort.Slice(hash, func(i, j int) bool {
439                                                 bi, bj := &hash[i], &hash[j]
440                                                 if ci, cj := count[*bi], count[*bj]; ci != cj {
441                                                         return ci > cj
442                                                 } else {
443                                                         return bytes.Compare((*bi)[:], (*bj)[:]) < 0
444                                                 }
445                                         })
446                                         // rank[b] will be the 1-based
447                                         // new variant number for
448                                         // variants whose hash is b.
449                                         rank := make(map[[blake2b.Size256]byte]tileVariantID, len(hash))
450                                         for i, h := range hash {
451                                                 rank[h] = tileVariantID(i + 1)
452                                         }
453                                         // remap[v] will be the new
454                                         // variant number for original
455                                         // variant number v.
456                                         remap := make([]tileVariantID, len(variants))
457                                         for i, tv := range variants {
458                                                 remap[i] = rank[tv.Blake2b]
459                                         }
460                                         variantRemap[tag-tagstart] = remap
461                                         if rt != nil {
462                                                 rt.variant = rank[blake2b.Sum256(rt.tiledata)]
463                                         }
464                                 }()
465                         }
466                         throttleCPU.Wait()
467
468                         var onehotChunk [][]int8
469                         var onehotXref []onehotXref
470
471                         annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx)
472                         log.Infof("%04d: writing %s", infileIdx, annotationsFilename)
473                         annof, err := os.Create(annotationsFilename)
474                         if err != nil {
475                                 return err
476                         }
477                         annow := bufio.NewWriterSize(annof, 1<<20)
478                         outcol := 0
479                         for tag := tagstart; tag < tagend; tag++ {
480                                 rt, ok := reftile[tag]
481                                 if !ok {
482                                         if mask == nil {
483                                                 outcol++
484                                         }
485                                         // Excluded by specified
486                                         // regions, or reference does
487                                         // not use any variant of this
488                                         // tile. (TODO: log this?
489                                         // mention it in annotations?)
490                                         continue
491                                 }
492                                 remap := variantRemap[tag-tagstart]
493                                 maxv := tileVariantID(0)
494                                 for _, v := range remap {
495                                         if maxv < v {
496                                                 maxv = v
497                                         }
498                                 }
499                                 if *onehotChunked || *onehotSingle {
500                                         onehot, xrefs := cmd.tv2homhet(cgs, maxv, remap, tag, tagstart)
501                                         onehotChunk = append(onehotChunk, onehot...)
502                                         onehotXref = append(onehotXref, xrefs...)
503                                 }
504                                 fmt.Fprintf(annow, "%d,%d,%d,=,%s,%d,,,\n", tag, outcol, rt.variant, rt.seqname, rt.pos)
505                                 variants := seq[tag]
506                                 reftilestr := strings.ToUpper(string(rt.tiledata))
507
508                                 done := make([]bool, maxv+1)
509                                 variantDiffs := make([][]hgvs.Variant, maxv+1)
510                                 for v, tv := range variants {
511                                         v := remap[v]
512                                         if v == rt.variant || done[v] {
513                                                 continue
514                                         } else {
515                                                 done[v] = true
516                                         }
517                                         if len(tv.Sequence) < taglen || !bytes.HasSuffix(rt.tiledata, tv.Sequence[len(tv.Sequence)-taglen:]) {
518                                                 fmt.Fprintf(annow, "%d,%d,%d,,%s,%d,,,\n", tag, outcol, v, rt.seqname, rt.pos)
519                                                 continue
520                                         }
521                                         if lendiff := len(rt.tiledata) - len(tv.Sequence); lendiff < -1000 || lendiff > 1000 {
522                                                 fmt.Fprintf(annow, "%d,%d,%d,,%s,%d,,,\n", tag, outcol, v, rt.seqname, rt.pos)
523                                                 continue
524                                         }
525                                         diffs, _ := hgvs.Diff(reftilestr, strings.ToUpper(string(tv.Sequence)), 0)
526                                         for i := range diffs {
527                                                 diffs[i].Position += rt.pos
528                                         }
529                                         for _, diff := range diffs {
530                                                 fmt.Fprintf(annow, "%d,%d,%d,%s:g.%s,%s,%d,%s,%s,%s\n", tag, outcol, v, rt.seqname, diff.String(), rt.seqname, diff.Position, diff.Ref, diff.New, diff.Left)
531                                         }
532                                         if *hgvsChunked {
533                                                 variantDiffs[v] = diffs
534                                         }
535                                 }
536                                 if *hgvsChunked {
537                                         // We can now determine, for each HGVS
538                                         // variant (diff) in this reftile
539                                         // region, whether a given genome
540                                         // phase/allele (1) has the variant, (0) has
541                                         // =ref or a different variant in that
542                                         // position, or (-1) is lacking
543                                         // coverage / couldn't be diffed.
544                                         hgvsCol := hgvsColSet{}
545                                         for _, diffs := range variantDiffs {
546                                                 for _, diff := range diffs {
547                                                         if _, ok := hgvsCol[diff]; ok {
548                                                                 continue
549                                                         }
550                                                         hgvsCol[diff] = [2][]int8{
551                                                                 make([]int8, len(cmd.cgnames)),
552                                                                 make([]int8, len(cmd.cgnames)),
553                                                         }
554                                                 }
555                                         }
556                                         for row, name := range cmd.cgnames {
557                                                 variants := cgs[name].Variants[(tag-tagstart)*2:]
558                                                 for ph := 0; ph < 2; ph++ {
559                                                         v := variants[ph]
560                                                         if int(v) >= len(remap) {
561                                                                 v = 0
562                                                         } else {
563                                                                 v = remap[v]
564                                                         }
565                                                         if v == rt.variant {
566                                                                 // hgvsCol[*][ph][row] is already 0
567                                                         } else if len(variantDiffs[v]) == 0 {
568                                                                 // lacking coverage / couldn't be diffed
569                                                                 for _, col := range hgvsCol {
570                                                                         col[ph][row] = -1
571                                                                 }
572                                                         } else {
573                                                                 for _, diff := range variantDiffs[v] {
574                                                                         hgvsCol[diff][ph][row] = 1
575                                                                 }
576                                                         }
577                                                 }
578                                         }
579                                         for diff, colpair := range hgvsCol {
580                                                 allele2homhet(colpair)
581                                                 if !cmd.filterHGVScolpair(colpair) {
582                                                         delete(hgvsCol, diff)
583                                                 }
584                                         }
585                                         if len(hgvsCol) > 0 {
586                                                 encodeHGVSTodo[rt.seqname] <- hgvsCol
587                                         }
588                                 }
589                                 outcol++
590                         }
591                         err = annow.Flush()
592                         if err != nil {
593                                 return err
594                         }
595                         err = annof.Close()
596                         if err != nil {
597                                 return err
598                         }
599
600                         if *onehotChunked {
601                                 // transpose onehotChunk[col][row] to numpy[row*ncols+col]
602                                 rows := len(cmd.cgnames)
603                                 cols := len(onehotChunk)
604                                 log.Infof("%04d: preparing onehot numpy (rows=%d, cols=%d, mem=%d)", infileIdx, len(cmd.cgnames), len(onehotChunk), len(cmd.cgnames)*len(onehotChunk))
605                                 throttleNumpyMem.Acquire()
606                                 out := onehotcols2int8(onehotChunk)
607                                 fnm := fmt.Sprintf("%s/onehot.%04d.npy", *outputDir, infileIdx)
608                                 err = writeNumpyInt8(fnm, out, rows, cols)
609                                 if err != nil {
610                                         return err
611                                 }
612                                 fnm = fmt.Sprintf("%s/onehot-columns.%04d.npy", *outputDir, infileIdx)
613                                 err = writeNumpyInt32(fnm, onehotXref2int32(onehotXref), 4, len(onehotXref))
614                                 if err != nil {
615                                         return err
616                                 }
617                                 debug.FreeOSMemory()
618                                 throttleNumpyMem.Release()
619                         }
620                         if *onehotSingle {
621                                 onehotIndirect[infileIdx] = onehotChunk2Indirect(onehotChunk)
622                                 onehotXrefs[infileIdx] = onehotXref
623                                 n := len(onehotIndirect[infileIdx][0])
624                                 log.Infof("%04d: keeping onehot coordinates in memory (n=%d, mem=%d)", infileIdx, n, n*8)
625                         }
626                         if !(*onehotSingle || *onehotChunked) || *mergeOutput || *hgvsSingle {
627                                 log.Infof("%04d: preparing numpy", infileIdx)
628                                 throttleNumpyMem.Acquire()
629                                 rows := len(cmd.cgnames)
630                                 cols := 2 * outcol
631                                 out := make([]int16, rows*cols)
632                                 for row, name := range cmd.cgnames {
633                                         out := out[row*cols:]
634                                         outcol := 0
635                                         for col, v := range cgs[name].Variants {
636                                                 tag := tagstart + tagID(col/2)
637                                                 if mask != nil && reftile[tag] == nil {
638                                                         continue
639                                                 }
640                                                 if variants, ok := seq[tag]; ok && len(variants) > int(v) && len(variants[v].Sequence) > 0 {
641                                                         out[outcol] = int16(variantRemap[tag-tagstart][v])
642                                                 } else {
643                                                         out[outcol] = -1
644                                                 }
645                                                 outcol++
646                                         }
647                                 }
648                                 seq = nil
649                                 cgs = nil
650                                 debug.FreeOSMemory()
651                                 throttleNumpyMem.Release()
652                                 if *mergeOutput || *hgvsSingle {
653                                         log.Infof("%04d: matrix fragment %d rows x %d cols", infileIdx, rows, cols)
654                                         toMerge[infileIdx] = out
655                                 }
656                                 if !*mergeOutput && !*onehotChunked && !*onehotSingle {
657                                         fnm := fmt.Sprintf("%s/matrix.%04d.npy", *outputDir, infileIdx)
658                                         err = writeNumpyInt16(fnm, out, rows, cols)
659                                         if err != nil {
660                                                 return err
661                                         }
662                                 }
663                         }
664                         debug.FreeOSMemory()
665                         log.Infof("%s: done (%d/%d)", infile, int(atomic.AddInt64(&done, 1)), len(infiles))
666                         return nil
667                 })
668         }
669         if err = throttleMem.Wait(); err != nil {
670                 return 1
671         }
672
673         if *hgvsChunked {
674                 log.Info("flushing hgvsCols temp files")
675                 for seqname := range refseq {
676                         close(encodeHGVSTodo[seqname])
677                 }
678                 err = encodeHGVS.Wait()
679                 if err != nil {
680                         return 1
681                 }
682                 for seqname := range refseq {
683                         log.Infof("%s: reading hgvsCols from temp file", seqname)
684                         f := tmpHGVSCols[seqname]
685                         _, err = f.Seek(0, io.SeekStart)
686                         if err != nil {
687                                 return 1
688                         }
689                         var hgvsCols hgvsColSet
690                         dec := gob.NewDecoder(bufio.NewReaderSize(f, 1<<24))
691                         for err == nil {
692                                 err = dec.Decode(&hgvsCols)
693                         }
694                         if err != io.EOF {
695                                 return 1
696                         }
697                         log.Infof("%s: sorting %d hgvs variants", seqname, len(hgvsCols))
698                         variants := make([]hgvs.Variant, 0, len(hgvsCols))
699                         for v := range hgvsCols {
700                                 variants = append(variants, v)
701                         }
702                         sort.Slice(variants, func(i, j int) bool {
703                                 vi, vj := &variants[i], &variants[j]
704                                 if vi.Position != vj.Position {
705                                         return vi.Position < vj.Position
706                                 } else if vi.Ref != vj.Ref {
707                                         return vi.Ref < vj.Ref
708                                 } else {
709                                         return vi.New < vj.New
710                                 }
711                         })
712                         rows := len(cmd.cgnames)
713                         cols := len(variants) * 2
714                         log.Infof("%s: building hgvs matrix (rows=%d, cols=%d, mem=%d)", seqname, rows, cols, rows*cols)
715                         out := make([]int8, rows*cols)
716                         for varIdx, variant := range variants {
717                                 hgvsCols := hgvsCols[variant]
718                                 for row := range cmd.cgnames {
719                                         for ph := 0; ph < 2; ph++ {
720                                                 out[row*cols+varIdx+ph] = hgvsCols[ph][row]
721                                         }
722                                 }
723                         }
724                         err = writeNumpyInt8(fmt.Sprintf("%s/hgvs.%s.npy", *outputDir, seqname), out, rows, cols)
725                         if err != nil {
726                                 return 1
727                         }
728                         out = nil
729
730                         fnm := fmt.Sprintf("%s/hgvs.%s.annotations.csv", *outputDir, seqname)
731                         log.Infof("%s: writing hgvs column labels to %s", seqname, fnm)
732                         var hgvsLabels bytes.Buffer
733                         for varIdx, variant := range variants {
734                                 fmt.Fprintf(&hgvsLabels, "%d,%s:g.%s\n", varIdx, seqname, variant.String())
735                         }
736                         err = ioutil.WriteFile(fnm, hgvsLabels.Bytes(), 0666)
737                         if err != nil {
738                                 return 1
739                         }
740                 }
741         }
742
743         if *mergeOutput || *hgvsSingle {
744                 var annow *bufio.Writer
745                 var annof *os.File
746                 if *mergeOutput {
747                         annoFilename := fmt.Sprintf("%s/matrix.annotations.csv", *outputDir)
748                         annof, err = os.Create(annoFilename)
749                         if err != nil {
750                                 return 1
751                         }
752                         annow = bufio.NewWriterSize(annof, 1<<20)
753                 }
754
755                 rows := len(cmd.cgnames)
756                 cols := 0
757                 for _, chunk := range toMerge {
758                         cols += len(chunk) / rows
759                 }
760                 log.Infof("merging output matrix (rows=%d, cols=%d, mem=%d) and annotations", rows, cols, rows*cols*2)
761                 var out []int16
762                 if *mergeOutput {
763                         out = make([]int16, rows*cols)
764                 }
765                 hgvsCols := map[string][2][]int16{} // hgvs -> [[g0,g1,g2,...], [g0,g1,g2,...]] (slice of genomes for each phase)
766                 startcol := 0
767                 for outIdx, chunk := range toMerge {
768                         chunkcols := len(chunk) / rows
769                         if *mergeOutput {
770                                 for row := 0; row < rows; row++ {
771                                         copy(out[row*cols+startcol:], chunk[row*chunkcols:(row+1)*chunkcols])
772                                 }
773                         }
774                         toMerge[outIdx] = nil
775
776                         annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, outIdx)
777                         log.Infof("reading %s", annotationsFilename)
778                         buf, err := os.ReadFile(annotationsFilename)
779                         if err != nil {
780                                 return 1
781                         }
782                         if *mergeOutput {
783                                 err = os.Remove(annotationsFilename)
784                                 if err != nil {
785                                         return 1
786                                 }
787                         }
788                         for _, line := range bytes.Split(buf, []byte{'\n'}) {
789                                 if len(line) == 0 {
790                                         continue
791                                 }
792                                 fields := bytes.SplitN(line, []byte{','}, 9)
793                                 tag, _ := strconv.Atoi(string(fields[0]))
794                                 incol, _ := strconv.Atoi(string(fields[1]))
795                                 tileVariant, _ := strconv.Atoi(string(fields[2]))
796                                 hgvsID := string(fields[3])
797                                 seqname := string(fields[4])
798                                 pos, _ := strconv.Atoi(string(fields[5]))
799                                 refseq := fields[6]
800                                 if hgvsID == "" {
801                                         // Null entry for un-diffable
802                                         // tile variant
803                                         continue
804                                 }
805                                 if hgvsID == "=" {
806                                         // Null entry for ref tile
807                                         continue
808                                 }
809                                 if mask != nil && !mask.Check(strings.TrimPrefix(seqname, "chr"), pos, pos+len(refseq)) {
810                                         // The tile intersects one of
811                                         // the selected regions, but
812                                         // this particular HGVS
813                                         // variant does not.
814                                         continue
815                                 }
816                                 hgvsColPair := hgvsCols[hgvsID]
817                                 if hgvsColPair[0] == nil {
818                                         // values in new columns start
819                                         // out as -1 ("no data yet")
820                                         // or 0 ("=ref") here, may
821                                         // change to 1 ("hgvs variant
822                                         // present") below, either on
823                                         // this line or a future line.
824                                         hgvsColPair = [2][]int16{make([]int16, len(cmd.cgnames)), make([]int16, len(cmd.cgnames))}
825                                         rt, ok := reftile[tagID(tag)]
826                                         if !ok {
827                                                 err = fmt.Errorf("bug: seeing annotations for tag %d, but it has no reftile entry", tag)
828                                                 return 1
829                                         }
830                                         for ph := 0; ph < 2; ph++ {
831                                                 for row := 0; row < rows; row++ {
832                                                         v := chunk[row*chunkcols+incol*2+ph]
833                                                         if tileVariantID(v) == rt.variant {
834                                                                 hgvsColPair[ph][row] = 0
835                                                         } else {
836                                                                 hgvsColPair[ph][row] = -1
837                                                         }
838                                                 }
839                                         }
840                                         hgvsCols[hgvsID] = hgvsColPair
841                                         if annow != nil {
842                                                 hgvsref := hgvs.Variant{
843                                                         Position: pos,
844                                                         Ref:      string(refseq),
845                                                         New:      string(refseq),
846                                                 }
847                                                 fmt.Fprintf(annow, "%d,%d,%d,%s:g.%s,%s,%d,%s,%s,%s\n", tag, incol+startcol/2, rt.variant, seqname, hgvsref.String(), seqname, pos, refseq, refseq, fields[8])
848                                         }
849                                 }
850                                 if annow != nil {
851                                         fmt.Fprintf(annow, "%d,%d,%d,%s,%s,%d,%s,%s,%s\n", tag, incol+startcol/2, tileVariant, hgvsID, seqname, pos, refseq, fields[7], fields[8])
852                                 }
853                                 for ph := 0; ph < 2; ph++ {
854                                         for row := 0; row < rows; row++ {
855                                                 v := chunk[row*chunkcols+incol*2+ph]
856                                                 if int(v) == tileVariant {
857                                                         hgvsColPair[ph][row] = 1
858                                                 }
859                                         }
860                                 }
861                         }
862
863                         startcol += chunkcols
864                 }
865                 if *mergeOutput {
866                         err = annow.Flush()
867                         if err != nil {
868                                 return 1
869                         }
870                         err = annof.Close()
871                         if err != nil {
872                                 return 1
873                         }
874                         err = writeNumpyInt16(fmt.Sprintf("%s/matrix.npy", *outputDir), out, rows, cols)
875                         if err != nil {
876                                 return 1
877                         }
878                 }
879                 out = nil
880
881                 if *hgvsSingle {
882                         cols = len(hgvsCols) * 2
883                         log.Printf("building hgvs-based matrix: %d rows x %d cols", rows, cols)
884                         out = make([]int16, rows*cols)
885                         hgvsIDs := make([]string, 0, cols/2)
886                         for hgvsID := range hgvsCols {
887                                 hgvsIDs = append(hgvsIDs, hgvsID)
888                         }
889                         sort.Strings(hgvsIDs)
890                         var hgvsLabels bytes.Buffer
891                         for idx, hgvsID := range hgvsIDs {
892                                 fmt.Fprintf(&hgvsLabels, "%d,%s\n", idx, hgvsID)
893                                 for ph := 0; ph < 2; ph++ {
894                                         hgvscol := hgvsCols[hgvsID][ph]
895                                         for row, val := range hgvscol {
896                                                 out[row*cols+idx*2+ph] = val
897                                         }
898                                 }
899                         }
900                         err = writeNumpyInt16(fmt.Sprintf("%s/hgvs.npy", *outputDir), out, rows, cols)
901                         if err != nil {
902                                 return 1
903                         }
904
905                         fnm := fmt.Sprintf("%s/hgvs.annotations.csv", *outputDir)
906                         log.Printf("writing hgvs labels: %s", fnm)
907                         err = ioutil.WriteFile(fnm, hgvsLabels.Bytes(), 0777)
908                         if err != nil {
909                                 return 1
910                         }
911                 }
912         }
913         if *onehotSingle {
914                 nzCount := 0
915                 for _, part := range onehotIndirect {
916                         nzCount += len(part[0])
917                 }
918                 onehot := make([]uint32, nzCount*2) // [r,r,r,...,c,c,c,...]
919                 var xrefs []onehotXref
920                 outcol := 0
921                 for i, part := range onehotIndirect {
922                         for i := range part[1] {
923                                 part[1][i] += uint32(outcol)
924                         }
925                         copy(onehot[outcol:], part[0])
926                         copy(onehot[outcol+nzCount:], part[1])
927                         outcol += len(part[0])
928                         xrefs = append(xrefs, onehotXrefs[i]...)
929
930                         part[0] = nil
931                         part[1] = nil
932                         onehotXrefs[i] = nil
933                         debug.FreeOSMemory()
934                 }
935                 fnm := fmt.Sprintf("%s/onehot.npy", *outputDir)
936                 err = writeNumpyUint32(fnm, onehot, 2, nzCount)
937                 if err != nil {
938                         return 1
939                 }
940                 fnm = fmt.Sprintf("%s/onehot-columns.npy", *outputDir)
941                 err = writeNumpyInt32(fnm, onehotXref2int32(xrefs), 4, len(xrefs))
942                 if err != nil {
943                         return 1
944                 }
945         }
946         return 0
947 }
948
949 // Read case/control files, remove non-case/control entries from
950 // cmd.cgnames, and build cmd.chi2Cases.
951 func (cmd *sliceNumpy) useCaseControlFiles() error {
952         if cmd.chi2CaseControlFile == "" {
953                 return nil
954         }
955         infiles, err := allFiles(cmd.chi2CaseControlFile, nil)
956         if err != nil {
957                 return err
958         }
959         // index in cmd.cgnames => case(true) / control(false)
960         cc := map[int]bool{}
961         for _, infile := range infiles {
962                 f, err := open(infile)
963                 if err != nil {
964                         return err
965                 }
966                 buf, err := io.ReadAll(f)
967                 f.Close()
968                 if err != nil {
969                         return err
970                 }
971                 ccCol := -1
972                 for _, tsv := range bytes.Split(buf, []byte{'\n'}) {
973                         if len(tsv) == 0 {
974                                 continue
975                         }
976                         split := strings.Split(string(tsv), "\t")
977                         if ccCol < 0 {
978                                 // header row
979                                 for col, name := range split {
980                                         if name == cmd.chi2CaseControlColumn {
981                                                 ccCol = col
982                                                 break
983                                         }
984                                 }
985                                 if ccCol < 0 {
986                                         return fmt.Errorf("%s: no column named %q in header row %q", infile, cmd.chi2CaseControlColumn, tsv)
987                                 }
988                                 continue
989                         }
990                         if len(split) <= ccCol {
991                                 continue
992                         }
993                         pattern := split[0]
994                         found := -1
995                         for i, name := range cmd.cgnames {
996                                 if strings.Contains(name, pattern) {
997                                         if found >= 0 {
998                                                 log.Warnf("pattern %q in %s matches multiple genome IDs (%qs, %q)", pattern, infile, cmd.cgnames[found], name)
999                                         }
1000                                         found = i
1001                                 }
1002                         }
1003                         if found < 0 {
1004                                 log.Warnf("pattern %q in %s does not match any genome IDs", pattern, infile)
1005                                 continue
1006                         }
1007                         if split[ccCol] == "0" {
1008                                 cc[found] = false
1009                         }
1010                         if split[ccCol] == "1" {
1011                                 cc[found] = true
1012                         }
1013                 }
1014         }
1015         allnames := cmd.cgnames
1016         cmd.cgnames = nil
1017         cmd.chi2Cases = nil
1018         ncases := 0
1019         for i, name := range allnames {
1020                 if cc, ok := cc[i]; ok {
1021                         cmd.cgnames = append(cmd.cgnames, name)
1022                         cmd.chi2Cases = append(cmd.chi2Cases, cc)
1023                         if cc {
1024                                 ncases++
1025                         }
1026                 }
1027         }
1028         log.Printf("%d cases, %d controls, %d neither (dropped)", ncases, len(cmd.cgnames)-ncases, len(allnames)-len(cmd.cgnames))
1029         return nil
1030 }
1031
1032 func (cmd *sliceNumpy) filterHGVScolpair(colpair [2][]int8) bool {
1033         if cmd.chi2PValue >= 1 {
1034                 return true
1035         }
1036         col0 := make([]bool, 0, len(cmd.chi2Cases))
1037         col1 := make([]bool, 0, len(cmd.chi2Cases))
1038         cases := make([]bool, 0, len(cmd.chi2Cases))
1039         for i, c := range cmd.chi2Cases {
1040                 if colpair[0][i] < 0 {
1041                         continue
1042                 }
1043                 col0 = append(col0, colpair[0][i] != 0)
1044                 col1 = append(col1, colpair[1][i] != 0)
1045                 cases = append(cases, c)
1046         }
1047         return len(cases) >= cmd.minCoverage &&
1048                 (pvalue(col0, cases) <= cmd.chi2PValue || pvalue(col1, cases) <= cmd.chi2PValue)
1049 }
1050
1051 func writeNumpyUint32(fnm string, out []uint32, rows, cols int) error {
1052         output, err := os.Create(fnm)
1053         if err != nil {
1054                 return err
1055         }
1056         defer output.Close()
1057         bufw := bufio.NewWriterSize(output, 1<<26)
1058         npw, err := gonpy.NewWriter(nopCloser{bufw})
1059         if err != nil {
1060                 return err
1061         }
1062         log.WithFields(log.Fields{
1063                 "filename": fnm,
1064                 "rows":     rows,
1065                 "cols":     cols,
1066                 "bytes":    rows * cols * 4,
1067         }).Infof("writing numpy: %s", fnm)
1068         npw.Shape = []int{rows, cols}
1069         npw.WriteUint32(out)
1070         err = bufw.Flush()
1071         if err != nil {
1072                 return err
1073         }
1074         return output.Close()
1075 }
1076
1077 func writeNumpyInt32(fnm string, out []int32, rows, cols int) error {
1078         output, err := os.Create(fnm)
1079         if err != nil {
1080                 return err
1081         }
1082         defer output.Close()
1083         bufw := bufio.NewWriterSize(output, 1<<26)
1084         npw, err := gonpy.NewWriter(nopCloser{bufw})
1085         if err != nil {
1086                 return err
1087         }
1088         log.WithFields(log.Fields{
1089                 "filename": fnm,
1090                 "rows":     rows,
1091                 "cols":     cols,
1092                 "bytes":    rows * cols * 4,
1093         }).Infof("writing numpy: %s", fnm)
1094         npw.Shape = []int{rows, cols}
1095         npw.WriteInt32(out)
1096         err = bufw.Flush()
1097         if err != nil {
1098                 return err
1099         }
1100         return output.Close()
1101 }
1102
1103 func writeNumpyInt16(fnm string, out []int16, rows, cols int) error {
1104         output, err := os.Create(fnm)
1105         if err != nil {
1106                 return err
1107         }
1108         defer output.Close()
1109         bufw := bufio.NewWriterSize(output, 1<<26)
1110         npw, err := gonpy.NewWriter(nopCloser{bufw})
1111         if err != nil {
1112                 return err
1113         }
1114         log.WithFields(log.Fields{
1115                 "filename": fnm,
1116                 "rows":     rows,
1117                 "cols":     cols,
1118                 "bytes":    rows * cols * 2,
1119         }).Infof("writing numpy: %s", fnm)
1120         npw.Shape = []int{rows, cols}
1121         npw.WriteInt16(out)
1122         err = bufw.Flush()
1123         if err != nil {
1124                 return err
1125         }
1126         return output.Close()
1127 }
1128
1129 func writeNumpyInt8(fnm string, out []int8, rows, cols int) error {
1130         output, err := os.Create(fnm)
1131         if err != nil {
1132                 return err
1133         }
1134         defer output.Close()
1135         bufw := bufio.NewWriterSize(output, 1<<26)
1136         npw, err := gonpy.NewWriter(nopCloser{bufw})
1137         if err != nil {
1138                 return err
1139         }
1140         log.WithFields(log.Fields{
1141                 "filename": fnm,
1142                 "rows":     rows,
1143                 "cols":     cols,
1144                 "bytes":    rows * cols,
1145         }).Infof("writing numpy: %s", fnm)
1146         npw.Shape = []int{rows, cols}
1147         npw.WriteInt8(out)
1148         err = bufw.Flush()
1149         if err != nil {
1150                 return err
1151         }
1152         return output.Close()
1153 }
1154
1155 func allele2homhet(colpair [2][]int8) {
1156         a, b := colpair[0], colpair[1]
1157         for i, av := range a {
1158                 bv := b[i]
1159                 if av < 0 || bv < 0 {
1160                         // no-call
1161                         a[i], b[i] = -1, -1
1162                 } else if av > 0 && bv > 0 {
1163                         // hom
1164                         a[i], b[i] = 1, 0
1165                 } else if av > 0 || bv > 0 {
1166                         // het
1167                         a[i], b[i] = 0, 1
1168                 } else {
1169                         // ref (or a different variant in same position)
1170                         // (this is a no-op) a[i], b[i] = 0, 0
1171                 }
1172         }
1173 }
1174
1175 type onehotXref struct {
1176         tag     tagID
1177         variant tileVariantID
1178         het     bool
1179         pvalue  float64
1180 }
1181
1182 const onehotXrefSize = unsafe.Sizeof(onehotXref{})
1183
1184 // Build onehot matrix (m[variant*2+isHet][genome] == 0 or 1) for all
1185 // variants of a single tile/tag#.
1186 //
1187 // Return nil if no tile variant passes Χ² filter.
1188 func (cmd *sliceNumpy) tv2homhet(cgs map[string]CompactGenome, maxv tileVariantID, remap []tileVariantID, tag, chunkstarttag tagID) ([][]int8, []onehotXref) {
1189         if maxv < 2 {
1190                 // everyone has the most common variant
1191                 return nil, nil
1192         }
1193         tagoffset := tag - chunkstarttag
1194         coverage := 0
1195         for _, cg := range cgs {
1196                 if cg.Variants[tagoffset*2] > 0 && cg.Variants[tagoffset*2+1] > 0 {
1197                         coverage++
1198                 }
1199         }
1200         if coverage < cmd.minCoverage {
1201                 return nil, nil
1202         }
1203         obs := make([][]bool, (maxv+1)*2) // 2 slices (hom + het) for each variant#
1204         for i := range obs {
1205                 obs[i] = make([]bool, len(cmd.cgnames))
1206         }
1207         for cgid, name := range cmd.cgnames {
1208                 cgvars := cgs[name].Variants
1209                 for v := tileVariantID(2); v <= maxv; v++ {
1210                         if remap[cgvars[tagoffset*2]] == v && remap[cgvars[tagoffset*2+1]] == v {
1211                                 obs[v*2][cgid] = true
1212                         } else if remap[cgvars[tagoffset*2]] == v || remap[cgvars[tagoffset*2+1]] == v {
1213                                 obs[v*2+1][cgid] = true
1214                         }
1215                 }
1216         }
1217         var onehot [][]int8
1218         var xref []onehotXref
1219         for homcol := 4; homcol < len(obs); homcol += 2 {
1220                 p := [2]float64{
1221                         pvalue(obs[homcol], cmd.chi2Cases),
1222                         pvalue(obs[homcol+1], cmd.chi2Cases),
1223                 }
1224                 if cmd.chi2PValue < 1 && !(p[0] < cmd.chi2PValue || p[1] < cmd.chi2PValue) {
1225                         continue
1226                 }
1227                 for het := 0; het < 2; het++ {
1228                         onehot = append(onehot, bool2int8(obs[homcol+het]))
1229                         xref = append(xref, onehotXref{
1230                                 tag:     tag,
1231                                 variant: tileVariantID(homcol / 2),
1232                                 het:     het == 1,
1233                                 pvalue:  p[het],
1234                         })
1235                 }
1236         }
1237         return onehot, xref
1238 }
1239
1240 func bool2int8(in []bool) []int8 {
1241         out := make([]int8, len(in))
1242         for i, v := range in {
1243                 if v {
1244                         out[i] = 1
1245                 }
1246         }
1247         return out
1248 }
1249
1250 // convert a []onehotXref with length N to a numpy-style []int32
1251 // matrix with N columns, one row per field of onehotXref struct.
1252 //
1253 // Hom/het row contains hom=0, het=1.
1254 //
1255 // P-value row contains 1000000x actual p-value.
1256 func onehotXref2int32(xrefs []onehotXref) []int32 {
1257         xcols := len(xrefs)
1258         xdata := make([]int32, 4*xcols)
1259         for i, xref := range xrefs {
1260                 xdata[i] = int32(xref.tag)
1261                 xdata[xcols+i] = int32(xref.variant)
1262                 if xref.het {
1263                         xdata[xcols*2+i] = 1
1264                 }
1265                 xdata[xcols*3+i] = int32(xref.pvalue * 1000000)
1266         }
1267         return xdata
1268 }
1269
1270 // transpose onehot data from in[col][row] to numpy-style
1271 // out[row*cols+col].
1272 func onehotcols2int8(in [][]int8) []int8 {
1273         if len(in) == 0 {
1274                 return nil
1275         }
1276         cols := len(in)
1277         rows := len(in[0])
1278         out := make([]int8, rows*cols)
1279         for row := 0; row < rows; row++ {
1280                 outrow := out[row*cols:]
1281                 for col, incol := range in {
1282                         outrow[col] = incol[row]
1283                 }
1284         }
1285         return out
1286 }
1287
1288 // Return [2][]uint32{rowIndices, colIndices} indicating which
1289 // elements of matrixT[c][r] have non-zero values.
1290 func onehotChunk2Indirect(matrixT [][]int8) [2][]uint32 {
1291         var nz [2][]uint32
1292         for c, col := range matrixT {
1293                 for r, val := range col {
1294                         if val != 0 {
1295                                 nz[0] = append(nz[0], uint32(r))
1296                                 nz[1] = append(nz[1], uint32(c))
1297                         }
1298                 }
1299         }
1300         return nz
1301 }