13 log "github.com/sirupsen/logrus"
14 "golang.org/x/crypto/blake2b"
17 type tileVariantID uint16 // 1-based
19 type tileLibRef struct {
24 type tileSeq map[string][]tileLibRef
26 func (tseq tileSeq) Variants() ([]tileVariantID, int, int) {
28 for _, refs := range tseq {
29 for _, ref := range refs {
30 if maxtag < int(ref.tag) {
35 vars := make([]tileVariantID, maxtag+1)
37 for _, refs := range tseq {
38 for _, ref := range refs {
39 if vars[int(ref.tag)] != 0 {
44 vars[int(ref.tag)] = ref.variant
47 return vars, kept, dropped
50 type tileLibrary struct {
54 variant [][][blake2b.Size256]byte
56 // seq map[[blake2b.Size]byte][]byte
58 // if non-nil, write out any tile variants added while tiling
60 // if non-nil, call this func upon loading a genome
61 onLoadGenome func(CompactGenome)
66 func (tilelib *tileLibrary) loadTagSet(newtagset [][]byte) error {
67 // Loading a tagset means either passing it through to the
68 // output (if it's the first one we've seen), or just ensuring
69 // it doesn't disagree with what we already have.
70 if len(newtagset) == 0 {
74 defer tilelib.mtx.Unlock()
75 if tilelib.taglib == nil || tilelib.taglib.Len() == 0 {
76 tilelib.taglib = &tagLibrary{}
77 err := tilelib.taglib.setTags(newtagset)
81 if tilelib.encoder != nil {
82 err = tilelib.encoder.Encode(LibraryEntry{
89 } else if tilelib.taglib.Len() != len(newtagset) {
90 return fmt.Errorf("cannot merge libraries with differing tagsets")
92 current := tilelib.taglib.Tags()
93 for i := range newtagset {
94 if !bytes.Equal(newtagset[i], current[i]) {
95 return fmt.Errorf("cannot merge libraries with differing tagsets")
102 func (tilelib *tileLibrary) loadTileVariants(tvs []TileVariant, variantmap map[tileLibRef]tileVariantID) error {
103 for _, tv := range tvs {
104 // Assign a new variant ID (unique across all inputs)
105 // for each input variant.
106 variantmap[tileLibRef{tag: tv.Tag, variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).variant
111 func (tilelib *tileLibrary) loadGenomes(genomes map[string][]tileVariantID, variantmap map[tileLibRef]tileVariantID, onLoadGenome func(CompactGenome)) error {
112 var wg sync.WaitGroup
113 errs := make(chan error, 1)
114 for name, variants := range genomes {
115 name, variants := name, variants
119 for i, variant := range variants {
127 newvariant, ok := variantmap[tileLibRef{tag: tag, variant: variant}]
129 err := fmt.Errorf("oops: genome %q has variant %d for tag %d, but that variant was not in its library", name, variant, tag)
136 variants[i] = newvariant
138 if tilelib.encoder != nil {
139 for name, variants := range genomes {
144 if onLoadGenome != nil {
147 err := tilelib.encoder.Encode(LibraryEntry{
148 CompactGenomes: []CompactGenome{cg},
166 func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, onLoadGenome func(CompactGenome)) error {
167 genomes := map[string][]tileVariantID{}
168 variantmap := map[tileLibRef]tileVariantID{}
169 err := DecodeLibrary(rdr, func(ent *LibraryEntry) error {
170 if ctx.Err() != nil {
173 if err := tilelib.loadTagSet(ent.TagSet); err != nil {
175 } else if err = tilelib.loadTileVariants(ent.TileVariants, variantmap); err != nil {
178 for _, cg := range ent.CompactGenomes {
179 genomes[cg.Name] = cg.Variants
187 if ctx.Err() != nil {
190 err = tilelib.loadGenomes(genomes, variantmap, onLoadGenome)
197 func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, error) {
203 todo := make(chan jobT)
204 scanner := bufio.NewScanner(rdr)
210 buf := scanner.Bytes()
211 if len(buf) > 0 && buf[0] == '>' {
212 todo <- jobT{seqlabel, fasta}
213 seqlabel, fasta = string(buf[1:]), nil
214 log.Debugf("%s %s reading fasta", filelabel, seqlabel)
216 fasta = append(fasta, bytes.ToLower(buf)...)
219 todo <- jobT{seqlabel, fasta}
221 type foundtag struct {
226 found := make([]foundtag, 2000000)
227 path := make([]tileLibRef, 2000000)
230 skippedSequences := 0
231 for job := range todo {
232 if len(job.fasta) == 0 {
234 } else if strings.Contains(job.label, "_") {
238 log.Debugf("%s %s tiling", filelabel, job.label)
241 tilelib.taglib.FindAll(job.fasta, func(tagid tagID, pos, taglen int) {
242 found = append(found, foundtag{pos: pos, tagid: tagid, taglen: taglen})
244 totalFoundTags += len(found)
248 last := foundtag{tagid: -1}
250 keep := longestIncreasingSubsequence(len(found), func(i int) int { return int(found[i].tagid) })
251 for i, x := range keep {
254 skipped = len(found) - len(keep)
255 found = found[:len(keep)]
257 for i, f := range found {
258 log.Tracef("%s %s found[%d] == %#v", filelabel, job.label, i, f)
260 path = append(path, tilelib.getRef(last.tagid, job.fasta[last.pos:f.pos+f.taglen]))
265 path = append(path, tilelib.getRef(last.tagid, job.fasta[last.pos:]))
268 pathcopy := make([]tileLibRef, len(path))
270 ret[job.label] = pathcopy
271 log.Debugf("%s %s tiled with path len %d, skipped %d", filelabel, job.label, len(path), skipped)
272 totalPathLen += len(path)
274 log.Printf("%s tiled with total path len %d in %d sequences (skipped %d sequences with '_' in name, skipped %d out-of-order tags)", filelabel, totalPathLen, len(ret), skippedSequences, totalFoundTags-totalPathLen)
275 return ret, scanner.Err()
278 func (tilelib *tileLibrary) Len() int {
280 defer tilelib.mtx.Unlock()
281 return tilelib.variants
284 // Return a tileLibRef for a tile with the given tag and sequence,
285 // adding the sequence to the library if needed.
286 func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
287 if !tilelib.includeNoCalls {
288 for _, b := range seq {
289 if b != 'a' && b != 'c' && b != 'g' && b != 't' {
290 // return "tile not found" if seq has any
292 return tileLibRef{tag: tag}
297 // if tilelib.seq == nil {
298 // tilelib.seq = map[[blake2b.Size]byte][]byte{}
300 if tilelib.variant == nil && tilelib.taglib != nil {
301 tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len())
303 if int(tag) >= len(tilelib.variant) {
304 // If we haven't seen the tag library yet (as in a
305 // merge), tilelib.taglib.Len() is zero. We can still
306 // behave correctly, we just need to expand the
307 // tilelib.variant slice as needed.
308 if int(tag) >= cap(tilelib.variant) {
309 // Allocate 2x capacity.
310 newslice := make([][][blake2b.Size256]byte, int(tag)+1, (int(tag)+1)*2)
311 copy(newslice, tilelib.variant)
312 tilelib.variant = newslice[:int(tag)+1]
314 // Use previously allocated capacity, avoiding
316 tilelib.variant = tilelib.variant[:int(tag)+1]
319 seqhash := blake2b.Sum256(seq)
320 for i, varhash := range tilelib.variant[tag] {
321 if varhash == seqhash {
323 return tileLibRef{tag: tag, variant: tileVariantID(i + 1)}
327 tilelib.variant[tag] = append(tilelib.variant[tag], seqhash)
328 // tilelib.seq[seqhash] = append([]byte(nil), seq...)
329 variant := tileVariantID(len(tilelib.variant[tag]))
332 if tilelib.encoder != nil {
333 tilelib.encoder.Encode(LibraryEntry{
334 TileVariants: []TileVariant{{
342 return tileLibRef{tag: tag, variant: variant}