1 // Copyright (C) The Lightning Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
27 "git.arvados.org/arvados.git/sdk/go/arvados"
28 "github.com/arvados/lightning/hgvs"
29 "github.com/kshedden/gonpy"
30 log "github.com/sirupsen/logrus"
31 "golang.org/x/crypto/blake2b"
34 type sliceNumpy struct {
44 func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
48 fmt.Fprintf(stderr, "%s\n", err)
51 flags := flag.NewFlagSet("", flag.ContinueOnError)
52 flags.SetOutput(stderr)
53 pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
54 runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
55 projectUUID := flags.String("project", "", "project `UUID` for output data")
56 priority := flags.Int("priority", 500, "container request priority")
57 inputDir := flags.String("input-dir", "./in", "input `directory`")
58 outputDir := flags.String("output-dir", "./out", "output `directory`")
59 ref := flags.String("ref", "", "reference name (if blank, choose last one that appears in input)")
60 regionsFilename := flags.String("regions", "", "only output columns/annotations that intersect regions in specified bed `file`")
61 expandRegions := flags.Int("expand-regions", 0, "expand specified regions by `N` base pairs on each side`")
62 mergeOutput := flags.Bool("merge-output", false, "merge output into one matrix.npy and one matrix.annotations.csv")
63 hgvsSingle := flags.Bool("single-hgvs-matrix", false, "also generate hgvs-based matrix")
64 hgvsChunked := flags.Bool("chunked-hgvs-matrix", false, "also generate hgvs-based matrix per chromosome")
65 onehotChunked := flags.Bool("chunked-onehot", false, "generate one-hot tile-based matrix")
66 flags.IntVar(&cmd.threads, "threads", 16, "number of memory-hungry assembly threads")
67 flags.StringVar(&cmd.chi2CasesFile, "chi2-cases-file", "", "text file indicating positive cases (for Χ² test)")
68 flags.Float64Var(&cmd.chi2PValue, "chi2-p-value", 1, "do Χ² test and omit columns with p-value above this threshold")
69 cmd.filter.Flags(flags)
70 err = flags.Parse(args)
71 if err == flag.ErrHelp {
74 } else if err != nil {
80 log.Println(http.ListenAndServe(*pprof, nil))
84 if cmd.chi2CasesFile == "" && cmd.chi2PValue != 1 {
85 log.Errorf("cannot use provided -chi2-p-value=%f because -chi2-cases-file= value is empty", cmd.chi2PValue)
90 runner := arvadosContainerRunner{
91 Name: "lightning slice-numpy",
92 Client: arvados.NewClientFromEnv(),
93 ProjectUUID: *projectUUID,
100 err = runner.TranslatePaths(inputDir, regionsFilename, &cmd.chi2CasesFile)
104 runner.Args = []string{"slice-numpy", "-local=true",
106 "-input-dir=" + *inputDir,
107 "-output-dir=/mnt/output",
108 "-threads=" + fmt.Sprintf("%d", cmd.threads),
109 "-regions=" + *regionsFilename,
110 "-expand-regions=" + fmt.Sprintf("%d", *expandRegions),
111 "-merge-output=" + fmt.Sprintf("%v", *mergeOutput),
112 "-single-hgvs-matrix=" + fmt.Sprintf("%v", *hgvsSingle),
113 "-chunked-hgvs-matrix=" + fmt.Sprintf("%v", *hgvsChunked),
114 "-chunked-onehot=" + fmt.Sprintf("%v", *onehotChunked),
115 "-chi2-cases-file=" + cmd.chi2CasesFile,
116 "-chi2-p-value=" + fmt.Sprintf("%f", cmd.chi2PValue),
118 runner.Args = append(runner.Args, cmd.filter.Args()...)
120 output, err = runner.Run()
124 fmt.Fprintln(stdout, output)
128 infiles, err := allGobFiles(*inputDir)
132 if len(infiles) == 0 {
133 err = fmt.Errorf("no input files found in %s", *inputDir)
136 sort.Strings(infiles)
138 var refseq map[string][]tileLibRef
139 var reftiledata = make(map[tileLibRef][]byte, 11000000)
140 in0, err := open(infiles[0])
145 matchGenome, err := regexp.Compile(cmd.filter.MatchGenome)
147 err = fmt.Errorf("-match-genome: invalid regexp: %q", cmd.filter.MatchGenome)
153 DecodeLibrary(in0, strings.HasSuffix(infiles[0], ".gz"), func(ent *LibraryEntry) error {
154 if len(ent.TagSet) > 0 {
155 taglen = len(ent.TagSet[0])
157 for _, cseq := range ent.CompactSequences {
158 if cseq.Name == *ref || *ref == "" {
159 refseq = cseq.TileSequences
162 for _, cg := range ent.CompactGenomes {
163 if matchGenome.MatchString(cg.Name) {
164 cmd.cgnames = append(cmd.cgnames, cg.Name)
167 for _, tv := range ent.TileVariants {
169 reftiledata[tileLibRef{tv.Tag, tv.Variant}] = tv.Sequence
179 err = fmt.Errorf("%s: reference sequence not found", infiles[0])
183 err = fmt.Errorf("tagset not found")
186 if len(cmd.cgnames) == 0 {
187 err = fmt.Errorf("no genomes found matching regexp %q", cmd.filter.MatchGenome)
190 sort.Strings(cmd.cgnames)
192 cmd.minCoverage = int(math.Ceil(cmd.filter.MinCoverage * float64(len(cmd.cgnames))))
194 if cmd.chi2CasesFile != "" {
195 f, err2 := open(cmd.chi2CasesFile)
200 buf, err2 := io.ReadAll(f)
206 cmd.chi2Cases = make([]bool, len(cmd.cgnames))
208 for _, pattern := range bytes.Split(buf, []byte{'\n'}) {
209 if len(pattern) == 0 {
212 pattern := string(pattern)
214 for i, name := range cmd.cgnames {
215 if !strings.Contains(name, pattern) {
218 cmd.chi2Cases[i] = true
221 log.Warnf("pattern %q in cases file matches multiple genome IDs: %q, %q", pattern, cmd.cgnames[idx], name)
227 log.Warnf("pattern %q in cases file does not match any genome IDs", pattern)
231 log.Printf("%d cases, %d controls", ncases, len(cmd.cgnames)-ncases)
235 labelsFilename := *outputDir + "/labels.csv"
236 log.Infof("writing labels to %s", labelsFilename)
238 f, err = os.Create(labelsFilename)
243 for i, name := range cmd.cgnames {
244 _, err = fmt.Fprintf(f, "%d,%q\n", i, trimFilenameForLabel(name))
246 err = fmt.Errorf("write %s: %w", labelsFilename, err)
252 err = fmt.Errorf("close %s: %w", labelsFilename, err)
257 log.Info("indexing reference tiles")
258 type reftileinfo struct {
259 variant tileVariantID
260 seqname string // chr1
261 pos int // distance from start of chromosome to starttag
262 tiledata []byte // acgtggcaa...
264 isdup := map[tagID]bool{}
265 reftile := map[tagID]*reftileinfo{}
266 for seqname, cseq := range refseq {
268 for _, libref := range cseq {
269 tiledata := reftiledata[libref]
270 if len(tiledata) == 0 {
271 err = fmt.Errorf("missing tiledata for tag %d variant %d in %s in ref", libref.Tag, libref.Variant, seqname)
274 if isdup[libref.Tag] {
275 log.Printf("dropping reference tile %+v from %s @ %d, tag not unique", libref, seqname, pos)
276 } else if reftile[libref.Tag] != nil {
277 log.Printf("dropping reference tile %+v from %s @ %d, tag not unique", tileLibRef{Tag: libref.Tag, Variant: reftile[libref.Tag].variant}, reftile[libref.Tag].seqname, reftile[libref.Tag].pos)
278 delete(reftile, libref.Tag)
279 log.Printf("dropping reference tile %+v from %s @ %d, tag not unique", libref, seqname, pos)
280 isdup[libref.Tag] = true
282 reftile[libref.Tag] = &reftileinfo{
284 variant: libref.Variant,
289 pos += len(tiledata) - taglen
291 log.Printf("... %s done, len %d", seqname, pos+taglen)
295 if *regionsFilename != "" {
296 log.Printf("loading regions from %s", *regionsFilename)
297 mask, err = makeMask(*regionsFilename, *expandRegions)
301 log.Printf("before applying mask, len(reftile) == %d", len(reftile))
302 log.Printf("deleting reftile entries for regions outside %d intervals", mask.Len())
303 for tag, rt := range reftile {
304 if !mask.Check(strings.TrimPrefix(rt.seqname, "chr"), rt.pos, rt.pos+len(rt.tiledata)) {
308 log.Printf("after applying mask, len(reftile) == %d", len(reftile))
311 type hgvsColSet map[hgvs.Variant][2][]int8
312 encodeHGVS := throttle{Max: len(refseq)}
313 encodeHGVSTodo := map[string]chan hgvsColSet{}
314 tmpHGVSCols := map[string]*os.File{}
316 for seqname := range refseq {
318 f, err = os.Create(*outputDir + "/tmp." + seqname + ".gob")
322 defer os.Remove(f.Name())
323 bufw := bufio.NewWriterSize(f, 1<<24)
324 enc := gob.NewEncoder(bufw)
325 tmpHGVSCols[seqname] = f
326 todo := make(chan hgvsColSet, 128)
327 encodeHGVSTodo[seqname] = todo
328 encodeHGVS.Go(func() error {
329 for colset := range todo {
330 err := enc.Encode(colset)
332 encodeHGVS.Report(err)
343 var toMerge [][]int16
344 if *mergeOutput || *hgvsSingle {
345 toMerge = make([][]int16, len(infiles))
348 throttleMem := throttle{Max: cmd.threads} // TODO: estimate using mem and data size
349 throttleNumpyMem := throttle{Max: cmd.threads/2 + 1}
350 log.Info("generating annotations and numpy matrix for each slice")
352 for infileIdx, infile := range infiles {
353 infileIdx, infile := infileIdx, infile
354 throttleMem.Go(func() error {
355 seq := make(map[tagID][]TileVariant, 50000)
356 cgs := make(map[string]CompactGenome, len(cmd.cgnames))
357 f, err := open(infile)
362 log.Infof("%04d: reading %s", infileIdx, infile)
363 err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
364 for _, tv := range ent.TileVariants {
368 if mask != nil && reftile[tv.Tag] == nil {
374 variants := seq[tv.Tag]
375 if len(variants) == 0 {
376 variants = make([]TileVariant, 100)
378 for len(variants) <= int(tv.Variant) {
379 variants = append(variants, TileVariant{})
381 variants[int(tv.Variant)] = tv
382 seq[tv.Tag] = variants
384 for _, cg := range ent.CompactGenomes {
385 if !matchGenome.MatchString(cg.Name) {
388 // pad to full slice size
389 // to avoid out-of-bounds
391 if sliceSize := 2 * int(cg.EndTag-cg.StartTag); len(cg.Variants) < sliceSize {
392 cg.Variants = append(cg.Variants, make([]tileVariantID, sliceSize-len(cg.Variants))...)
401 tagstart := cgs[cmd.cgnames[0]].StartTag
402 tagend := cgs[cmd.cgnames[0]].EndTag
406 log.Infof("%04d: renumber/dedup variants for tags %d-%d", infileIdx, tagstart, tagend)
407 variantRemap := make([][]tileVariantID, tagend-tagstart)
408 throttleCPU := throttle{Max: runtime.GOMAXPROCS(0)}
409 for tag, variants := range seq {
410 tag, variants := tag, variants
411 throttleCPU.Acquire()
413 defer throttleCPU.Release()
414 count := make(map[[blake2b.Size256]byte]int, len(variants))
418 count[blake2b.Sum256(rt.tiledata)] = 0
421 for _, cg := range cgs {
422 idx := int(tag-tagstart) * 2
423 for allele := 0; allele < 2; allele++ {
424 v := cg.Variants[idx+allele]
425 if v > 0 && len(variants[v].Sequence) > 0 {
426 count[variants[v].Blake2b]++
430 // hash[i] will be the hash of
431 // the variant(s) that should
432 // be at rank i (0-based).
433 hash := make([][blake2b.Size256]byte, 0, len(count))
434 for b := range count {
435 hash = append(hash, b)
437 sort.Slice(hash, func(i, j int) bool {
438 bi, bj := &hash[i], &hash[j]
439 if ci, cj := count[*bi], count[*bj]; ci != cj {
442 return bytes.Compare((*bi)[:], (*bj)[:]) < 0
445 // rank[b] will be the 1-based
446 // new variant number for
447 // variants whose hash is b.
448 rank := make(map[[blake2b.Size256]byte]tileVariantID, len(hash))
449 for i, h := range hash {
450 rank[h] = tileVariantID(i + 1)
452 // remap[v] will be the new
453 // variant number for original
455 remap := make([]tileVariantID, len(variants))
456 for i, tv := range variants {
457 remap[i] = rank[tv.Blake2b]
459 variantRemap[tag-tagstart] = remap
461 rt.variant = rank[blake2b.Sum256(rt.tiledata)]
467 var onehotChunk [][]int8
468 var onehotXrefs []onehotXref
470 annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx)
471 log.Infof("%04d: writing %s", infileIdx, annotationsFilename)
472 annof, err := os.Create(annotationsFilename)
476 annow := bufio.NewWriterSize(annof, 1<<20)
478 for tag := tagstart; tag < tagend; tag++ {
479 rt, ok := reftile[tag]
484 // Excluded by specified
485 // regions, or reference does
486 // not use any variant of this
487 // tile. (TODO: log this?
488 // mention it in annotations?)
491 remap := variantRemap[tag-tagstart]
492 maxv := tileVariantID(0)
493 for _, v := range remap {
499 onehot, xrefs := cmd.tv2homhet(cgs, maxv, remap, tag, tagstart)
500 onehotChunk = append(onehotChunk, onehot...)
501 onehotXrefs = append(onehotXrefs, xrefs...)
503 fmt.Fprintf(annow, "%d,%d,%d,=,%s,%d,,,\n", tag, outcol, rt.variant, rt.seqname, rt.pos)
505 reftilestr := strings.ToUpper(string(rt.tiledata))
507 done := make([]bool, maxv+1)
508 variantDiffs := make([][]hgvs.Variant, maxv+1)
509 for v, tv := range variants {
511 if v == rt.variant || done[v] {
516 if len(tv.Sequence) < taglen || !bytes.HasSuffix(rt.tiledata, tv.Sequence[len(tv.Sequence)-taglen:]) {
517 fmt.Fprintf(annow, "%d,%d,%d,,%s,%d,,,\n", tag, outcol, v, rt.seqname, rt.pos)
520 if lendiff := len(rt.tiledata) - len(tv.Sequence); lendiff < -1000 || lendiff > 1000 {
521 fmt.Fprintf(annow, "%d,%d,%d,,%s,%d,,,\n", tag, outcol, v, rt.seqname, rt.pos)
524 diffs, _ := hgvs.Diff(reftilestr, strings.ToUpper(string(tv.Sequence)), 0)
525 for i := range diffs {
526 diffs[i].Position += rt.pos
528 for _, diff := range diffs {
529 fmt.Fprintf(annow, "%d,%d,%d,%s:g.%s,%s,%d,%s,%s,%s\n", tag, outcol, v, rt.seqname, diff.String(), rt.seqname, diff.Position, diff.Ref, diff.New, diff.Left)
532 variantDiffs[v] = diffs
536 // We can now determine, for each HGVS
537 // variant (diff) in this reftile
538 // region, whether a given genome
539 // phase/allele (1) has the variant, (0) has
540 // =ref or a different variant in that
541 // position, or (-1) is lacking
542 // coverage / couldn't be diffed.
543 hgvsCol := hgvsColSet{}
544 for _, diffs := range variantDiffs {
545 for _, diff := range diffs {
546 if _, ok := hgvsCol[diff]; ok {
549 hgvsCol[diff] = [2][]int8{
550 make([]int8, len(cmd.cgnames)),
551 make([]int8, len(cmd.cgnames)),
555 for row, name := range cmd.cgnames {
556 variants := cgs[name].Variants[(tag-tagstart)*2:]
557 for ph := 0; ph < 2; ph++ {
559 if int(v) >= len(remap) {
565 // hgvsCol[*][ph][row] is already 0
566 } else if len(variantDiffs[v]) == 0 {
567 // lacking coverage / couldn't be diffed
568 for _, col := range hgvsCol {
572 for _, diff := range variantDiffs[v] {
573 hgvsCol[diff][ph][row] = 1
578 for diff, colpair := range hgvsCol {
579 allele2homhet(colpair)
580 if !cmd.filterHGVScolpair(colpair) {
581 delete(hgvsCol, diff)
584 if len(hgvsCol) > 0 {
585 encodeHGVSTodo[rt.seqname] <- hgvsCol
600 // transpose onehotChunk[col][row] to numpy[row*ncols+col]
601 rows := len(cmd.cgnames)
602 cols := len(onehotChunk)
603 log.Infof("%04d: preparing onehot numpy (rows=%d, cols=%d, mem=%d)", infileIdx, rows, cols, rows*cols)
604 throttleNumpyMem.Acquire()
605 out := make([]int8, rows*cols)
606 for row := range cmd.cgnames {
607 out := out[row*cols:]
608 for colnum, values := range onehotChunk {
609 out[colnum] = values[row]
615 throttleNumpyMem.Release()
617 fnm := fmt.Sprintf("%s/onehot.%04d.npy", *outputDir, infileIdx)
618 err = writeNumpyInt8(fnm, out, rows, cols)
623 fnm = fmt.Sprintf("%s/onehot-columns.%04d.npy", *outputDir, infileIdx)
624 xcols := len(onehotXrefs)
625 xdata := make([]int32, 4*xcols)
626 for i, xref := range onehotXrefs {
627 xdata[i] = int32(xref.tag)
628 xdata[xcols+i] = int32(xref.variant)
632 xdata[xcols*3+i] = int32(xref.pvalue * 1000000)
634 err = writeNumpyInt32(fnm, xdata, 4, xcols)
639 if !*onehotChunked || *mergeOutput || *hgvsSingle {
640 log.Infof("%04d: preparing numpy", infileIdx)
641 throttleNumpyMem.Acquire()
642 rows := len(cmd.cgnames)
644 out := make([]int16, rows*cols)
645 for row, name := range cmd.cgnames {
646 out := out[row*cols:]
648 for col, v := range cgs[name].Variants {
649 tag := tagstart + tagID(col/2)
650 if mask != nil && reftile[tag] == nil {
653 if variants, ok := seq[tag]; ok && len(variants) > int(v) && len(variants[v].Sequence) > 0 {
654 out[outcol] = int16(variantRemap[tag-tagstart][v])
664 throttleNumpyMem.Release()
665 if *mergeOutput || *hgvsSingle {
666 log.Infof("%04d: matrix fragment %d rows x %d cols", infileIdx, rows, cols)
667 toMerge[infileIdx] = out
669 if !*mergeOutput && !*onehotChunked {
670 fnm := fmt.Sprintf("%s/matrix.%04d.npy", *outputDir, infileIdx)
671 err = writeNumpyInt16(fnm, out, rows, cols)
678 log.Infof("%s: done (%d/%d)", infile, int(atomic.AddInt64(&done, 1)), len(infiles))
682 if err = throttleMem.Wait(); err != nil {
687 log.Info("flushing hgvsCols temp files")
688 for seqname := range refseq {
689 close(encodeHGVSTodo[seqname])
691 err = encodeHGVS.Wait()
695 for seqname := range refseq {
696 log.Infof("%s: reading hgvsCols from temp file", seqname)
697 f := tmpHGVSCols[seqname]
698 _, err = f.Seek(0, io.SeekStart)
702 var hgvsCols hgvsColSet
703 dec := gob.NewDecoder(bufio.NewReaderSize(f, 1<<24))
705 err = dec.Decode(&hgvsCols)
710 log.Infof("%s: sorting %d hgvs variants", seqname, len(hgvsCols))
711 variants := make([]hgvs.Variant, 0, len(hgvsCols))
712 for v := range hgvsCols {
713 variants = append(variants, v)
715 sort.Slice(variants, func(i, j int) bool {
716 vi, vj := &variants[i], &variants[j]
717 if vi.Position != vj.Position {
718 return vi.Position < vj.Position
719 } else if vi.Ref != vj.Ref {
720 return vi.Ref < vj.Ref
722 return vi.New < vj.New
725 rows := len(cmd.cgnames)
726 cols := len(variants) * 2
727 log.Infof("%s: building hgvs matrix (rows=%d, cols=%d, mem=%d)", seqname, rows, cols, rows*cols)
728 out := make([]int8, rows*cols)
729 for varIdx, variant := range variants {
730 hgvsCols := hgvsCols[variant]
731 for row := range cmd.cgnames {
732 for ph := 0; ph < 2; ph++ {
733 out[row*cols+varIdx+ph] = hgvsCols[ph][row]
737 err = writeNumpyInt8(fmt.Sprintf("%s/hgvs.%s.npy", *outputDir, seqname), out, rows, cols)
743 fnm := fmt.Sprintf("%s/hgvs.%s.annotations.csv", *outputDir, seqname)
744 log.Infof("%s: writing hgvs column labels to %s", seqname, fnm)
745 var hgvsLabels bytes.Buffer
746 for varIdx, variant := range variants {
747 fmt.Fprintf(&hgvsLabels, "%d,%s:g.%s\n", varIdx, seqname, variant.String())
749 err = ioutil.WriteFile(fnm, hgvsLabels.Bytes(), 0666)
756 if *mergeOutput || *hgvsSingle {
757 var annow *bufio.Writer
760 annoFilename := fmt.Sprintf("%s/matrix.annotations.csv", *outputDir)
761 annof, err = os.Create(annoFilename)
765 annow = bufio.NewWriterSize(annof, 1<<20)
768 rows := len(cmd.cgnames)
770 for _, chunk := range toMerge {
771 cols += len(chunk) / rows
773 log.Infof("merging output matrix (rows=%d, cols=%d, mem=%d) and annotations", rows, cols, rows*cols*2)
776 out = make([]int16, rows*cols)
778 hgvsCols := map[string][2][]int16{} // hgvs -> [[g0,g1,g2,...], [g0,g1,g2,...]] (slice of genomes for each phase)
780 for outIdx, chunk := range toMerge {
781 chunkcols := len(chunk) / rows
783 for row := 0; row < rows; row++ {
784 copy(out[row*cols+startcol:], chunk[row*chunkcols:(row+1)*chunkcols])
787 toMerge[outIdx] = nil
789 annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, outIdx)
790 log.Infof("reading %s", annotationsFilename)
791 buf, err := os.ReadFile(annotationsFilename)
796 err = os.Remove(annotationsFilename)
801 for _, line := range bytes.Split(buf, []byte{'\n'}) {
805 fields := bytes.SplitN(line, []byte{','}, 9)
806 tag, _ := strconv.Atoi(string(fields[0]))
807 incol, _ := strconv.Atoi(string(fields[1]))
808 tileVariant, _ := strconv.Atoi(string(fields[2]))
809 hgvsID := string(fields[3])
810 seqname := string(fields[4])
811 pos, _ := strconv.Atoi(string(fields[5]))
814 // Null entry for un-diffable
819 // Null entry for ref tile
822 if mask != nil && !mask.Check(strings.TrimPrefix(seqname, "chr"), pos, pos+len(refseq)) {
823 // The tile intersects one of
824 // the selected regions, but
825 // this particular HGVS
829 hgvsColPair := hgvsCols[hgvsID]
830 if hgvsColPair[0] == nil {
831 // values in new columns start
832 // out as -1 ("no data yet")
833 // or 0 ("=ref") here, may
834 // change to 1 ("hgvs variant
835 // present") below, either on
836 // this line or a future line.
837 hgvsColPair = [2][]int16{make([]int16, len(cmd.cgnames)), make([]int16, len(cmd.cgnames))}
838 rt, ok := reftile[tagID(tag)]
840 err = fmt.Errorf("bug: seeing annotations for tag %d, but it has no reftile entry", tag)
843 for ph := 0; ph < 2; ph++ {
844 for row := 0; row < rows; row++ {
845 v := chunk[row*chunkcols+incol*2+ph]
846 if tileVariantID(v) == rt.variant {
847 hgvsColPair[ph][row] = 0
849 hgvsColPair[ph][row] = -1
853 hgvsCols[hgvsID] = hgvsColPair
855 hgvsref := hgvs.Variant{
860 fmt.Fprintf(annow, "%d,%d,%d,%s:g.%s,%s,%d,%s,%s,%s\n", tag, incol+startcol/2, rt.variant, seqname, hgvsref.String(), seqname, pos, refseq, refseq, fields[8])
864 fmt.Fprintf(annow, "%d,%d,%d,%s,%s,%d,%s,%s,%s\n", tag, incol+startcol/2, tileVariant, hgvsID, seqname, pos, refseq, fields[7], fields[8])
866 for ph := 0; ph < 2; ph++ {
867 for row := 0; row < rows; row++ {
868 v := chunk[row*chunkcols+incol*2+ph]
869 if int(v) == tileVariant {
870 hgvsColPair[ph][row] = 1
876 startcol += chunkcols
887 err = writeNumpyInt16(fmt.Sprintf("%s/matrix.npy", *outputDir), out, rows, cols)
895 cols = len(hgvsCols) * 2
896 log.Printf("building hgvs-based matrix: %d rows x %d cols", rows, cols)
897 out = make([]int16, rows*cols)
898 hgvsIDs := make([]string, 0, cols/2)
899 for hgvsID := range hgvsCols {
900 hgvsIDs = append(hgvsIDs, hgvsID)
902 sort.Strings(hgvsIDs)
903 var hgvsLabels bytes.Buffer
904 for idx, hgvsID := range hgvsIDs {
905 fmt.Fprintf(&hgvsLabels, "%d,%s\n", idx, hgvsID)
906 for ph := 0; ph < 2; ph++ {
907 hgvscol := hgvsCols[hgvsID][ph]
908 for row, val := range hgvscol {
909 out[row*cols+idx*2+ph] = val
913 err = writeNumpyInt16(fmt.Sprintf("%s/hgvs.npy", *outputDir), out, rows, cols)
918 fnm := fmt.Sprintf("%s/hgvs.annotations.csv", *outputDir)
919 log.Printf("writing hgvs labels: %s", fnm)
920 err = ioutil.WriteFile(fnm, hgvsLabels.Bytes(), 0777)
929 func (cmd *sliceNumpy) filterHGVScolpair(colpair [2][]int8) bool {
930 if cmd.chi2PValue >= 1 {
933 col0 := make([]bool, 0, len(cmd.chi2Cases))
934 col1 := make([]bool, 0, len(cmd.chi2Cases))
935 cases := make([]bool, 0, len(cmd.chi2Cases))
936 for i, c := range cmd.chi2Cases {
937 if colpair[0][i] < 0 {
940 col0 = append(col0, colpair[0][i] != 0)
941 col1 = append(col1, colpair[1][i] != 0)
942 cases = append(cases, c)
944 return len(cases) >= cmd.minCoverage &&
945 (pvalue(cases, col0) <= cmd.chi2PValue || pvalue(cases, col1) <= cmd.chi2PValue)
948 func writeNumpyInt32(fnm string, out []int32, rows, cols int) error {
949 output, err := os.Create(fnm)
954 bufw := bufio.NewWriterSize(output, 1<<26)
955 npw, err := gonpy.NewWriter(nopCloser{bufw})
959 log.WithFields(log.Fields{
963 "bytes": rows * cols * 4,
964 }).Infof("writing numpy: %s", fnm)
965 npw.Shape = []int{rows, cols}
971 return output.Close()
974 func writeNumpyInt16(fnm string, out []int16, rows, cols int) error {
975 output, err := os.Create(fnm)
980 bufw := bufio.NewWriterSize(output, 1<<26)
981 npw, err := gonpy.NewWriter(nopCloser{bufw})
985 log.WithFields(log.Fields{
989 "bytes": rows * cols * 2,
990 }).Infof("writing numpy: %s", fnm)
991 npw.Shape = []int{rows, cols}
997 return output.Close()
1000 func writeNumpyInt8(fnm string, out []int8, rows, cols int) error {
1001 output, err := os.Create(fnm)
1005 defer output.Close()
1006 bufw := bufio.NewWriterSize(output, 1<<26)
1007 npw, err := gonpy.NewWriter(nopCloser{bufw})
1011 log.WithFields(log.Fields{
1015 "bytes": rows * cols,
1016 }).Infof("writing numpy: %s", fnm)
1017 npw.Shape = []int{rows, cols}
1023 return output.Close()
1026 func allele2homhet(colpair [2][]int8) {
1027 a, b := colpair[0], colpair[1]
1028 for i, av := range a {
1030 if av < 0 || bv < 0 {
1033 } else if av > 0 && bv > 0 {
1036 } else if av > 0 || bv > 0 {
1040 // ref (or a different variant in same position)
1041 // (this is a no-op) a[i], b[i] = 0, 0
1046 type onehotXref struct {
1048 variant tileVariantID
1053 // Build onehot matrix (m[variant*2+isHet][genome] == 0 or 1) for all
1054 // variants of a single tile/tag#.
1056 // Return nil if no tile variant passes Χ² filter.
1057 func (cmd *sliceNumpy) tv2homhet(cgs map[string]CompactGenome, maxv tileVariantID, remap []tileVariantID, tag, chunkstarttag tagID) ([][]int8, []onehotXref) {
1059 // everyone has the most common variant
1062 tagoffset := tag - chunkstarttag
1064 for _, cg := range cgs {
1065 if cg.Variants[tagoffset*2] > 0 && cg.Variants[tagoffset*2+1] > 0 {
1069 if coverage < cmd.minCoverage {
1072 obs := make([][]bool, (maxv+1)*2) // 2 slices (hom + het) for each variant#
1073 for i := range obs {
1074 obs[i] = make([]bool, len(cmd.cgnames))
1076 for cgid, name := range cmd.cgnames {
1077 cgvars := cgs[name].Variants
1078 for v := tileVariantID(2); v <= maxv; v++ {
1079 if remap[cgvars[tagoffset*2]] == v && remap[cgvars[tagoffset*2+1]] == v {
1080 obs[v*2][cgid] = true
1081 } else if remap[cgvars[tagoffset*2]] == v || remap[cgvars[tagoffset*2+1]] == v {
1082 obs[v*2+1][cgid] = true
1087 var xref []onehotXref
1088 for homcol := 4; homcol < len(obs); homcol += 2 {
1090 pvalue(cmd.chi2Cases, obs[homcol]),
1091 pvalue(cmd.chi2Cases, obs[homcol+1]),
1093 if cmd.chi2PValue < 1 && !(p[0] < cmd.chi2PValue || p[1] < cmd.chi2PValue) {
1096 for het := 0; het < 2; het++ {
1097 onehot = append(onehot, bool2int8(obs[homcol+het]))
1098 xref = append(xref, onehotXref{
1100 variant: tileVariantID(homcol / 2),
1109 func bool2int8(in []bool) []int8 {
1110 out := make([]int8, len(in))
1111 for i, v := range in {