"encoding/gob"
"fmt"
"io"
+ "math/rand"
"os"
"regexp"
"runtime"
retainNoCalls bool
skipOOO bool
retainTileSequences bool
+ useDups bool
taglib *tagLibrary
variant [][][blake2b.Size256]byte
}
type importStats struct {
- InputFile string
- InputLabel string
- InputLength int
- InputCoverage int
- PathLength int
- DroppedOutOfOrderTiles int
+ InputFile string
+ InputLabel string
+ InputLength int
+ InputCoverage int
+ PathLength int
+ DroppedRepeatedTags int
+ DroppedOutOfOrderTags int
}
func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChromosome *regexp.Regexp, isRef bool) (tileSeq, []importStats, error) {
ret := tileSeq{}
- type jobT struct {
- label string
- fasta []byte
- }
- todo := make(chan jobT, 1)
- scanner := bufio.NewScanner(rdr)
- scanner.Buffer(make([]byte, 256), 1<<29) // 512 MiB, in case fasta does not have line breaks
- go func() {
- defer close(todo)
- var fasta []byte
- var seqlabel string
- for scanner.Scan() {
- buf := scanner.Bytes()
- if len(buf) > 0 && buf[0] == '>' {
- todo <- jobT{seqlabel, append([]byte(nil), fasta...)}
- seqlabel, fasta = strings.SplitN(string(buf[1:]), " ", 2)[0], fasta[:0]
- log.Debugf("%s %s reading fasta", filelabel, seqlabel)
- } else {
- fasta = append(fasta, bytes.ToLower(buf)...)
- }
- }
- todo <- jobT{seqlabel, fasta}
- }()
type foundtag struct {
pos int
tagid tagID
skippedSequences := 0
taglen := tilelib.taglib.TagLen()
var stats []importStats
- for job := range todo {
- if len(job.fasta) == 0 {
- continue
- } else if !matchChromosome.MatchString(job.label) {
+
+ in := bufio.NewReader(rdr)
+readall:
+ for {
+ var seqlabel string
+ // Advance to next '>', then
+ // read seqlabel up to \r?\n
+ readseqlabel:
+ for seqlabelStarted := false; ; {
+ rune, _, err := in.ReadRune()
+ if err == io.EOF {
+ break readall
+ } else if err != nil {
+ return nil, nil, err
+ }
+ switch {
+ case rune == '\r':
+ case seqlabelStarted && rune == '\n':
+ break readseqlabel
+ case seqlabelStarted:
+ seqlabel += string(rune)
+ case rune == '>':
+ seqlabelStarted = true
+ default:
+ }
+ }
+ log.Debugf("%s %s reading fasta", filelabel, seqlabel)
+ if !matchChromosome.MatchString(seqlabel) {
skippedSequences++
continue
}
- log.Debugf("%s %s tiling", filelabel, job.label)
+ log.Debugf("%s %s tiling", filelabel, seqlabel)
+ fasta := bytes.NewBuffer(nil)
found = found[:0]
- tilelib.taglib.FindAll(job.fasta, func(tagid tagID, pos, taglen int) {
+ err := tilelib.taglib.FindAll(in, fasta, func(tagid tagID, pos, taglen int) {
found = append(found, foundtag{pos: pos, tagid: tagid})
})
+ if err != nil {
+ return nil, nil, err
+ }
totalFoundTags += len(found)
if len(found) == 0 {
- log.Warnf("%s %s no tags found", filelabel, job.label)
+ log.Warnf("%s %s no tags found", filelabel, seqlabel)
+ }
+
+ droppedDup := 0
+ if !tilelib.useDups {
+ // Remove any tags that appeared more than once
+ dup := map[tagID]bool{}
+ for _, ft := range found {
+ _, dup[ft.tagid] = dup[ft.tagid]
+ }
+ dst := 0
+ for _, ft := range found {
+ if !dup[ft.tagid] {
+ found[dst] = ft
+ dst++
+ }
+ }
+ droppedDup = len(found) - dst
+ log.Infof("%s %s dropping %d non-unique tags", filelabel, seqlabel, droppedDup)
+ found = found[:dst]
}
- skipped := 0
+ droppedOOO := 0
if tilelib.skipOOO {
- log.Infof("%s %s keeping longest increasing subsequence", filelabel, job.label)
keep := longestIncreasingSubsequence(len(found), func(i int) int { return int(found[i].tagid) })
for i, x := range keep {
found[i] = found[x]
}
- skipped = len(found) - len(keep)
+ droppedOOO = len(found) - len(keep)
+ log.Infof("%s %s dropping %d out-of-order tags", filelabel, seqlabel, droppedOOO)
found = found[:len(keep)]
}
- log.Infof("%s %s getting %d librefs", filelabel, job.label, len(found))
- throttle := &throttle{Max: runtime.NumCPU()}
+ log.Infof("%s %s getting %d librefs", filelabel, seqlabel, len(found))
path = path[:len(found)]
var lowquality int64
- for i, f := range found {
- i, f := i, f
- throttle.Acquire()
- go func() {
- defer throttle.Release()
- var startpos, endpos int
- if i == 0 {
- startpos = 0
- } else {
- startpos = f.pos
- }
- if i == len(found)-1 {
- endpos = len(job.fasta)
- } else {
- endpos = found[i+1].pos + taglen
- }
- path[i] = tilelib.getRef(f.tagid, job.fasta[startpos:endpos], isRef)
- if countBases(job.fasta[startpos:endpos]) != endpos-startpos {
- atomic.AddInt64(&lowquality, 1)
- }
- }()
+ // Visit each element of found, but start at a random
+ // index, to reduce the likelihood of lock contention
+ // when importing many samples concurrently.
+ startpoint := rand.Int() % len(found)
+ for offset := range found {
+ i := startpoint + offset
+ if i >= len(found) {
+ i -= len(found)
+ }
+ f := found[i]
+ var startpos, endpos int
+ if i == 0 {
+ startpos = 0
+ } else {
+ startpos = f.pos
+ }
+ if i == len(found)-1 {
+ endpos = fasta.Len()
+ } else {
+ endpos = found[i+1].pos + taglen
+ }
+ path[i] = tilelib.getRef(f.tagid, fasta.Bytes()[startpos:endpos], isRef)
+ if countBases(fasta.Bytes()[startpos:endpos]) != endpos-startpos {
+ lowquality++
+ }
}
- throttle.Wait()
- log.Infof("%s %s copying path", filelabel, job.label)
+ log.Infof("%s %s copying path", filelabel, seqlabel)
pathcopy := make([]tileLibRef, len(path))
copy(pathcopy, path)
- ret[job.label] = pathcopy
+ ret[seqlabel] = pathcopy
- basesIn := countBases(job.fasta)
- log.Infof("%s %s fasta in %d coverage in %d path len %d low-quality %d skipped-out-of-order %d", filelabel, job.label, len(job.fasta), basesIn, len(path), lowquality, skipped)
+ basesIn := countBases(fasta.Bytes())
+ log.Infof("%s %s fasta in %d coverage in %d path len %d low-quality %d", filelabel, seqlabel, fasta.Len(), basesIn, len(path), lowquality)
stats = append(stats, importStats{
- InputFile: filelabel,
- InputLabel: job.label,
- InputLength: len(job.fasta),
- InputCoverage: basesIn,
- PathLength: len(path),
- DroppedOutOfOrderTiles: skipped,
+ InputFile: filelabel,
+ InputLabel: seqlabel,
+ InputLength: fasta.Len(),
+ InputCoverage: basesIn,
+ PathLength: len(path),
+ DroppedOutOfOrderTags: droppedOOO,
+ DroppedRepeatedTags: droppedDup,
})
totalPathLen += len(path)
}
log.Printf("%s tiled with total path len %d in %d sequences (skipped %d sequences that did not match chromosome regexp, skipped %d out-of-order tags)", filelabel, totalPathLen, len(ret), skippedSequences, totalFoundTags-totalPathLen)
- return ret, stats, scanner.Err()
+ return ret, stats, nil
}
func (tilelib *tileLibrary) Len() int64 {