import (
"bufio"
- "bytes"
"fmt"
"io"
)
return taglib.setTags(seqs)
}
-func (taglib *tagLibrary) FindAll(buf []byte, fn func(id tagID, pos, taglen int)) {
+func (taglib *tagLibrary) FindAll(in *bufio.Reader, passthrough io.Writer, fn func(id tagID, pos, taglen int)) error {
+ var window = make([]byte, 0, taglib.keylen*1000)
var key tagmapKey
- valid := 0 // if valid < taglib.keylen, key has "no data" zeroes that are otherwise indistinguishable from "A"
- for i, base := range buf {
+ for offset := 0; ; {
+ base, err := in.ReadByte()
+ if err == io.EOF {
+ return nil
+ } else if err != nil {
+ return err
+ } else if base == '\r' || base == '\n' {
+ if buf, err := in.Peek(1); err == nil && len(buf) > 0 && buf[0] == '>' {
+ return nil
+ } else if err == io.EOF {
+ return nil
+ }
+ continue
+ } else if base == '>' || base == ' ' {
+ return fmt.Errorf("unexpected char %q at offset %d in fasta data", base, offset)
+ }
+
+ if passthrough != nil {
+ _, err = passthrough.Write([]byte{base})
+ if err != nil {
+ return err
+ }
+ }
if !isbase[int(base)] {
- valid = 0
+ // 'N' or various other chars meaning exact
+ // base not known
+ window = window[:0]
continue
}
+ offset++
+ window = append(window, base)
+ if len(window) == cap(window) {
+ copy(window, window[len(window)-taglib.keylen:])
+ window = window[:taglib.keylen]
+ }
key = ((key << 2) | twobit[int(base)]) & taglib.keymask
- valid++
- if valid < taglib.keylen {
+ if len(window) < taglib.keylen {
continue
} else if taginfo, ok := taglib.tagmap[key]; !ok {
continue
- } else if tagstart := i - taglib.keylen + 1; len(taginfo.tagseq) > taglib.keylen && (len(buf) < i+len(taginfo.tagseq) || !bytes.Equal(taginfo.tagseq, buf[i:i+len(taginfo.tagseq)])) {
- // key portion matches, but not the entire tag
- continue
+ } else if len(taginfo.tagseq) != taglib.keylen {
+ return fmt.Errorf("assertion failed: len(%q) != keylen %d", taginfo.tagseq, taglib.keylen)
} else {
- fn(taginfo.id, tagstart, len(taginfo.tagseq))
- valid = 0 // don't try to match overlapping tags
+ fn(taginfo.id, offset-taglib.keylen, len(taginfo.tagseq))
+ window = window[:0] // don't try to match overlapping tags
}
}
+ return nil
}
func (taglib *tagLibrary) Len() int {
import (
"bufio"
+ "bytes"
"fmt"
"io"
"math/rand"
c.Assert(err, check.IsNil)
haystack := []byte(`ggagaactgtgctccgccttcagaccccccccccccccccccccacacatgctagcgcgtcggggtgggggggggggggggggggggggggggactctagcagagtggccagccac`)
var matches []tagMatch
- taglib.FindAll(haystack, func(id tagID, pos, taglen int) {
+ taglib.FindAll(bufio.NewReader(bytes.NewBuffer(haystack)), nil, func(id tagID, pos, taglen int) {
matches = append(matches, tagMatch{id, pos, taglen})
})
c.Check(matches, check.DeepEquals, []tagMatch{{0, 0, 24}, {1, 44, 24}, {2, 92, 24}})
c.Assert(err, check.IsNil)
c.Logf("@%v find tags in input", time.Since(start))
var matches []tagMatch
- taglib.FindAll(haystack, func(id tagID, pos, taglen int) {
+ taglib.FindAll(bufio.NewReader(bytes.NewBuffer(haystack)), nil, func(id tagID, pos, taglen int) {
matches = append(matches, tagMatch{id, pos, taglen})
})
c.Logf("@%v done", time.Since(start))
func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChromosome *regexp.Regexp, isRef bool) (tileSeq, []importStats, error) {
ret := tileSeq{}
- type jobT struct {
- label string
- fasta []byte
- }
- todo := make(chan jobT, 1)
- scanner := bufio.NewScanner(rdr)
- scanner.Buffer(make([]byte, 256), 1<<29) // 512 MiB, in case fasta does not have line breaks
- go func() {
- defer close(todo)
- var fasta []byte
- var seqlabel string
- for scanner.Scan() {
- buf := scanner.Bytes()
- if len(buf) > 0 && buf[0] == '>' {
- todo <- jobT{seqlabel, append([]byte(nil), fasta...)}
- seqlabel, fasta = strings.SplitN(string(buf[1:]), " ", 2)[0], fasta[:0]
- log.Debugf("%s %s reading fasta", filelabel, seqlabel)
- } else if len(buf) > 0 && buf[0] == '#' {
- // ignore testdata comment
- } else {
- fasta = append(fasta, bytes.ToLower(buf)...)
- }
- }
- todo <- jobT{seqlabel, fasta}
- }()
type foundtag struct {
pos int
tagid tagID
skippedSequences := 0
taglen := tilelib.taglib.TagLen()
var stats []importStats
- for job := range todo {
- if len(job.fasta) == 0 {
- continue
- } else if !matchChromosome.MatchString(job.label) {
+
+ in := bufio.NewReader(rdr)
+readall:
+ for {
+ var seqlabel string
+ // Advance to next '>', then
+ // read seqlabel up to \r?\n
+ readseqlabel:
+ for seqlabelStarted := false; ; {
+ rune, _, err := in.ReadRune()
+ if err == io.EOF {
+ break readall
+ } else if err != nil {
+ return nil, nil, err
+ }
+ switch {
+ case rune == '\r':
+ case seqlabelStarted && rune == '\n':
+ break readseqlabel
+ case seqlabelStarted:
+ seqlabel += string(rune)
+ case rune == '>':
+ seqlabelStarted = true
+ default:
+ }
+ }
+ log.Debugf("%s %s reading fasta", filelabel, seqlabel)
+ if !matchChromosome.MatchString(seqlabel) {
skippedSequences++
continue
}
- log.Debugf("%s %s tiling", filelabel, job.label)
+ log.Debugf("%s %s tiling", filelabel, seqlabel)
+ fasta := bytes.NewBuffer(nil)
found = found[:0]
- tilelib.taglib.FindAll(job.fasta, func(tagid tagID, pos, taglen int) {
+ err := tilelib.taglib.FindAll(in, fasta, func(tagid tagID, pos, taglen int) {
found = append(found, foundtag{pos: pos, tagid: tagid})
})
+ if err != nil {
+ return nil, nil, err
+ }
totalFoundTags += len(found)
if len(found) == 0 {
- log.Warnf("%s %s no tags found", filelabel, job.label)
+ log.Warnf("%s %s no tags found", filelabel, seqlabel)
}
droppedDup := 0
}
}
droppedDup = len(found) - dst
- log.Infof("%s %s dropping %d non-unique tags", filelabel, job.label, droppedDup)
+ log.Infof("%s %s dropping %d non-unique tags", filelabel, seqlabel, droppedDup)
found = found[:dst]
}
found[i] = found[x]
}
droppedOOO = len(found) - len(keep)
- log.Infof("%s %s dropping %d out-of-order tags", filelabel, job.label, droppedOOO)
+ log.Infof("%s %s dropping %d out-of-order tags", filelabel, seqlabel, droppedOOO)
found = found[:len(keep)]
}
- log.Infof("%s %s getting %d librefs", filelabel, job.label, len(found))
+ log.Infof("%s %s getting %d librefs", filelabel, seqlabel, len(found))
throttle := &throttle{Max: runtime.NumCPU()}
path = path[:len(found)]
var lowquality int64
startpos = f.pos
}
if i == len(found)-1 {
- endpos = len(job.fasta)
+ endpos = fasta.Len()
} else {
endpos = found[i+1].pos + taglen
}
- path[i] = tilelib.getRef(f.tagid, job.fasta[startpos:endpos], isRef)
- if countBases(job.fasta[startpos:endpos]) != endpos-startpos {
+ path[i] = tilelib.getRef(f.tagid, fasta.Bytes()[startpos:endpos], isRef)
+ if countBases(fasta.Bytes()[startpos:endpos]) != endpos-startpos {
atomic.AddInt64(&lowquality, 1)
}
}()
}
throttle.Wait()
- log.Infof("%s %s copying path", filelabel, job.label)
+ log.Infof("%s %s copying path", filelabel, seqlabel)
pathcopy := make([]tileLibRef, len(path))
copy(pathcopy, path)
- ret[job.label] = pathcopy
+ ret[seqlabel] = pathcopy
- basesIn := countBases(job.fasta)
- log.Infof("%s %s fasta in %d coverage in %d path len %d low-quality %d", filelabel, job.label, len(job.fasta), basesIn, len(path), lowquality)
+ basesIn := countBases(fasta.Bytes())
+ log.Infof("%s %s fasta in %d coverage in %d path len %d low-quality %d", filelabel, seqlabel, fasta.Len(), basesIn, len(path), lowquality)
stats = append(stats, importStats{
InputFile: filelabel,
- InputLabel: job.label,
- InputLength: len(job.fasta),
+ InputLabel: seqlabel,
+ InputLength: fasta.Len(),
InputCoverage: basesIn,
PathLength: len(path),
DroppedOutOfOrderTags: droppedOOO,
totalPathLen += len(path)
}
log.Printf("%s tiled with total path len %d in %d sequences (skipped %d sequences that did not match chromosome regexp, skipped %d out-of-order tags)", filelabel, totalPathLen, len(ret), skippedSequences, totalFoundTags-totalPathLen)
- return ret, stats, scanner.Err()
+ return ret, stats, nil
}
func (tilelib *tileLibrary) Len() int64 {