import (
"bufio"
+ "context"
"errors"
"flag"
"fmt"
}
bufw := bufio.NewWriter(output)
- err = cmd.exportTileDiffs(bufw, input)
+ tilelib := &tileLibrary{
+ includeNoCalls: true,
+ retainTileSequences: true,
+ }
+ err = tilelib.LoadGob(context.Background(), input, nil)
+ if err != nil {
+ return 1
+ }
+ err = cmd.exportTileDiffs(bufw, tilelib)
if err != nil {
return 1
}
return 0
}
-func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, librdr io.Reader) error {
- var refs []CompactSequence
- var tiles [][]TileVariant
- var tagset [][]byte
- var taglen int
- err := DecodeLibrary(librdr, func(ent *LibraryEntry) error {
- if len(ent.TagSet) > 0 {
- if tagset != nil {
- return errors.New("error: not implemented: input has multiple tagsets")
- }
- tagset = ent.TagSet
- taglen = len(tagset[0])
- tiles = make([][]TileVariant, len(tagset))
- }
- for _, tv := range ent.TileVariants {
- if tv.Tag >= tagID(len(tiles)) {
- return fmt.Errorf("error: reading tile variant for tag %d but only %d tags were loaded", tv.Tag, len(tiles))
- }
- for len(tiles[tv.Tag]) <= int(tv.Variant) {
- tiles[tv.Tag] = append(tiles[tv.Tag], TileVariant{})
- }
- tiles[tv.Tag][tv.Variant] = tv
- }
- for _, cs := range ent.CompactSequences {
- refs = append(refs, cs)
- }
- return nil
- })
- if err != nil {
- return err
+func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, tilelib *tileLibrary) error {
+ tagset := tilelib.taglib.Tags()
+ if len(tagset) == 0 {
+ return errors.New("cannot annotate library without tags")
+ }
+ taglen := len(tagset[0])
+ var refs []string
+ for name := range tilelib.refseqs {
+ refs = append(refs, name)
}
tag2tagid := make(map[string]tagID, len(tagset))
for tagid, tagseq := range tagset {
tag2tagid[string(tagseq)] = tagID(tagid)
}
- sort.Slice(refs, func(i, j int) bool { return refs[i].Name < refs[j].Name })
+ sort.Strings(refs)
log.Infof("len(refs) %d", len(refs))
outch := make(chan string, 1)
var diffwg sync.WaitGroup
defer diffwg.Wait()
- for _, refcs := range refs {
- refcs := refcs
+ for _, refname := range refs {
+ refname := refname
+ refcs := tilelib.refseqs[refname]
var seqnames []string
- for seqname := range refcs.TileSequences {
+ for seqname := range refcs {
seqnames = append(seqnames, seqname)
}
sort.Strings(seqnames)
// where the tile for tag 123 was placed.
tilestart := map[tagID]int{}
tileend := map[tagID]int{}
- for _, libref := range refcs.TileSequences[seqname] {
+ for _, libref := range refcs[seqname] {
if libref.Variant < 1 {
- return fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refcs.Name, seqname, libref.Tag)
+ return fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refname, seqname, libref.Tag)
}
- seq := tiles[libref.Tag][libref.Variant].Sequence
+ seq := tilelib.TileVariantSequence(libref)
if len(seq) < taglen {
- return fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refcs.Name, seqname, libref.Tag, libref.Variant, len(seq), taglen)
+ return fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refname, seqname, libref.Tag, libref.Variant, len(seq), taglen)
}
overlap := taglen
if len(refseq) == 0 {
tileend[libref.Tag] = len(refseq)
}
log.Infof("seq %s len(refseq) %d len(tilestart) %d", seqname, len(refseq), len(tilestart))
- for tag, tvs := range tiles {
+ for tag, tvs := range tilelib.variant {
tag := tagID(tag)
refstart, ok := tilestart[tag]
if !ok {
// result.)
continue
}
- for variant, tv := range tvs {
- variant, tv := variant, tv
- if variant == 0 {
- continue
- }
- if len(tv.Sequence) < taglen {
- return fmt.Errorf("tilevar %d,%d has sequence len %d < taglen %d", tag, variant, len(tv.Sequence), taglen)
+ for variant := 1; variant <= len(tvs); variant++ {
+ variant, hash := tileVariantID(variant), tvs[variant-1]
+ tileseq := tilelib.TileVariantSequence(tileLibRef{Tag: tag, Variant: variant})
+ if len(tileseq) < taglen {
+ return fmt.Errorf("tilevar %d,%d has sequence len %d < taglen %d", tag, variant, len(tileseq), taglen)
}
var refpart []byte
- endtag := string(tv.Sequence[len(tv.Sequence)-taglen:])
+ endtag := string(tileseq[len(tileseq)-taglen:])
if endtagid, ok := tag2tagid[endtag]; !ok {
// Tile variant doesn't end on a tag, so it can only place at the end of a chromosome.
refpart = refseq[refstart:]
- log.Warnf("%x tilevar %d,%d endtag not in ref: %s", tv.Blake2b[:13], tag, variant, endtag)
+ log.Warnf("%x tilevar %d,%d endtag not in ref: %s", hash[:13], tag, variant, endtag)
} else if refendtagstart, ok := tilestart[endtagid]; !ok {
// Ref ends a chromsome with a (possibly very large) variant of this tile, but genomes with this tile don't.
// Give up. (TODO: something smarter)
- log.Debugf("%x not annotating tilevar %d,%d because end tag %d is not in ref", tv.Blake2b[:13], tag, variant, endtagid)
+ log.Debugf("%x not annotating tilevar %d,%d because end tag %d is not in ref", hash[:13], tag, variant, endtagid)
continue
} else {
// Non-terminal tile vs. non-terminal reference.
refpart = refseq[refstart : refendtagstart+taglen]
- log.Tracef("\n%x tilevar %d,%d endtag %s endtagid %d refendtagstart %d", tv.Blake2b[:13], tag, variant, endtag, endtagid, refendtagstart)
+ log.Tracef("\n%x tilevar %d,%d endtag %s endtagid %d refendtagstart %d", hash[:13], tag, variant, endtag, endtagid, refendtagstart)
}
if len(refpart) > cmd.maxTileSize {
- log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s pos %d ref len %d", tv.Blake2b[:13], tag, variant, refcs.Name, seqname, refstart, len(refpart))
+ log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s pos %d ref len %d", hash[:13], tag, variant, refname, seqname, refstart, len(refpart))
continue
}
- if len(tv.Sequence) > cmd.maxTileSize {
- log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s variant len %d", tv.Blake2b[:13], tag, variant, refcs.Name, seqname, len(tv.Sequence))
+ if len(tileseq) > cmd.maxTileSize {
+ log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s variant len %d", hash[:13], tag, variant, refname, seqname, len(tileseq))
continue
}
- // log.Printf("\n%x @ refstart %d \n< %s\n> %s\n", tv.Blake2b, refstart, refpart, tv.Sequence)
+ // log.Printf("\n%x @ refstart %d \n< %s\n> %s\n", tv.Blake2b, refstart, refpart, tileseq)
diffwg.Add(1)
limiter <- true
<-limiter
diffwg.Done()
}()
- diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tv.Sequence)), 0)
+ diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tileseq)), 0)
for _, diff := range diffs {
diff.Position += refstart
var varid string
if cmd.variantHash {
- varid = fmt.Sprintf("%x", tv.Blake2b)[:13]
+ varid = fmt.Sprintf("%x", hash)[:13]
} else {
- varid = strconv.Itoa(variant)
+ varid = fmt.Sprintf("%d", variant)
}
- outch <- fmt.Sprintf("%d\t%s\t%s\t%s:g.%s\n", tag, varid, refcs.Name, seqname, diff.String())
+ outch <- fmt.Sprintf("%d\t%s\t%s\t%s:g.%s\n", tag, varid, refname, seqname, diff.String())
}
}()
}
"annotate": &annotatecmd{},
"export": &exporter{},
"export-numpy": &exportNumpy{},
- "filter": &filterer{},
+ "filter": &filtercmd{},
"build-docker-image": &buildDockerImage{},
"pca-go": &goPCA{},
"pca-py": &pythonPCA{},
echo >&2 "https://workbench2.${plot%%-*}.arvadosapi.com/collections/${plot}"
echo ${plot%%/*}
-numpy=$(lightning export-numpy -project ${project} -priority ${priority} -i ${filtered} -one-hot)
+numpy=$(lightning export-numpy -project ${project} -priority ${priority} -i ${merged} -one-hot -max-variants "30" -min-coverage "0.9")
log "github.com/sirupsen/logrus"
)
-type exportNumpy struct{}
+type exportNumpy struct {
+ filter filter
+}
func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
var err error
priority := flags.Int("priority", 500, "container request priority")
inputFilename := flags.String("i", "-", "input `file`")
outputFilename := flags.String("o", "-", "output `file`")
+ annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations tsv")
+ librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create tsv `file` mapping column# to tag# and variant#")
onehot := flags.Bool("one-hot", false, "recode tile variants as one-hot")
+ cmd.filter.Flags(flags)
err = flags.Parse(args)
if err == flag.ErrHelp {
err = nil
Client: arvados.NewClientFromEnv(),
ProjectUUID: *projectUUID,
RAM: 128000000000,
- VCPUs: 2,
+ VCPUs: 32,
Priority: *priority,
}
err = runner.TranslatePaths(inputFilename)
if err != nil {
return 1
}
- runner.Args = []string{"export-numpy", "-local=true", fmt.Sprintf("-one-hot=%v", *onehot), "-i", *inputFilename, "-o", "/mnt/output/library.npy"}
+ runner.Args = []string{"export-numpy", "-local=true", fmt.Sprintf("-one-hot=%v", *onehot), "-i", *inputFilename, "-o", "/mnt/output/matrix.npy", "-output-annotations", "/mnt/output/annotations.tsv", "-output-onehot2tilevar", "/mnt/output/onehot2tilevar.tsv"}
var output string
output, err = runner.Run()
if err != nil {
return 1
}
- fmt.Fprintln(stdout, output+"/library.npy")
+ fmt.Fprintln(stdout, output+"/matrix.npy")
return 0
}
}
defer input.Close()
}
- tilelib := tileLibrary{
- includeNoCalls: true,
- compactGenomes: map[string][]tileVariantID{},
+ tilelib := &tileLibrary{
+ includeNoCalls: true,
+ retainTileSequences: true,
+ compactGenomes: map[string][]tileVariantID{},
}
err = tilelib.LoadGob(context.Background(), input, nil)
if err != nil {
return 1
}
- out, rows, cols := cgs2array(tilelib.compactGenomes)
+ log.Info("filtering")
+ cmd.filter.Apply(tilelib)
+
+ if *annotationsFilename != "" {
+ log.Infof("writing annotations")
+ var annow io.WriteCloser
+ annow, err = os.OpenFile(*annotationsFilename, os.O_CREATE|os.O_WRONLY, 0666)
+ if err != nil {
+ return 1
+ }
+ defer annow.Close()
+ err = (&annotatecmd{maxTileSize: 50000}).exportTileDiffs(annow, tilelib)
+ if err != nil {
+ return 1
+ }
+ err = annow.Close()
+ if err != nil {
+ return 1
+ }
+ }
+ log.Info("building numpy array")
+ out, rows, cols := cgs2array(tilelib.compactGenomes)
var output io.WriteCloser
if *outputFilename == "-" {
output = nopCloser{stdout}
return 1
}
if *onehot {
- out, cols = recodeOnehot(out, cols)
+ log.Info("recoding to onehot")
+ recoded, librefs, recodedcols := recodeOnehot(out, cols)
+ out, cols = recoded, recodedcols
+ if *librefsFilename != "" {
+ log.Infof("writing onehot column mapping")
+ err = cmd.writeLibRefs(*librefsFilename, tilelib, librefs)
+ if err != nil {
+ return 1
+ }
+ }
}
+ log.Info("writing numpy")
npw.Shape = []int{rows, cols}
npw.WriteUint16(out)
err = bufw.Flush()
return 0
}
+func (*exportNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
+ f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ for i, libref := range librefs {
+ _, err = fmt.Fprintf(f, "%d\t%d\t%d\n", i, libref.Tag, libref.Variant)
+ if err != nil {
+ return err
+ }
+ }
+ return f.Close()
+}
+
func cgs2array(cgs map[string][]tileVariantID) (data []uint16, rows, cols int) {
var cgnames []string
for name := range cgs {
return
}
-func recodeOnehot(in []uint16, incols int) ([]uint16, int) {
+func recodeOnehot(in []uint16, incols int) (out []uint16, librefs []tileLibRef, outcols int) {
rows := len(in) / incols
maxvalue := make([]uint16, incols)
for row := 0; row < rows; row++ {
}
}
outcol := make([]int, incols)
- outcols := 0
dropped := 0
- for incol, v := range maxvalue {
+ for incol, maxv := range maxvalue {
outcol[incol] = outcols
- if v == 0 {
+ if maxv == 0 {
dropped++
- } else {
- outcols += int(v)
+ }
+ for v := 1; v <= int(maxv); v++ {
+ librefs = append(librefs, tileLibRef{Tag: tagID(incol), Variant: tileVariantID(v)})
+ outcols++
}
}
log.Printf("recodeOnehot: dropped %d input cols with zero maxvalue", dropped)
- out := make([]uint16, rows*outcols)
+ out = make([]uint16, rows*outcols)
for inidx, row := 0, 0; row < rows; row++ {
outrow := out[row*outcols:]
for col := 0; col < incols; col++ {
inidx++
}
}
- return out, outcols
+ return
}
type nopCloser struct {
},
},
} {
- out, outcols := recodeOnehot(trial.in, trial.incols)
+ out, _, outcols := recodeOnehot(trial.in, trial.incols)
c.Check(out, check.DeepEquals, trial.out)
c.Check(outcols, check.Equals, trial.outcols)
}
log "github.com/sirupsen/logrus"
)
-type filterer struct {
+type filter struct {
+ MaxVariants int
+ MinCoverage float64
+ MaxTag int
+}
+
+func (f *filter) Flags(flags *flag.FlagSet) {
+ flags.IntVar(&f.MaxVariants, "max-variants", -1, "drop tiles with more than `N` variants")
+ flags.Float64Var(&f.MinCoverage, "min-coverage", 0, "drop tiles with coverage less than `P` across all haplotypes (0 < P ≤ 1)")
+ flags.IntVar(&f.MaxTag, "max-tag", -1, "drop tiles with tag ID > `N`")
+}
+
+func (f *filter) Apply(tilelib *tileLibrary) {
+ // Zero out variants at tile positions that have more than
+ // f.MaxVariants tile variants.
+ if f.MaxVariants >= 0 {
+ for tag, variants := range tilelib.variant {
+ if f.MaxTag >= 0 && tag >= f.MaxTag {
+ break
+ }
+ if len(variants) <= f.MaxVariants {
+ continue
+ }
+ tilelib.variant[tag] = nil
+ for _, cg := range tilelib.compactGenomes {
+ if len(cg) > tag*2 {
+ cg[tag*2] = 0
+ cg[tag*2+1] = 0
+ }
+ }
+ }
+ }
+
+ // Zero out variants at tile positions that have less than
+ // f.MinCoverage.
+ mincov := int(2*f.MinCoverage*float64(len(tilelib.compactGenomes)) + 1)
+TAG:
+ for tag := 0; tag < len(tilelib.variant) && tag < f.MaxTag; tag++ {
+ tagcov := 0
+ for _, cg := range tilelib.compactGenomes {
+ if cg[tag*2] > 0 {
+ tagcov++
+ }
+ if cg[tag*2+1] > 0 {
+ tagcov++
+ }
+ if tagcov >= mincov {
+ continue TAG
+ }
+ }
+ for _, cg := range tilelib.compactGenomes {
+ cg[tag*2] = 0
+ cg[tag*2+1] = 0
+ }
+ }
+
+ // Truncate genomes and tile data to f.MaxTag (TODO: truncate
+ // refseqs too)
+ if f.MaxTag >= 0 {
+ if len(tilelib.variant) > f.MaxTag {
+ tilelib.variant = tilelib.variant[:f.MaxTag]
+ }
+ for name, cg := range tilelib.compactGenomes {
+ if len(cg) > 2*f.MaxTag {
+ tilelib.compactGenomes[name] = cg[:2*f.MaxTag]
+ }
+ }
+ }
+}
+
+type filtercmd struct {
output io.Writer
+ filter
}
-func (cmd *filterer) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+func (cmd *filtercmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
var err error
defer func() {
if err != nil {
priority := flags.Int("priority", 500, "container request priority")
inputFilename := flags.String("i", "-", "input `file`")
outputFilename := flags.String("o", "-", "output `file`")
- maxvariants := flags.Int("max-variants", -1, "drop tiles with more than `N` variants")
- mincoverage := flags.Float64("min-coverage", 0, "drop tiles with coverage less than `P` across all haplotypes (0 < P ≤ 1)")
- maxtag := flags.Int("max-tag", -1, "drop tiles with tag ID > `N`")
+ cmd.filter.Flags(flags)
err = flags.Parse(args)
if err == flag.ErrHelp {
err = nil
runner.Args = []string{"filter", "-local=true",
"-i", *inputFilename,
"-o", "/mnt/output/library.gob",
- "-max-variants", fmt.Sprintf("%d", *maxvariants),
- "-min-coverage", fmt.Sprintf("%f", *mincoverage),
- "-max-tag", fmt.Sprintf("%d", *maxtag),
+ "-max-variants", fmt.Sprintf("%d", cmd.MaxVariants),
+ "-min-coverage", fmt.Sprintf("%f", cmd.MinCoverage),
+ "-max-tag", fmt.Sprintf("%d", cmd.MaxTag),
}
var output string
output, err = runner.Run()
if ntags < len(cg.Variants)/2 {
ntags = len(cg.Variants) / 2
}
- if *maxvariants < 0 {
+ if cmd.MaxVariants < 0 {
continue
}
- maxVariantID := tileVariantID(*maxvariants)
for idx, variant := range cg.Variants {
- if variant > maxVariantID {
+ if variant > tileVariantID(cmd.MaxVariants) {
for _, cg := range cgs {
if len(cg.Variants) > idx {
cg.Variants[idx & ^1] = 0
}
}
- if *maxtag >= 0 && ntags > *maxtag {
- ntags = *maxtag
+ if cmd.MaxTag >= 0 && ntags > cmd.MaxTag {
+ ntags = cmd.MaxTag
for i, cg := range cgs {
- if len(cg.Variants) > *maxtag*2 {
- cgs[i].Variants = cg.Variants[:*maxtag*2]
+ if len(cg.Variants) > cmd.MaxTag*2 {
+ cgs[i].Variants = cg.Variants[:cmd.MaxTag*2]
}
}
}
- if *mincoverage > 0 {
- mincov := int(*mincoverage * float64(len(cgs)*2))
+ if cmd.MinCoverage > 0 {
+ mincov := int(cmd.MinCoverage * float64(len(cgs)*2))
cov := make([]int, ntags)
for _, cg := range cgs {
for idx, variant := range cg.Variants {
data, rows, cols := cgs2array(tilelib.compactGenomes)
if *onehot {
log.Printf("recode one-hot: %d rows, %d cols", rows, cols)
- data, cols = recodeOnehot(data, cols)
+ data, _, cols = recodeOnehot(data, cols)
}
log.Printf("creating matrix backed by array: %d rows, %d cols", rows, cols)
}
type tileLibrary struct {
- includeNoCalls bool
- skipOOO bool
+ includeNoCalls bool
+ skipOOO bool
+ retainTileSequences bool
+
taglib *tagLibrary
variant [][][blake2b.Size256]byte
refseqs map[string]map[string][]tileLibRef
compactGenomes map[string][]tileVariantID
// count [][]int
- // seq map[[blake2b.Size]byte][]byte
+ seq map[[blake2b.Size256]byte][]byte
variants int
// if non-nil, write out any tile variants added while tiling
encoder *gob.Encoder
}
}
tilelib.mtx.Lock()
- // if tilelib.seq == nil {
- // tilelib.seq = map[[blake2b.Size]byte][]byte{}
- // }
if tilelib.variant == nil && tilelib.taglib != nil {
tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len())
}
}
tilelib.variants++
tilelib.variant[tag] = append(tilelib.variant[tag], seqhash)
- // tilelib.seq[seqhash] = append([]byte(nil), seq...)
+ if tilelib.retainTileSequences {
+ if tilelib.seq == nil {
+ tilelib.seq = map[[blake2b.Size256]byte][]byte{}
+ }
+ tilelib.seq[seqhash] = append([]byte(nil), seq...)
+ }
variant := tileVariantID(len(tilelib.variant[tag]))
tilelib.mtx.Unlock()
return tileLibRef{Tag: tag, Variant: variant}
}
+func (tilelib *tileLibrary) TileVariantSequence(libref tileLibRef) []byte {
+ if libref.Variant == 0 || len(tilelib.variant) <= int(libref.Tag) || len(tilelib.variant[libref.Tag]) < int(libref.Variant) {
+ return nil
+ }
+ return tilelib.seq[tilelib.variant[libref.Tag][libref.Variant-1]]
+}
+
func countBases(seq []byte) int {
n := 0
for _, c := range seq {