Write annotations along with numpy.
[lightning.git] / tilelib.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "encoding/gob"
8         "fmt"
9         "io"
10         "strings"
11         "sync"
12
13         log "github.com/sirupsen/logrus"
14         "golang.org/x/crypto/blake2b"
15 )
16
17 type tileVariantID uint16 // 1-based
18
19 type tileLibRef struct {
20         Tag     tagID
21         Variant tileVariantID
22 }
23
24 type tileSeq map[string][]tileLibRef
25
26 func (tseq tileSeq) Variants() ([]tileVariantID, int, int) {
27         maxtag := 0
28         for _, refs := range tseq {
29                 for _, ref := range refs {
30                         if maxtag < int(ref.Tag) {
31                                 maxtag = int(ref.Tag)
32                         }
33                 }
34         }
35         vars := make([]tileVariantID, maxtag+1)
36         var kept, dropped int
37         for _, refs := range tseq {
38                 for _, ref := range refs {
39                         if vars[int(ref.Tag)] != 0 {
40                                 dropped++
41                         } else {
42                                 kept++
43                         }
44                         vars[int(ref.Tag)] = ref.Variant
45                 }
46         }
47         return vars, kept, dropped
48 }
49
50 type tileLibrary struct {
51         includeNoCalls      bool
52         skipOOO             bool
53         retainTileSequences bool
54
55         taglib         *tagLibrary
56         variant        [][][blake2b.Size256]byte
57         refseqs        map[string]map[string][]tileLibRef
58         compactGenomes map[string][]tileVariantID
59         // count [][]int
60         seq      map[[blake2b.Size256]byte][]byte
61         variants int
62         // if non-nil, write out any tile variants added while tiling
63         encoder *gob.Encoder
64
65         mtx sync.Mutex
66 }
67
68 func (tilelib *tileLibrary) loadTagSet(newtagset [][]byte) error {
69         // Loading a tagset means either passing it through to the
70         // output (if it's the first one we've seen), or just ensuring
71         // it doesn't disagree with what we already have.
72         if len(newtagset) == 0 {
73                 return nil
74         }
75         tilelib.mtx.Lock()
76         defer tilelib.mtx.Unlock()
77         if tilelib.taglib == nil || tilelib.taglib.Len() == 0 {
78                 tilelib.taglib = &tagLibrary{}
79                 err := tilelib.taglib.setTags(newtagset)
80                 if err != nil {
81                         return err
82                 }
83                 if tilelib.encoder != nil {
84                         err = tilelib.encoder.Encode(LibraryEntry{
85                                 TagSet: newtagset,
86                         })
87                         if err != nil {
88                                 return err
89                         }
90                 }
91         } else if tilelib.taglib.Len() != len(newtagset) {
92                 return fmt.Errorf("cannot merge libraries with differing tagsets")
93         } else {
94                 current := tilelib.taglib.Tags()
95                 for i := range newtagset {
96                         if !bytes.Equal(newtagset[i], current[i]) {
97                                 return fmt.Errorf("cannot merge libraries with differing tagsets")
98                         }
99                 }
100         }
101         return nil
102 }
103
104 func (tilelib *tileLibrary) loadTileVariants(tvs []TileVariant, variantmap map[tileLibRef]tileVariantID) error {
105         for _, tv := range tvs {
106                 // Assign a new variant ID (unique across all inputs)
107                 // for each input variant.
108                 variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant
109         }
110         return nil
111 }
112
113 func (tilelib *tileLibrary) loadCompactGenomes(cgs []CompactGenome, variantmap map[tileLibRef]tileVariantID, onLoadGenome func(CompactGenome)) error {
114         log.Debugf("loadCompactGenomes: %d", len(cgs))
115         var wg sync.WaitGroup
116         errs := make(chan error, 1)
117         for _, cg := range cgs {
118                 wg.Add(1)
119                 cg := cg
120                 go func() {
121                         defer wg.Done()
122                         for i, variant := range cg.Variants {
123                                 if len(errs) > 0 {
124                                         return
125                                 }
126                                 if variant == 0 {
127                                         continue
128                                 }
129                                 tag := tagID(i / 2)
130                                 newvariant, ok := variantmap[tileLibRef{Tag: tag, Variant: variant}]
131                                 if !ok {
132                                         err := fmt.Errorf("oops: genome %q has variant %d for tag %d, but that variant was not in its library", cg.Name, variant, tag)
133                                         select {
134                                         case errs <- err:
135                                         default:
136                                         }
137                                         return
138                                 }
139                                 log.Tracef("loadCompactGenomes: cg %s tag %d variant %d => %d", cg.Name, tag, variant, newvariant)
140                                 cg.Variants[i] = newvariant
141                         }
142                         if onLoadGenome != nil {
143                                 onLoadGenome(cg)
144                         }
145                         if tilelib.encoder != nil {
146                                 err := tilelib.encoder.Encode(LibraryEntry{
147                                         CompactGenomes: []CompactGenome{cg},
148                                 })
149                                 if err != nil {
150                                         select {
151                                         case errs <- err:
152                                         default:
153                                         }
154                                         return
155                                 }
156                         }
157                         if tilelib.compactGenomes != nil {
158                                 tilelib.mtx.Lock()
159                                 defer tilelib.mtx.Unlock()
160                                 tilelib.compactGenomes[cg.Name] = cg.Variants
161                         }
162                 }()
163         }
164         wg.Wait()
165         go close(errs)
166         return <-errs
167 }
168
169 func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, variantmap map[tileLibRef]tileVariantID) error {
170         log.Debugf("loadCompactSequences: %d", len(cseqs))
171         for _, cseq := range cseqs {
172                 for _, tseq := range cseq.TileSequences {
173                         for i, libref := range tseq {
174                                 if libref.Variant == 0 {
175                                         // No variant (e.g., import
176                                         // dropped tiles with
177                                         // no-calls) = no translation.
178                                         continue
179                                 }
180                                 v, ok := variantmap[libref]
181                                 if !ok {
182                                         return fmt.Errorf("oops: CompactSequence %q has variant %d for tag %d, but that variant was not in its library", cseq.Name, libref.Variant, libref.Tag)
183                                 }
184                                 tseq[i].Variant = v
185                         }
186                 }
187                 if tilelib.encoder != nil {
188                         if err := tilelib.encoder.Encode(LibraryEntry{
189                                 CompactSequences: []CompactSequence{cseq},
190                         }); err != nil {
191                                 return err
192                         }
193                 }
194         }
195         tilelib.mtx.Lock()
196         defer tilelib.mtx.Unlock()
197         if tilelib.refseqs == nil {
198                 tilelib.refseqs = map[string]map[string][]tileLibRef{}
199         }
200         for _, cseq := range cseqs {
201                 tilelib.refseqs[cseq.Name] = cseq.TileSequences
202         }
203         return nil
204 }
205
206 // Load library data from rdr. Tile variants might be renumbered in
207 // the process; in that case, genomes variants will be renumbered to
208 // match.
209 //
210 // If onLoadGenome is non-nil, call it on each CompactGenome entry.
211 func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, onLoadGenome func(CompactGenome)) error {
212         cgs := []CompactGenome{}
213         cseqs := []CompactSequence{}
214         variantmap := map[tileLibRef]tileVariantID{}
215         err := DecodeLibrary(rdr, func(ent *LibraryEntry) error {
216                 if ctx.Err() != nil {
217                         return ctx.Err()
218                 }
219                 if err := tilelib.loadTagSet(ent.TagSet); err != nil {
220                         return err
221                 }
222                 if err := tilelib.loadTileVariants(ent.TileVariants, variantmap); err != nil {
223                         return err
224                 }
225                 cgs = append(cgs, ent.CompactGenomes...)
226                 cseqs = append(cseqs, ent.CompactSequences...)
227                 return nil
228         })
229         if err != nil {
230                 return err
231         }
232         if ctx.Err() != nil {
233                 return ctx.Err()
234         }
235         err = tilelib.loadCompactGenomes(cgs, variantmap, onLoadGenome)
236         if err != nil {
237                 return err
238         }
239         err = tilelib.loadCompactSequences(cseqs, variantmap)
240         if err != nil {
241                 return err
242         }
243         return nil
244 }
245
246 type importStats struct {
247         InputFile              string
248         InputLabel             string
249         InputLength            int
250         InputCoverage          int
251         TileCoverage           int
252         PathLength             int
253         DroppedOutOfOrderTiles int
254 }
255
256 func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, []importStats, error) {
257         ret := tileSeq{}
258         type jobT struct {
259                 label string
260                 fasta []byte
261         }
262         todo := make(chan jobT)
263         scanner := bufio.NewScanner(rdr)
264         go func() {
265                 defer close(todo)
266                 var fasta []byte
267                 var seqlabel string
268                 for scanner.Scan() {
269                         buf := scanner.Bytes()
270                         if len(buf) > 0 && buf[0] == '>' {
271                                 todo <- jobT{seqlabel, fasta}
272                                 seqlabel, fasta = strings.SplitN(string(buf[1:]), " ", 2)[0], nil
273                                 log.Debugf("%s %s reading fasta", filelabel, seqlabel)
274                         } else {
275                                 fasta = append(fasta, bytes.ToLower(buf)...)
276                         }
277                 }
278                 todo <- jobT{seqlabel, fasta}
279         }()
280         type foundtag struct {
281                 pos    int
282                 tagid  tagID
283                 taglen int
284         }
285         found := make([]foundtag, 2000000)
286         path := make([]tileLibRef, 2000000)
287         totalFoundTags := 0
288         totalPathLen := 0
289         skippedSequences := 0
290         stats := make([]importStats, 0, len(todo))
291         for job := range todo {
292                 if len(job.fasta) == 0 {
293                         continue
294                 } else if strings.Contains(job.label, "_") {
295                         skippedSequences++
296                         continue
297                 }
298                 log.Debugf("%s %s tiling", filelabel, job.label)
299
300                 found = found[:0]
301                 tilelib.taglib.FindAll(job.fasta, func(tagid tagID, pos, taglen int) {
302                         found = append(found, foundtag{pos: pos, tagid: tagid, taglen: taglen})
303                 })
304                 totalFoundTags += len(found)
305
306                 basesOut := 0
307                 skipped := 0
308                 path = path[:0]
309                 last := foundtag{tagid: -1}
310                 if tilelib.skipOOO {
311                         keep := longestIncreasingSubsequence(len(found), func(i int) int { return int(found[i].tagid) })
312                         for i, x := range keep {
313                                 found[i] = found[x]
314                         }
315                         skipped = len(found) - len(keep)
316                         found = found[:len(keep)]
317                 }
318                 for i, f := range found {
319                         log.Tracef("%s %s found[%d] == %#v", filelabel, job.label, i, f)
320                         if last.tagid < 0 {
321                                 // first tag in sequence
322                                 last = foundtag{tagid: f.tagid}
323                                 continue
324                         }
325                         libref := tilelib.getRef(last.tagid, job.fasta[last.pos:f.pos+f.taglen])
326                         path = append(path, libref)
327                         if libref.Variant > 0 {
328                                 // Count output coverage from
329                                 // the end of the previous tag
330                                 // (if any) to the end of the
331                                 // current tag, IOW don't
332                                 // double-count coverage for
333                                 // the previous tag.
334                                 basesOut += countBases(job.fasta[last.pos+last.taglen : f.pos+f.taglen])
335                         } else {
336                                 // If we dropped this tile
337                                 // (because !includeNoCalls),
338                                 // set taglen=0 so the
339                                 // overlapping tag is counted
340                                 // toward coverage on the
341                                 // following tile.
342                                 f.taglen = 0
343                         }
344                         last = f
345                 }
346                 if last.tagid < 0 {
347                         log.Warnf("%s %s no tags found", filelabel, job.label)
348                 } else {
349                         libref := tilelib.getRef(last.tagid, job.fasta[last.pos:])
350                         path = append(path, libref)
351                         if libref.Variant > 0 {
352                                 basesOut += countBases(job.fasta[last.pos+last.taglen:])
353                         }
354                 }
355
356                 pathcopy := make([]tileLibRef, len(path))
357                 copy(pathcopy, path)
358                 ret[job.label] = pathcopy
359
360                 basesIn := countBases(job.fasta)
361                 log.Infof("%s %s fasta in %d coverage in %d coverage out %d path len %d skipped %d", filelabel, job.label, len(job.fasta), basesIn, basesOut, len(path), skipped)
362                 stats = append(stats, importStats{
363                         InputFile:              filelabel,
364                         InputLabel:             job.label,
365                         InputLength:            len(job.fasta),
366                         InputCoverage:          basesIn,
367                         TileCoverage:           basesOut,
368                         PathLength:             len(path),
369                         DroppedOutOfOrderTiles: skipped,
370                 })
371
372                 totalPathLen += len(path)
373         }
374         log.Printf("%s tiled with total path len %d in %d sequences (skipped %d sequences with '_' in name, skipped %d out-of-order tags)", filelabel, totalPathLen, len(ret), skippedSequences, totalFoundTags-totalPathLen)
375         return ret, stats, scanner.Err()
376 }
377
378 func (tilelib *tileLibrary) Len() int {
379         tilelib.mtx.Lock()
380         defer tilelib.mtx.Unlock()
381         return tilelib.variants
382 }
383
384 // Return a tileLibRef for a tile with the given tag and sequence,
385 // adding the sequence to the library if needed.
386 func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
387         if !tilelib.includeNoCalls {
388                 for _, b := range seq {
389                         if b != 'a' && b != 'c' && b != 'g' && b != 't' {
390                                 // return "tile not found" if seq has any
391                                 // no-calls
392                                 return tileLibRef{Tag: tag}
393                         }
394                 }
395         }
396         tilelib.mtx.Lock()
397         if tilelib.variant == nil && tilelib.taglib != nil {
398                 tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len())
399         }
400         if int(tag) >= len(tilelib.variant) {
401                 // If we haven't seen the tag library yet (as in a
402                 // merge), tilelib.taglib.Len() is zero. We can still
403                 // behave correctly, we just need to expand the
404                 // tilelib.variant slice as needed.
405                 if int(tag) >= cap(tilelib.variant) {
406                         // Allocate 2x capacity.
407                         newslice := make([][][blake2b.Size256]byte, int(tag)+1, (int(tag)+1)*2)
408                         copy(newslice, tilelib.variant)
409                         tilelib.variant = newslice[:int(tag)+1]
410                 } else {
411                         // Use previously allocated capacity, avoiding
412                         // copy.
413                         tilelib.variant = tilelib.variant[:int(tag)+1]
414                 }
415         }
416         seqhash := blake2b.Sum256(seq)
417         for i, varhash := range tilelib.variant[tag] {
418                 if varhash == seqhash {
419                         tilelib.mtx.Unlock()
420                         return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)}
421                 }
422         }
423         tilelib.variants++
424         tilelib.variant[tag] = append(tilelib.variant[tag], seqhash)
425         if tilelib.retainTileSequences {
426                 if tilelib.seq == nil {
427                         tilelib.seq = map[[blake2b.Size256]byte][]byte{}
428                 }
429                 tilelib.seq[seqhash] = append([]byte(nil), seq...)
430         }
431         variant := tileVariantID(len(tilelib.variant[tag]))
432         tilelib.mtx.Unlock()
433
434         if tilelib.encoder != nil {
435                 tilelib.encoder.Encode(LibraryEntry{
436                         TileVariants: []TileVariant{{
437                                 Tag:      tag,
438                                 Variant:  variant,
439                                 Blake2b:  seqhash,
440                                 Sequence: seq,
441                         }},
442                 })
443         }
444         return tileLibRef{Tag: tag, Variant: variant}
445 }
446
447 func (tilelib *tileLibrary) TileVariantSequence(libref tileLibRef) []byte {
448         if libref.Variant == 0 || len(tilelib.variant) <= int(libref.Tag) || len(tilelib.variant[libref.Tag]) < int(libref.Variant) {
449                 return nil
450         }
451         return tilelib.seq[tilelib.variant[libref.Tag][libref.Variant-1]]
452 }
453
454 func countBases(seq []byte) int {
455         n := 0
456         for _, c := range seq {
457                 if isbase[c] {
458                         n++
459                 }
460         }
461         return n
462 }