-package main
+// Copyright (C) The Lightning Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lightning
import (
"bufio"
+ "context"
"errors"
"flag"
"fmt"
)
type annotatecmd struct {
- variantHash bool
- maxTileSize int
+ dropTiles []bool
+ variantHash bool
+ maxTileSize int
+ tag2tagid map[string]tagID
+ reportAnnotation func(tag tagID, outcol int, variant tileVariantID, refname string, seqname string, pdi hgvs.Variant)
}
func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
if err != nil {
return 1
}
- runner.Args = []string{"annotate", "-local=true", fmt.Sprintf("-variant-hash=%v", cmd.variantHash), "-max-tile-size", strconv.Itoa(cmd.maxTileSize), "-i", *inputFilename, "-o", "/mnt/output/tilevariants.tsv"}
+ runner.Args = []string{"annotate", "-local=true", fmt.Sprintf("-variant-hash=%v", cmd.variantHash), "-max-tile-size", strconv.Itoa(cmd.maxTileSize), "-i", *inputFilename, "-o", "/mnt/output/tilevariants.csv"}
var output string
output, err = runner.Run()
if err != nil {
return 1
}
- fmt.Fprintln(stdout, output+"/tilevariants.tsv")
+ fmt.Fprintln(stdout, output+"/tilevariants.csv")
return 0
}
}
defer output.Close()
}
- bufw := bufio.NewWriter(output)
+ bufw := bufio.NewWriterSize(output, 4*1024*1024)
- err = cmd.exportTileDiffs(bufw, input)
+ tilelib := &tileLibrary{
+ retainNoCalls: true,
+ retainTileSequences: true,
+ }
+ err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"))
+ if err != nil {
+ return 1
+ }
+ err = cmd.exportTileDiffs(bufw, tilelib)
if err != nil {
return 1
}
return 0
}
-func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, librdr io.Reader) error {
- var refs []CompactSequence
- var tiles [][]TileVariant
- var tagset [][]byte
- var taglen int
- err := DecodeLibrary(librdr, func(ent *LibraryEntry) error {
- if len(ent.TagSet) > 0 {
- if tagset != nil {
- return errors.New("error: not implemented: input has multiple tagsets")
- }
- tagset = ent.TagSet
- taglen = len(tagset[0])
- tiles = make([][]TileVariant, len(tagset))
- }
- for _, tv := range ent.TileVariants {
- if tv.Tag >= tagID(len(tiles)) {
- return fmt.Errorf("error: reading tile variant for tag %d but only %d tags were loaded", tv.Tag, len(tiles))
- }
- for len(tiles[tv.Tag]) <= int(tv.Variant) {
- tiles[tv.Tag] = append(tiles[tv.Tag], TileVariant{})
- }
- tiles[tv.Tag][tv.Variant] = tv
- }
- for _, cs := range ent.CompactSequences {
- refs = append(refs, cs)
- }
- return nil
- })
- if err != nil {
- return err
+func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, tilelib *tileLibrary) error {
+ tagset := tilelib.taglib.Tags()
+ if len(tagset) == 0 {
+ return errors.New("cannot annotate library without tags")
+ }
+ taglen := len(tagset[0])
+ var refs []string
+ for name := range tilelib.refseqs {
+ refs = append(refs, name)
}
- tag2tagid := make(map[string]tagID, len(tagset))
+ cmd.tag2tagid = make(map[string]tagID, len(tagset))
for tagid, tagseq := range tagset {
- tag2tagid[string(tagseq)] = tagID(tagid)
+ cmd.tag2tagid[string(tagseq)] = tagID(tagid)
}
- sort.Slice(refs, func(i, j int) bool { return refs[i].Name < refs[j].Name })
+ sort.Strings(refs)
log.Infof("len(refs) %d", len(refs))
- outch := make(chan string, 1)
+ outch := make(chan string, runtime.NumCPU()*2)
var outwg sync.WaitGroup
defer outwg.Wait()
outwg.Add(1)
}()
defer close(outch)
- limiter := make(chan bool, runtime.NumCPU()+1)
- var diffwg sync.WaitGroup
- defer diffwg.Wait()
+ nseqs := 0
+ for _, refcs := range tilelib.refseqs {
+ nseqs += len(refcs)
+ }
+
+ throttle := &throttle{Max: runtime.NumCPU()*2 + nseqs*2 + 1}
+ defer throttle.Wait()
- for _, refcs := range refs {
- refcs := refcs
+ for _, refname := range refs {
+ refname := refname
+ refcs := tilelib.refseqs[refname]
var seqnames []string
- for seqname := range refcs.TileSequences {
+ for seqname := range refcs {
seqnames = append(seqnames, seqname)
}
sort.Strings(seqnames)
for _, seqname := range seqnames {
seqname := seqname
- var refseq []byte
- // tilestart[123] is the index into refseq
- // where the tile for tag 123 was placed.
- tilestart := map[tagID]int{}
- tileend := map[tagID]int{}
- for _, libref := range refcs.TileSequences[seqname] {
- if libref.Variant < 1 {
- return fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refcs.Name, seqname, libref.Tag)
- }
- seq := tiles[libref.Tag][libref.Variant].Sequence
- if len(seq) < taglen {
- return fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refcs.Name, seqname, libref.Tag, libref.Variant, len(seq), taglen)
- }
- overlap := taglen
- if len(refseq) == 0 {
- overlap = 0
- }
- tilestart[libref.Tag] = len(refseq) - overlap
- refseq = append(refseq, seq[overlap:]...)
- tileend[libref.Tag] = len(refseq)
+ throttle.Acquire()
+ if throttle.Err() != nil {
+ break
}
- log.Infof("seq %s len(refseq) %d len(tilestart) %d", seqname, len(refseq), len(tilestart))
- for tag, tvs := range tiles {
- tag := tagID(tag)
- refstart, ok := tilestart[tag]
- if !ok {
- // Tag didn't place on this
- // reference sequence. (It
- // might place on the same
- // chromosome in a genome
- // anyway, but we don't output
- // the annotations that would
- // result.)
- continue
- }
- for variant, tv := range tvs {
- variant, tv := variant, tv
- if variant == 0 {
- continue
- }
- if len(tv.Sequence) < taglen {
- return fmt.Errorf("tilevar %d,%d has sequence len %d < taglen %d", tag, variant, len(tv.Sequence), taglen)
- }
- var refpart []byte
- endtag := string(tv.Sequence[len(tv.Sequence)-taglen:])
- if endtagid, ok := tag2tagid[endtag]; !ok {
- // Tile variant doesn't end on a tag, so it can only place at the end of a chromosome.
- refpart = refseq[refstart:]
- log.Warnf("%x tilevar %d,%d endtag not in ref: %s", tv.Blake2b[:13], tag, variant, endtag)
- } else if refendtagstart, ok := tilestart[endtagid]; !ok {
- // Ref ends a chromsome with a (possibly very large) variant of this tile, but genomes with this tile don't.
- // Give up. (TODO: something smarter)
- log.Debugf("%x not annotating tilevar %d,%d because end tag %d is not in ref", tv.Blake2b[:13], tag, variant, endtagid)
- continue
+ go func() {
+ defer throttle.Release()
+ throttle.Report(cmd.annotateSequence(throttle, outch, tilelib, taglen, refname, seqname, refcs[seqname], len(refs) > 1))
+ }()
+ }
+ }
+ throttle.Wait()
+ return throttle.Err()
+}
+
+func (cmd *annotatecmd) annotateSequence(throttle *throttle, outch chan<- string, tilelib *tileLibrary, taglen int, refname, seqname string, reftiles []tileLibRef, refnamecol bool) error {
+ refnamefield := ""
+ if refnamecol {
+ refnamefield = "," + trimFilenameForLabel(refname)
+ }
+ var refseq []byte
+ // tilestart[123] is the index into refseq
+ // where the tile for tag 123 was placed.
+ tilestart := map[tagID]int{}
+ tileend := map[tagID]int{}
+ for _, libref := range reftiles {
+ if libref.Variant < 1 {
+ return fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refname, seqname, libref.Tag)
+ }
+ seq := tilelib.TileVariantSequence(libref)
+ if len(seq) < taglen {
+ return fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refname, seqname, libref.Tag, libref.Variant, len(seq), taglen)
+ }
+ overlap := taglen
+ if len(refseq) == 0 {
+ overlap = 0
+ }
+ tilestart[libref.Tag] = len(refseq) - overlap
+ refseq = append(refseq, seq[overlap:]...)
+ tileend[libref.Tag] = len(refseq)
+ }
+ log.Infof("seq %s len(refseq) %d len(tilestart) %d", seqname, len(refseq), len(tilestart))
+ // outtag is tag's index in the subset of tags that aren't
+ // dropped. If there are 10M tags and half are dropped by
+ // dropTiles, tag ranges from 0 to 10M-1 and outtag ranges
+ // from 0 to 5M-1.
+ //
+ // IOW, in the matrix built by cgs2array(), {tag} is
+ // represented by columns {outtag}*2 and {outtag}*2+1.
+ outcol := -1
+ for tag, tvs := range tilelib.variant {
+ if len(cmd.dropTiles) > tag && cmd.dropTiles[tag] {
+ continue
+ }
+ tag := tagID(tag)
+ outcol++
+ // Must shadow outcol var to use safely in goroutine below.
+ outcol := outcol
+ refstart, ok := tilestart[tag]
+ if !ok {
+ // Tag didn't place on this reference
+ // sequence. (It might place on the same
+ // chromosome in a genome anyway, but we don't
+ // output the annotations that would result.)
+ // outch <- fmt.Sprintf("%d,%d,-1%s\n", tag, outcol, refnamefield)
+ continue
+ }
+ for variant := 1; variant <= len(tvs); variant++ {
+ variant, hash := tileVariantID(variant), tvs[variant-1]
+ tileseq := tilelib.TileVariantSequence(tileLibRef{Tag: tag, Variant: variant})
+ if len(tileseq) == 0 {
+ continue
+ } else if len(tileseq) < taglen {
+ return fmt.Errorf("tilevar %d,%d has sequence len %d < taglen %d", tag, variant, len(tileseq), taglen)
+ }
+ var refpart []byte
+ endtag := string(tileseq[len(tileseq)-taglen:])
+ if endtagid, ok := cmd.tag2tagid[endtag]; !ok {
+ // Tile variant doesn't end on a tag, so it can only place at the end of a chromosome.
+ refpart = refseq[refstart:]
+ log.Warnf("%x tilevar %d,%d endtag not in ref: %s", hash[:13], tag, variant, endtag)
+ } else if refendtagstart, ok := tilestart[endtagid]; !ok {
+ // Ref ends a chromsome with a (possibly very large) variant of this tile, but genomes with this tile don't.
+ // Give up. (TODO: something smarter)
+ log.Debugf("%x not annotating tilevar %d,%d because end tag %d is not in ref", hash[:13], tag, variant, endtagid)
+ continue
+ } else {
+ // Non-terminal tile vs. non-terminal reference.
+ refpart = refseq[refstart : refendtagstart+taglen]
+ log.Tracef("\n%x tilevar %d,%d endtag %s endtagid %d refendtagstart %d", hash[:13], tag, variant, endtag, endtagid, refendtagstart)
+ }
+ if len(refpart) > cmd.maxTileSize {
+ log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s pos %d ref len %d", hash[:13], tag, variant, refname, seqname, refstart, len(refpart))
+ continue
+ }
+ if len(tileseq) > cmd.maxTileSize {
+ log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s variant len %d", hash[:13], tag, variant, refname, seqname, len(tileseq))
+ continue
+ }
+ // log.Printf("\n%x @ refstart %d \n< %s\n> %s\n", tv.Blake2b, refstart, refpart, tileseq)
+
+ throttle.Acquire()
+ go func() {
+ defer throttle.Release()
+ diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tileseq)), 0)
+ for _, diff := range diffs {
+ diff.Position += refstart
+ var varid string
+ if cmd.variantHash {
+ varid = fmt.Sprintf("%x", hash)[:13]
} else {
- // Non-terminal tile vs. non-terminal reference.
- refpart = refseq[refstart : refendtagstart+taglen]
- log.Tracef("\n%x tilevar %d,%d endtag %s endtagid %d refendtagstart %d", tv.Blake2b[:13], tag, variant, endtag, endtagid, refendtagstart)
- }
- if len(refpart) > cmd.maxTileSize {
- log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s pos %d ref len %d", tv.Blake2b[:13], tag, variant, refcs.Name, seqname, refstart, len(refpart))
- continue
+ varid = fmt.Sprintf("%d", variant)
}
- if len(tv.Sequence) > cmd.maxTileSize {
- log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s variant len %d", tv.Blake2b[:13], tag, variant, refcs.Name, seqname, len(tv.Sequence))
- continue
+ outch <- fmt.Sprintf("%d,%d,%s%s,%s:g.%s\n", tag, outcol, varid, refnamefield, seqname, diff.String())
+ if cmd.reportAnnotation != nil {
+ cmd.reportAnnotation(tag, outcol, variant, refname, seqname, diff)
}
- // log.Printf("\n%x @ refstart %d \n< %s\n> %s\n", tv.Blake2b, refstart, refpart, tv.Sequence)
-
- diffwg.Add(1)
- limiter <- true
- go func() {
- defer func() {
- <-limiter
- diffwg.Done()
- }()
- diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tv.Sequence)), 0)
- for _, diff := range diffs {
- diff.Position += refstart
- var varid string
- if cmd.variantHash {
- varid = fmt.Sprintf("%x", tv.Blake2b)[:13]
- } else {
- varid = strconv.Itoa(variant)
- }
- outch <- fmt.Sprintf("%d\t%s\t%s\t%s:g.%s\n", tag, varid, refcs.Name, seqname, diff.String())
- }
- }()
}
- }
+ }()
}
}
return nil