"bufio"
"bytes"
"encoding/gob"
+ "errors"
"flag"
"fmt"
"io"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/arvados/lightning/hgvs"
"github.com/kshedden/gonpy"
+ "github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/blake2b"
)
chi2PValue float64
minCoverage int
cgnames []string
+ includeVariant1 bool
+ debugTag tagID
}
func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
hgvsChunked := flags.Bool("chunked-hgvs-matrix", false, "also generate hgvs-based matrix per chromosome")
onehotSingle := flags.Bool("single-onehot", false, "generate one-hot tile-based matrix")
onehotChunked := flags.Bool("chunked-onehot", false, "generate one-hot tile-based matrix per input chunk")
+ debugTag := flags.Int("debug-tag", -1, "log debugging details about specified tag")
flags.IntVar(&cmd.threads, "threads", 16, "number of memory-hungry assembly threads")
flags.StringVar(&cmd.chi2CaseControlFile, "chi2-case-control-file", "", "tsv file or directory indicating cases and controls for Χ² test (if directory, all .tsv files will be read)")
flags.StringVar(&cmd.chi2CaseControlColumn, "chi2-case-control-column", "", "name of case/control column in case-control files for Χ² test (value must be 0 for control, 1 for case)")
flags.Float64Var(&cmd.chi2PValue, "chi2-p-value", 1, "do Χ² test and omit columns with p-value above this threshold")
+ flags.BoolVar(&cmd.includeVariant1, "include-variant-1", false, "include most common variant when building one-hot matrix")
cmd.filter.Flags(flags)
err = flags.Parse(args)
if err == flag.ErrHelp {
return 2
}
+ cmd.debugTag = tagID(*debugTag)
+
if !*runlocal {
runner := arvadosContainerRunner{
Name: "lightning slice-numpy",
"-chi2-case-control-file=" + cmd.chi2CaseControlFile,
"-chi2-case-control-column=" + cmd.chi2CaseControlColumn,
"-chi2-p-value=" + fmt.Sprintf("%f", cmd.chi2PValue),
+ "-include-variant-1=" + fmt.Sprintf("%v", cmd.includeVariant1),
+ "-debug-tag=" + fmt.Sprintf("%d", cmd.debugTag),
}
runner.Args = append(runner.Args, cmd.filter.Args()...)
var output string
cmd.cgnames = nil
var tagset [][]byte
- DecodeLibrary(in0, strings.HasSuffix(infiles[0], ".gz"), func(ent *LibraryEntry) error {
+ err = DecodeLibrary(in0, strings.HasSuffix(infiles[0], ".gz"), func(ent *LibraryEntry) error {
if len(ent.TagSet) > 0 {
tagset = ent.TagSet
}
for seqname, cseq := range refseq {
pos := 0
for _, libref := range cseq {
- if libref.Tag > tagID(cmd.filter.MaxTag) {
+ if cmd.filter.MaxTag >= 0 && libref.Tag > tagID(cmd.filter.MaxTag) {
continue
}
tiledata := reftiledata[libref]
toMerge = make([][]int16, len(infiles))
}
var onehotIndirect [][2][]uint32 // [chunkIndex][axis][index]
+ var onehotChunkSize []uint32
var onehotXrefs [][]onehotXref
if *onehotSingle {
onehotIndirect = make([][2][]uint32, len(infiles))
+ onehotChunkSize = make([]uint32, len(infiles))
onehotXrefs = make([][]onehotXref, len(infiles))
}
+ chunkStartTag := make([]tagID, len(infiles))
throttleMem := throttle{Max: cmd.threads} // TODO: estimate using mem and data size
throttleNumpyMem := throttle{Max: cmd.threads/2 + 1}
log.Info("generating annotations and numpy matrix for each slice")
+ var errSkip = errors.New("skip infile")
var done int64
for infileIdx, infile := range infiles {
infileIdx, infile := infileIdx, infile
// masked-out tiles.
continue
}
+ if tv.Tag == cmd.debugTag {
+ log.Printf("infile %d %s tag %d variant %d hash %x", infileIdx, infile, tv.Tag, tv.Variant, tv.Blake2b[:3])
+ }
variants := seq[tv.Tag]
if len(variants) == 0 {
variants = make([]TileVariant, 100)
seq[tv.Tag] = variants
}
for _, cg := range ent.CompactGenomes {
+ if cmd.filter.MaxTag >= 0 && cg.StartTag > tagID(cmd.filter.MaxTag) {
+ return errSkip
+ }
if !matchGenome.MatchString(cg.Name) {
continue
}
}
return nil
})
- if err != nil {
+ if err == errSkip {
+ return nil
+ } else if err != nil {
return err
}
tagstart := cgs[cmd.cgnames[0]].StartTag
tagend := cgs[cmd.cgnames[0]].EndTag
+ chunkStartTag[infileIdx] = tagstart
// TODO: filters
throttleCPU := throttle{Max: runtime.GOMAXPROCS(0)}
for tag, variants := range seq {
tag, variants := tag, variants
- throttleCPU.Acquire()
- go func() {
- defer throttleCPU.Release()
+ throttleCPU.Go(func() error {
count := make(map[[blake2b.Size256]byte]int, len(variants))
rt := reftile[tag]
count[blake2b.Sum256(rt.tiledata)] = 0
}
- for _, cg := range cgs {
+ for cgname, cg := range cgs {
idx := int(tag-tagstart) * 2
for allele := 0; allele < 2; allele++ {
v := cg.Variants[idx+allele]
if v > 0 && len(variants[v].Sequence) > 0 {
count[variants[v].Blake2b]++
}
+ if v > 0 && tag == cmd.debugTag {
+ log.Printf("tag %d cg %s allele %d tv %d hash %x count is now %d", tag, cgname, allele, v, variants[v].Blake2b[:3], count[variants[v].Blake2b])
+ }
}
}
// hash[i] will be the hash of
for i, h := range hash {
rank[h] = tileVariantID(i + 1)
}
+ if tag == cmd.debugTag {
+ for h, r := range rank {
+ log.Printf("tag %d rank(%x) = %v", tag, h[:3], r)
+ }
+ }
// remap[v] will be the new
// variant number for original
// variant number v.
for i, tv := range variants {
remap[i] = rank[tv.Blake2b]
}
+ if tag == cmd.debugTag {
+ for in, out := range remap {
+ if out > 0 {
+ log.Printf("tag %d remap %d => %d", tag, in, out)
+ }
+ }
+ }
variantRemap[tag-tagstart] = remap
if rt != nil {
- rt.variant = rank[blake2b.Sum256(rt.tiledata)]
+ refrank := rank[blake2b.Sum256(rt.tiledata)]
+ if tag == cmd.debugTag {
+ log.Printf("tag %d reftile variant %d => %d", tag, rt.variant, refrank)
+ }
+ rt.variant = refrank
}
- }()
+ return nil
+ })
}
throttleCPU.Wait()
// Excluded by specified regions
continue
}
- if tag > tagID(cmd.filter.MaxTag) {
- continue
+ if cmd.filter.MaxTag >= 0 && tag > tagID(cmd.filter.MaxTag) {
+ break
}
remap := variantRemap[tag-tagstart]
maxv := tileVariantID(0)
}
if *onehotChunked || *onehotSingle {
onehot, xrefs := cmd.tv2homhet(cgs, maxv, remap, tag, tagstart)
+ if tag == cmd.debugTag {
+ log.WithFields(logrus.Fields{
+ "onehot": onehot,
+ "xrefs": xrefs,
+ }).Info("tv2homhet()")
+ }
onehotChunk = append(onehotChunk, onehot...)
onehotXref = append(onehotXref, xrefs...)
}
// transpose onehotChunk[col][row] to numpy[row*ncols+col]
rows := len(cmd.cgnames)
cols := len(onehotChunk)
- log.Infof("%04d: preparing onehot numpy (rows=%d, cols=%d, mem=%d)", infileIdx, len(cmd.cgnames), len(onehotChunk), len(cmd.cgnames)*len(onehotChunk))
+ log.Infof("%04d: preparing onehot numpy (rows=%d, cols=%d, mem=%d)", infileIdx, rows, cols, rows*cols)
throttleNumpyMem.Acquire()
out := onehotcols2int8(onehotChunk)
fnm := fmt.Sprintf("%s/onehot.%04d.npy", *outputDir, infileIdx)
}
if *onehotSingle {
onehotIndirect[infileIdx] = onehotChunk2Indirect(onehotChunk)
+ onehotChunkSize[infileIdx] = uint32(len(onehotChunk))
onehotXrefs[infileIdx] = onehotXref
n := len(onehotIndirect[infileIdx][0])
- log.Infof("%04d: keeping onehot coordinates in memory (n=%d, mem=%d)", infileIdx, n, n*8)
+ log.Infof("%04d: keeping onehot coordinates in memory (n=%d, mem=%d)", infileIdx, n, n*8*2)
}
if !(*onehotSingle || *onehotChunked) || *mergeOutput || *hgvsSingle {
- log.Infof("%04d: preparing numpy", infileIdx)
+ log.Infof("%04d: preparing numpy (rows=%d, cols=%d)", infileIdx, len(cmd.cgnames), 2*outcol)
throttleNumpyMem.Acquire()
rows := len(cmd.cgnames)
cols := 2 * outcol
out := make([]int16, rows*cols)
for row, name := range cmd.cgnames {
- out := out[row*cols:]
- outcol := 0
+ outidx := row * cols
for col, v := range cgs[name].Variants {
tag := tagstart + tagID(col/2)
+ if cmd.filter.MaxTag >= 0 && tag > tagID(cmd.filter.MaxTag) {
+ break
+ }
if mask != nil && reftile[tag] == nil {
continue
}
- if variants, ok := seq[tag]; ok && len(variants) > int(v) && len(variants[v].Sequence) > 0 {
- out[outcol] = int16(variantRemap[tag-tagstart][v])
+ if v == 0 {
+ out[outidx] = 0 // tag not found / spanning tile
+ } else if variants, ok := seq[tag]; ok && int(v) < len(variants) && len(variants[v].Sequence) > 0 {
+ out[outidx] = int16(variantRemap[tag-tagstart][v])
} else {
- out[outcol] = -1
+ out[outidx] = -1 // low quality tile variant
+ }
+ if tag == cmd.debugTag {
+ log.Printf("tag %d row %d col %d outidx %d v %d out %d", tag, row, col, outidx, v, out[outidx])
}
- outcol++
+ outidx++
}
}
seq = nil
}
onehot := make([]uint32, nzCount*2) // [r,r,r,...,c,c,c,...]
var xrefs []onehotXref
+ chunkOffset := uint32(0)
outcol := 0
for i, part := range onehotIndirect {
for i := range part[1] {
- part[1][i] += uint32(outcol)
+ part[1][i] += chunkOffset
}
copy(onehot[outcol:], part[0])
copy(onehot[outcol+nzCount:], part[1])
- outcol += len(part[0])
xrefs = append(xrefs, onehotXrefs[i]...)
+ outcol += len(part[0])
+ chunkOffset += onehotChunkSize[i]
+
part[0] = nil
part[1] = nil
onehotXrefs[i] = nil
return 1
}
fnm = fmt.Sprintf("%s/onehot-columns.npy", *outputDir)
- err = writeNumpyInt32(fnm, onehotXref2int32(xrefs), 4, len(xrefs))
+ err = writeNumpyInt32(fnm, onehotXref2int32(xrefs), 5, len(xrefs))
if err != nil {
return 1
}
}
+ if !*mergeOutput && !*onehotChunked && !*onehotSingle {
+ tagoffsetFilename := *outputDir + "/chunk-tag-offset.csv"
+ log.Infof("writing tag offsets to %s", tagoffsetFilename)
+ var f *os.File
+ f, err = os.Create(tagoffsetFilename)
+ if err != nil {
+ return 1
+ }
+ defer f.Close()
+ for idx, offset := range chunkStartTag {
+ _, err = fmt.Fprintf(f, "%q,%d\n", fmt.Sprintf("matrix.%04d.npy", idx), offset)
+ if err != nil {
+ err = fmt.Errorf("write %s: %w", tagoffsetFilename, err)
+ return 1
+ }
+ }
+ err = f.Close()
+ if err != nil {
+ err = fmt.Errorf("close %s: %w", tagoffsetFilename, err)
+ return 1
+ }
+ }
return 0
}
type onehotXref struct {
tag tagID
variant tileVariantID
- het bool
+ hom bool
pvalue float64
}
const onehotXrefSize = unsafe.Sizeof(onehotXref{})
-// Build onehot matrix (m[variant*2+isHet][genome] == 0 or 1) for all
+// Build onehot matrix (m[tileVariantIndex][genome] == 0 or 1) for all
// variants of a single tile/tag#.
//
// Return nil if no tile variant passes Χ² filter.
func (cmd *sliceNumpy) tv2homhet(cgs map[string]CompactGenome, maxv tileVariantID, remap []tileVariantID, tag, chunkstarttag tagID) ([][]int8, []onehotXref) {
- if maxv < 2 {
- // everyone has the most common variant
+ if tag == cmd.debugTag {
+ tv := make([]tileVariantID, len(cmd.cgnames)*2)
+ for i, name := range cmd.cgnames {
+ copy(tv[i*2:(i+1)*2], cgs[name].Variants[(tag-chunkstarttag)*2:])
+ }
+ log.WithFields(logrus.Fields{
+ "cgs[i].Variants[tag*2+j]": tv,
+ "maxv": maxv,
+ "remap": remap,
+ "tag": tag,
+ "chunkstarttag": chunkstarttag,
+ }).Info("tv2homhet()")
+ }
+ if maxv < 1 || (maxv < 2 && !cmd.includeVariant1) {
+ // everyone has the most common variant (of the variants we don't drop)
return nil, nil
}
tagoffset := tag - chunkstarttag
obs[i] = make([]bool, len(cmd.cgnames))
}
for cgid, name := range cmd.cgnames {
- cgvars := cgs[name].Variants
- for v := tileVariantID(2); v <= maxv; v++ {
- if remap[cgvars[tagoffset*2]] == v && remap[cgvars[tagoffset*2+1]] == v {
+ cgvars := cgs[name].Variants[tagoffset*2:]
+ tv0, tv1 := remap[cgvars[0]], remap[cgvars[1]]
+ for v := tileVariantID(1); v <= maxv; v++ {
+ if tv0 == v && tv1 == v {
obs[v*2][cgid] = true
- } else if remap[cgvars[tagoffset*2]] == v || remap[cgvars[tagoffset*2+1]] == v {
+ } else if tv0 == v || tv1 == v {
obs[v*2+1][cgid] = true
}
}
}
var onehot [][]int8
var xref []onehotXref
- for homcol := 4; homcol < len(obs); homcol += 2 {
- for het := 0; het < 2; het++ {
- p := pvalue(obs[homcol+het], cmd.chi2Cases)
- if cmd.chi2PValue < 1 && !(p < cmd.chi2PValue) {
- continue
- }
- onehot = append(onehot, bool2int8(obs[homcol+het]))
- xref = append(xref, onehotXref{
- tag: tag,
- variant: tileVariantID(homcol / 2),
- het: het == 1,
- pvalue: p,
- })
+ for col := 2; col < len(obs); col++ {
+ // col 0,1 correspond to tile variant 0, i.e.,
+ // no-call; col 2,3 correspond to the most common
+ // variant; so we (normally) start at col 4.
+ if col < 4 && !cmd.includeVariant1 {
+ continue
}
+ p := pvalue(obs[col], cmd.chi2Cases)
+ if cmd.chi2PValue < 1 && !(p < cmd.chi2PValue) {
+ continue
+ }
+ onehot = append(onehot, bool2int8(obs[col]))
+ xref = append(xref, onehotXref{
+ tag: tag,
+ variant: tileVariantID(col >> 1),
+ hom: col&1 == 0,
+ pvalue: p,
+ })
}
return onehot, xref
}
// P-value row contains 1000000x actual p-value.
func onehotXref2int32(xrefs []onehotXref) []int32 {
xcols := len(xrefs)
- xdata := make([]int32, 4*xcols)
+ xdata := make([]int32, 5*xcols)
for i, xref := range xrefs {
xdata[i] = int32(xref.tag)
xdata[xcols+i] = int32(xref.variant)
- if xref.het {
+ if xref.hom {
xdata[xcols*2+i] = 1
}
xdata[xcols*3+i] = int32(xref.pvalue * 1000000)
+ xdata[xcols*4+i] = int32(-math.Log10(xref.pvalue) * 1000000)
}
return xdata
}