"bufio"
"bytes"
"encoding/gob"
+ "errors"
"flag"
"fmt"
"io"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/arvados/lightning/hgvs"
"github.com/kshedden/gonpy"
+ "github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/blake2b"
)
chi2PValue float64
minCoverage int
cgnames []string
+ includeVariant1 bool
+ debugTag tagID
}
func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
hgvsChunked := flags.Bool("chunked-hgvs-matrix", false, "also generate hgvs-based matrix per chromosome")
onehotSingle := flags.Bool("single-onehot", false, "generate one-hot tile-based matrix")
onehotChunked := flags.Bool("chunked-onehot", false, "generate one-hot tile-based matrix per input chunk")
+ debugTag := flags.Int("debug-tag", -1, "log debugging details about specified tag")
flags.IntVar(&cmd.threads, "threads", 16, "number of memory-hungry assembly threads")
flags.StringVar(&cmd.chi2CaseControlFile, "chi2-case-control-file", "", "tsv file or directory indicating cases and controls for Χ² test (if directory, all .tsv files will be read)")
flags.StringVar(&cmd.chi2CaseControlColumn, "chi2-case-control-column", "", "name of case/control column in case-control files for Χ² test (value must be 0 for control, 1 for case)")
flags.Float64Var(&cmd.chi2PValue, "chi2-p-value", 1, "do Χ² test and omit columns with p-value above this threshold")
+ flags.BoolVar(&cmd.includeVariant1, "include-variant-1", false, "include most common variant when building one-hot matrix")
cmd.filter.Flags(flags)
err = flags.Parse(args)
if err == flag.ErrHelp {
return 2
}
+ cmd.debugTag = tagID(*debugTag)
+
if !*runlocal {
runner := arvadosContainerRunner{
Name: "lightning slice-numpy",
"-chi2-case-control-file=" + cmd.chi2CaseControlFile,
"-chi2-case-control-column=" + cmd.chi2CaseControlColumn,
"-chi2-p-value=" + fmt.Sprintf("%f", cmd.chi2PValue),
+ "-include-variant-1=" + fmt.Sprintf("%v", cmd.includeVariant1),
+ "-debug-tag=" + fmt.Sprintf("%d", cmd.debugTag),
}
runner.Args = append(runner.Args, cmd.filter.Args()...)
var output string
for seqname, cseq := range refseq {
pos := 0
for _, libref := range cseq {
+ if cmd.filter.MaxTag >= 0 && libref.Tag > tagID(cmd.filter.MaxTag) {
+ continue
+ }
tiledata := reftiledata[libref]
if len(tiledata) == 0 {
err = fmt.Errorf("missing tiledata for tag %d variant %d in %s in ref", libref.Tag, libref.Variant, seqname)
return
}
if dupref, ok := reftile[tagid]; ok {
- log.Printf("dropping reference tile %+v, tag not unique, also found inside %+v from %s @ %d", dupref, libref, seqname, pos+offset+1)
+ log.Printf("dropping reference tile %+v from %s @ %d, tag not unique, also found inside %+v from %s @ %d", tileLibRef{Tag: tagid, Variant: dupref.variant}, dupref.seqname, dupref.pos, libref, seqname, pos+offset+1)
delete(reftile, tagid)
} else {
log.Printf("found tag %d at offset %d inside tile variant %+v on %s @ %d", tagid, offset, libref, seqname, pos+offset+1)
toMerge = make([][]int16, len(infiles))
}
var onehotIndirect [][2][]uint32 // [chunkIndex][axis][index]
+ var onehotChunkSize []uint32
var onehotXrefs [][]onehotXref
if *onehotSingle {
onehotIndirect = make([][2][]uint32, len(infiles))
+ onehotChunkSize = make([]uint32, len(infiles))
onehotXrefs = make([][]onehotXref, len(infiles))
}
throttleMem := throttle{Max: cmd.threads} // TODO: estimate using mem and data size
throttleNumpyMem := throttle{Max: cmd.threads/2 + 1}
log.Info("generating annotations and numpy matrix for each slice")
+ var errSkip = errors.New("skip infile")
var done int64
for infileIdx, infile := range infiles {
infileIdx, infile := infileIdx, infile
// masked-out tiles.
continue
}
+ if tv.Tag == cmd.debugTag {
+ log.Printf("infile %d %s tag %d variant %d hash %x", infileIdx, infile, tv.Tag, tv.Variant, tv.Blake2b[:3])
+ }
variants := seq[tv.Tag]
if len(variants) == 0 {
variants = make([]TileVariant, 100)
seq[tv.Tag] = variants
}
for _, cg := range ent.CompactGenomes {
+ if cmd.filter.MaxTag >= 0 && cg.StartTag > tagID(cmd.filter.MaxTag) {
+ return errSkip
+ }
if !matchGenome.MatchString(cg.Name) {
continue
}
}
return nil
})
- if err != nil {
+ if err == errSkip {
+ return nil
+ } else if err != nil {
return err
}
tagstart := cgs[cmd.cgnames[0]].StartTag
count[blake2b.Sum256(rt.tiledata)] = 0
}
- for _, cg := range cgs {
+ for cgname, cg := range cgs {
idx := int(tag-tagstart) * 2
for allele := 0; allele < 2; allele++ {
v := cg.Variants[idx+allele]
if v > 0 && len(variants[v].Sequence) > 0 {
count[variants[v].Blake2b]++
+ if tag == cmd.debugTag {
+ log.Printf("tag %d cg %s allele %d tv %d hash %x count is now %d", tag, cgname, allele, v, variants[v].Blake2b[:3], count[variants[v].Blake2b])
+ }
}
}
}
for i, h := range hash {
rank[h] = tileVariantID(i + 1)
}
+ if tag == cmd.debugTag {
+ for h, r := range rank {
+ log.Printf("tag %d rank(%x) = %v", tag, h[:3], r)
+ }
+ }
// remap[v] will be the new
// variant number for original
// variant number v.
for i, tv := range variants {
remap[i] = rank[tv.Blake2b]
}
+ if tag == cmd.debugTag {
+ log.Printf("tag %d remap %+v", tag, remap)
+ }
variantRemap[tag-tagstart] = remap
if rt != nil {
- rt.variant = rank[blake2b.Sum256(rt.tiledata)]
+ refrank := rank[blake2b.Sum256(rt.tiledata)]
+ if tag == cmd.debugTag {
+ log.Printf("tag %d reftile variant %d => %d", tag, rt.variant, refrank)
+ }
+ rt.variant = refrank
}
}()
}
annow := bufio.NewWriterSize(annof, 1<<20)
outcol := 0
for tag := tagstart; tag < tagend; tag++ {
- rt, ok := reftile[tag]
- if !ok {
- if mask == nil {
- outcol++
- }
- // Excluded by specified
- // regions, or reference does
- // not use any variant of this
- // tile. (TODO: log this?
- // mention it in annotations?)
+ rt := reftile[tag]
+ if rt == nil && mask != nil {
+ // Excluded by specified regions
+ continue
+ }
+ if cmd.filter.MaxTag >= 0 && tag > tagID(cmd.filter.MaxTag) {
continue
}
remap := variantRemap[tag-tagstart]
}
}
if *onehotChunked || *onehotSingle {
+ if tag == cmd.debugTag {
+ log.WithFields(logrus.Fields{
+ "cgs[2].Variants[tag*2:(tag+1)*2]": cgs[cmd.cgnames[2]].Variants[(tag-tagstart)*2 : (tag-tagstart+1)*2],
+ }).Info("before tv2homhet")
+ }
onehot, xrefs := cmd.tv2homhet(cgs, maxv, remap, tag, tagstart)
+ if tag == cmd.debugTag {
+ log.WithFields(logrus.Fields{
+ "onehot": onehot,
+ "xrefs": xrefs,
+ }).Info("tv2homhet()")
+ }
onehotChunk = append(onehotChunk, onehot...)
onehotXref = append(onehotXref, xrefs...)
}
+ if rt == nil {
+ // Reference does not use any
+ // variant of this tile
+ outcol++
+ continue
+ }
fmt.Fprintf(annow, "%d,%d,%d,=,%s,%d,,,\n", tag, outcol, rt.variant, rt.seqname, rt.pos)
variants := seq[tag]
reftilestr := strings.ToUpper(string(rt.tiledata))
// transpose onehotChunk[col][row] to numpy[row*ncols+col]
rows := len(cmd.cgnames)
cols := len(onehotChunk)
- log.Infof("%04d: preparing onehot numpy (rows=%d, cols=%d, mem=%d)", infileIdx, len(cmd.cgnames), len(onehotChunk), len(cmd.cgnames)*len(onehotChunk))
+ log.Infof("%04d: preparing onehot numpy (rows=%d, cols=%d, mem=%d)", infileIdx, rows, cols, rows*cols)
throttleNumpyMem.Acquire()
out := onehotcols2int8(onehotChunk)
fnm := fmt.Sprintf("%s/onehot.%04d.npy", *outputDir, infileIdx)
}
if *onehotSingle {
onehotIndirect[infileIdx] = onehotChunk2Indirect(onehotChunk)
+ onehotChunkSize[infileIdx] = uint32(len(onehotChunk))
onehotXrefs[infileIdx] = onehotXref
n := len(onehotIndirect[infileIdx][0])
- log.Infof("%04d: keeping onehot coordinates in memory (n=%d, mem=%d)", infileIdx, n, n*8)
+ log.Infof("%04d: keeping onehot coordinates in memory (n=%d, mem=%d)", infileIdx, n, n*8*2)
}
if !(*onehotSingle || *onehotChunked) || *mergeOutput || *hgvsSingle {
- log.Infof("%04d: preparing numpy", infileIdx)
+ log.Infof("%04d: preparing numpy (rows=%d, cols=%d)", infileIdx, len(cmd.cgnames), 2*outcol)
throttleNumpyMem.Acquire()
rows := len(cmd.cgnames)
cols := 2 * outcol
outcol := 0
for col, v := range cgs[name].Variants {
tag := tagstart + tagID(col/2)
- if mask != nil && reftile[tag] == nil {
+ if mask != nil && reftile[tag] == nil || (cmd.filter.MaxTag >= 0 && tag > tagID(cmd.filter.MaxTag)) {
continue
}
if variants, ok := seq[tag]; ok && len(variants) > int(v) && len(variants[v].Sequence) > 0 {
}
onehot := make([]uint32, nzCount*2) // [r,r,r,...,c,c,c,...]
var xrefs []onehotXref
+ chunkOffset := uint32(0)
outcol := 0
for i, part := range onehotIndirect {
for i := range part[1] {
- part[1][i] += uint32(outcol)
+ part[1][i] += chunkOffset
}
copy(onehot[outcol:], part[0])
copy(onehot[outcol+nzCount:], part[1])
- outcol += len(part[0])
xrefs = append(xrefs, onehotXrefs[i]...)
+ outcol += len(part[0])
+ chunkOffset += onehotChunkSize[i]
+
part[0] = nil
part[1] = nil
onehotXrefs[i] = nil
const onehotXrefSize = unsafe.Sizeof(onehotXref{})
-// Build onehot matrix (m[variant*2+isHet][genome] == 0 or 1) for all
+// Build onehot matrix (m[tileVariantIndex][genome] == 0 or 1) for all
// variants of a single tile/tag#.
//
// Return nil if no tile variant passes Χ² filter.
func (cmd *sliceNumpy) tv2homhet(cgs map[string]CompactGenome, maxv tileVariantID, remap []tileVariantID, tag, chunkstarttag tagID) ([][]int8, []onehotXref) {
- if maxv < 2 {
- // everyone has the most common variant
+ if tag == cmd.debugTag {
+ tv := make([]tileVariantID, len(cmd.cgnames)*2)
+ for i, name := range cmd.cgnames {
+ copy(tv[i*2:(i+1)*2], cgs[name].Variants[(tag-chunkstarttag)*2:])
+ }
+ log.WithFields(logrus.Fields{
+ "cgs[i].Variants[tag*2+j]": tv,
+ "maxv": maxv,
+ "remap": remap,
+ "tag": tag,
+ "chunkstarttag": chunkstarttag,
+ }).Info("tv2homhet()")
+ }
+ if maxv < 1 || (maxv < 2 && !cmd.includeVariant1) {
+ // everyone has the most common variant (of the variants we don't drop)
return nil, nil
}
tagoffset := tag - chunkstarttag
obs[i] = make([]bool, len(cmd.cgnames))
}
for cgid, name := range cmd.cgnames {
- cgvars := cgs[name].Variants
- for v := tileVariantID(2); v <= maxv; v++ {
- if remap[cgvars[tagoffset*2]] == v && remap[cgvars[tagoffset*2+1]] == v {
+ cgvars := cgs[name].Variants[tagoffset*2:]
+ tv0, tv1 := remap[cgvars[0]], remap[cgvars[1]]
+ for v := tileVariantID(1); v <= maxv; v++ {
+ if tv0 == v && tv1 == v {
obs[v*2][cgid] = true
- } else if remap[cgvars[tagoffset*2]] == v || remap[cgvars[tagoffset*2+1]] == v {
+ } else if tv0 == v || tv1 == v {
obs[v*2+1][cgid] = true
}
}
}
var onehot [][]int8
var xref []onehotXref
- for homcol := 4; homcol < len(obs); homcol += 2 {
- p := [2]float64{
- pvalue(obs[homcol], cmd.chi2Cases),
- pvalue(obs[homcol+1], cmd.chi2Cases),
- }
- if cmd.chi2PValue < 1 && !(p[0] < cmd.chi2PValue || p[1] < cmd.chi2PValue) {
+ for col := 2; col < len(obs); col++ {
+ // col 0,1 correspond to tile variant 0, i.e.,
+ // no-call; col 2,3 correspond to the most common
+ // variant; so we (normally) start at col 4.
+ if col < 4 && !cmd.includeVariant1 {
continue
}
- for het := 0; het < 2; het++ {
- onehot = append(onehot, bool2int8(obs[homcol+het]))
- xref = append(xref, onehotXref{
- tag: tag,
- variant: tileVariantID(homcol / 2),
- het: het == 1,
- pvalue: p[het],
- })
+ p := pvalue(obs[col], cmd.chi2Cases)
+ if cmd.chi2PValue < 1 && !(p < cmd.chi2PValue) {
+ continue
}
+ onehot = append(onehot, bool2int8(obs[col]))
+ xref = append(xref, onehotXref{
+ tag: tag,
+ variant: tileVariantID(col >> 1),
+ het: col&1 == 1,
+ pvalue: p,
+ })
}
return onehot, xref
}