Gzip gob files.
[lightning.git] / tilelib.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "encoding/gob"
8         "fmt"
9         "io"
10         "runtime"
11         "sort"
12         "strings"
13         "sync"
14
15         log "github.com/sirupsen/logrus"
16         "golang.org/x/crypto/blake2b"
17 )
18
19 type tileVariantID uint16 // 1-based
20
21 type tileLibRef struct {
22         Tag     tagID
23         Variant tileVariantID
24 }
25
26 type tileSeq map[string][]tileLibRef
27
28 func (tseq tileSeq) Variants() ([]tileVariantID, int, int) {
29         maxtag := 0
30         for _, refs := range tseq {
31                 for _, ref := range refs {
32                         if maxtag < int(ref.Tag) {
33                                 maxtag = int(ref.Tag)
34                         }
35                 }
36         }
37         vars := make([]tileVariantID, maxtag+1)
38         var kept, dropped int
39         for _, refs := range tseq {
40                 for _, ref := range refs {
41                         if vars[int(ref.Tag)] != 0 {
42                                 dropped++
43                         } else {
44                                 kept++
45                         }
46                         vars[int(ref.Tag)] = ref.Variant
47                 }
48         }
49         return vars, kept, dropped
50 }
51
52 type tileLibrary struct {
53         retainNoCalls       bool
54         skipOOO             bool
55         retainTileSequences bool
56
57         taglib         *tagLibrary
58         variant        [][][blake2b.Size256]byte
59         refseqs        map[string]map[string][]tileLibRef
60         compactGenomes map[string][]tileVariantID
61         // count [][]int
62         seq      map[[blake2b.Size256]byte][]byte
63         variants int
64         // if non-nil, write out any tile variants added while tiling
65         encoder *gob.Encoder
66
67         mtx sync.Mutex
68 }
69
70 func (tilelib *tileLibrary) loadTagSet(newtagset [][]byte) error {
71         // Loading a tagset means either passing it through to the
72         // output (if it's the first one we've seen), or just ensuring
73         // it doesn't disagree with what we already have.
74         if len(newtagset) == 0 {
75                 return nil
76         }
77         tilelib.mtx.Lock()
78         defer tilelib.mtx.Unlock()
79         if tilelib.taglib == nil || tilelib.taglib.Len() == 0 {
80                 tilelib.taglib = &tagLibrary{}
81                 err := tilelib.taglib.setTags(newtagset)
82                 if err != nil {
83                         return err
84                 }
85                 if tilelib.encoder != nil {
86                         err = tilelib.encoder.Encode(LibraryEntry{
87                                 TagSet: newtagset,
88                         })
89                         if err != nil {
90                                 return err
91                         }
92                 }
93         } else if tilelib.taglib.Len() != len(newtagset) {
94                 return fmt.Errorf("cannot merge libraries with differing tagsets")
95         } else {
96                 current := tilelib.taglib.Tags()
97                 for i := range newtagset {
98                         if !bytes.Equal(newtagset[i], current[i]) {
99                                 return fmt.Errorf("cannot merge libraries with differing tagsets")
100                         }
101                 }
102         }
103         return nil
104 }
105
106 func (tilelib *tileLibrary) loadTileVariants(tvs []TileVariant, variantmap map[tileLibRef]tileVariantID) error {
107         for _, tv := range tvs {
108                 // Assign a new variant ID (unique across all inputs)
109                 // for each input variant.
110                 variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant
111         }
112         return nil
113 }
114
115 func (tilelib *tileLibrary) loadCompactGenomes(cgs []CompactGenome, variantmap map[tileLibRef]tileVariantID, onLoadGenome func(CompactGenome)) error {
116         log.Debugf("loadCompactGenomes: %d", len(cgs))
117         var wg sync.WaitGroup
118         errs := make(chan error, 1)
119         for _, cg := range cgs {
120                 wg.Add(1)
121                 cg := cg
122                 go func() {
123                         defer wg.Done()
124                         for i, variant := range cg.Variants {
125                                 if len(errs) > 0 {
126                                         return
127                                 }
128                                 if variant == 0 {
129                                         continue
130                                 }
131                                 tag := tagID(i / 2)
132                                 newvariant, ok := variantmap[tileLibRef{Tag: tag, Variant: variant}]
133                                 if !ok {
134                                         err := fmt.Errorf("oops: genome %q has variant %d for tag %d, but that variant was not in its library", cg.Name, variant, tag)
135                                         select {
136                                         case errs <- err:
137                                         default:
138                                         }
139                                         return
140                                 }
141                                 log.Tracef("loadCompactGenomes: cg %s tag %d variant %d => %d", cg.Name, tag, variant, newvariant)
142                                 cg.Variants[i] = newvariant
143                         }
144                         if onLoadGenome != nil {
145                                 onLoadGenome(cg)
146                         }
147                         if tilelib.encoder != nil {
148                                 err := tilelib.encoder.Encode(LibraryEntry{
149                                         CompactGenomes: []CompactGenome{cg},
150                                 })
151                                 if err != nil {
152                                         select {
153                                         case errs <- err:
154                                         default:
155                                         }
156                                         return
157                                 }
158                         }
159                         if tilelib.compactGenomes != nil {
160                                 tilelib.mtx.Lock()
161                                 defer tilelib.mtx.Unlock()
162                                 tilelib.compactGenomes[cg.Name] = cg.Variants
163                         }
164                 }()
165         }
166         wg.Wait()
167         go close(errs)
168         return <-errs
169 }
170
171 func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, variantmap map[tileLibRef]tileVariantID) error {
172         log.Debugf("loadCompactSequences: %d", len(cseqs))
173         for _, cseq := range cseqs {
174                 for _, tseq := range cseq.TileSequences {
175                         for i, libref := range tseq {
176                                 if libref.Variant == 0 {
177                                         // No variant (e.g., import
178                                         // dropped tiles with
179                                         // no-calls) = no translation.
180                                         continue
181                                 }
182                                 v, ok := variantmap[libref]
183                                 if !ok {
184                                         return fmt.Errorf("oops: CompactSequence %q has variant %d for tag %d, but that variant was not in its library", cseq.Name, libref.Variant, libref.Tag)
185                                 }
186                                 tseq[i].Variant = v
187                         }
188                 }
189                 if tilelib.encoder != nil {
190                         if err := tilelib.encoder.Encode(LibraryEntry{
191                                 CompactSequences: []CompactSequence{cseq},
192                         }); err != nil {
193                                 return err
194                         }
195                 }
196         }
197         tilelib.mtx.Lock()
198         defer tilelib.mtx.Unlock()
199         if tilelib.refseqs == nil {
200                 tilelib.refseqs = map[string]map[string][]tileLibRef{}
201         }
202         for _, cseq := range cseqs {
203                 tilelib.refseqs[cseq.Name] = cseq.TileSequences
204         }
205         return nil
206 }
207
208 // Load library data from rdr. Tile variants might be renumbered in
209 // the process; in that case, genomes variants will be renumbered to
210 // match.
211 //
212 // If onLoadGenome is non-nil, call it on each CompactGenome entry.
213 func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, gz bool, onLoadGenome func(CompactGenome)) error {
214         cgs := []CompactGenome{}
215         cseqs := []CompactSequence{}
216         variantmap := map[tileLibRef]tileVariantID{}
217         err := DecodeLibrary(rdr, gz, func(ent *LibraryEntry) error {
218                 if ctx.Err() != nil {
219                         return ctx.Err()
220                 }
221                 if err := tilelib.loadTagSet(ent.TagSet); err != nil {
222                         return err
223                 }
224                 if err := tilelib.loadTileVariants(ent.TileVariants, variantmap); err != nil {
225                         return err
226                 }
227                 cgs = append(cgs, ent.CompactGenomes...)
228                 cseqs = append(cseqs, ent.CompactSequences...)
229                 return nil
230         })
231         if err != nil {
232                 return err
233         }
234         if ctx.Err() != nil {
235                 return ctx.Err()
236         }
237         err = tilelib.loadCompactGenomes(cgs, variantmap, onLoadGenome)
238         if err != nil {
239                 return err
240         }
241         err = tilelib.loadCompactSequences(cseqs, variantmap)
242         if err != nil {
243                 return err
244         }
245         return nil
246 }
247
248 type importStats struct {
249         InputFile              string
250         InputLabel             string
251         InputLength            int
252         InputCoverage          int
253         TileCoverage           int
254         PathLength             int
255         DroppedOutOfOrderTiles int
256 }
257
258 func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, []importStats, error) {
259         ret := tileSeq{}
260         type jobT struct {
261                 label string
262                 fasta []byte
263         }
264         todo := make(chan jobT)
265         scanner := bufio.NewScanner(rdr)
266         go func() {
267                 defer close(todo)
268                 var fasta []byte
269                 var seqlabel string
270                 for scanner.Scan() {
271                         buf := scanner.Bytes()
272                         if len(buf) > 0 && buf[0] == '>' {
273                                 todo <- jobT{seqlabel, fasta}
274                                 seqlabel, fasta = strings.SplitN(string(buf[1:]), " ", 2)[0], nil
275                                 log.Debugf("%s %s reading fasta", filelabel, seqlabel)
276                         } else {
277                                 fasta = append(fasta, bytes.ToLower(buf)...)
278                         }
279                 }
280                 todo <- jobT{seqlabel, fasta}
281         }()
282         type foundtag struct {
283                 pos    int
284                 tagid  tagID
285                 taglen int
286         }
287         found := make([]foundtag, 2000000)
288         path := make([]tileLibRef, 2000000)
289         totalFoundTags := 0
290         totalPathLen := 0
291         skippedSequences := 0
292         stats := make([]importStats, 0, len(todo))
293         for job := range todo {
294                 if len(job.fasta) == 0 {
295                         continue
296                 } else if strings.Contains(job.label, "_") {
297                         skippedSequences++
298                         continue
299                 }
300                 log.Debugf("%s %s tiling", filelabel, job.label)
301
302                 found = found[:0]
303                 tilelib.taglib.FindAll(job.fasta, func(tagid tagID, pos, taglen int) {
304                         found = append(found, foundtag{pos: pos, tagid: tagid, taglen: taglen})
305                 })
306                 totalFoundTags += len(found)
307
308                 basesOut := 0
309                 skipped := 0
310                 path = path[:0]
311                 last := foundtag{tagid: -1}
312                 if tilelib.skipOOO {
313                         keep := longestIncreasingSubsequence(len(found), func(i int) int { return int(found[i].tagid) })
314                         for i, x := range keep {
315                                 found[i] = found[x]
316                         }
317                         skipped = len(found) - len(keep)
318                         found = found[:len(keep)]
319                 }
320                 for i, f := range found {
321                         log.Tracef("%s %s found[%d] == %#v", filelabel, job.label, i, f)
322                         if last.tagid < 0 {
323                                 // first tag in sequence
324                                 last = foundtag{tagid: f.tagid}
325                                 continue
326                         }
327                         libref := tilelib.getRef(last.tagid, job.fasta[last.pos:f.pos+f.taglen])
328                         path = append(path, libref)
329                         if libref.Variant > 0 {
330                                 // Count output coverage from
331                                 // the end of the previous tag
332                                 // (if any) to the end of the
333                                 // current tag, IOW don't
334                                 // double-count coverage for
335                                 // the previous tag.
336                                 basesOut += countBases(job.fasta[last.pos+last.taglen : f.pos+f.taglen])
337                         } else {
338                                 // If we dropped this tile
339                                 // (because !retainNoCalls),
340                                 // set taglen=0 so the
341                                 // overlapping tag is counted
342                                 // toward coverage on the
343                                 // following tile.
344                                 f.taglen = 0
345                         }
346                         last = f
347                 }
348                 if last.tagid < 0 {
349                         log.Warnf("%s %s no tags found", filelabel, job.label)
350                 } else {
351                         libref := tilelib.getRef(last.tagid, job.fasta[last.pos:])
352                         path = append(path, libref)
353                         if libref.Variant > 0 {
354                                 basesOut += countBases(job.fasta[last.pos+last.taglen:])
355                         }
356                 }
357
358                 pathcopy := make([]tileLibRef, len(path))
359                 copy(pathcopy, path)
360                 ret[job.label] = pathcopy
361
362                 basesIn := countBases(job.fasta)
363                 log.Infof("%s %s fasta in %d coverage in %d coverage out %d path len %d skipped %d", filelabel, job.label, len(job.fasta), basesIn, basesOut, len(path), skipped)
364                 stats = append(stats, importStats{
365                         InputFile:              filelabel,
366                         InputLabel:             job.label,
367                         InputLength:            len(job.fasta),
368                         InputCoverage:          basesIn,
369                         TileCoverage:           basesOut,
370                         PathLength:             len(path),
371                         DroppedOutOfOrderTiles: skipped,
372                 })
373
374                 totalPathLen += len(path)
375         }
376         log.Printf("%s tiled with total path len %d in %d sequences (skipped %d sequences with '_' in name, skipped %d out-of-order tags)", filelabel, totalPathLen, len(ret), skippedSequences, totalFoundTags-totalPathLen)
377         return ret, stats, scanner.Err()
378 }
379
380 func (tilelib *tileLibrary) Len() int {
381         tilelib.mtx.Lock()
382         defer tilelib.mtx.Unlock()
383         return tilelib.variants
384 }
385
386 // Return a tileLibRef for a tile with the given tag and sequence,
387 // adding the sequence to the library if needed.
388 func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
389         dropSeq := false
390         if !tilelib.retainNoCalls {
391                 for _, b := range seq {
392                         if b != 'a' && b != 'c' && b != 'g' && b != 't' {
393                                 dropSeq = true
394                                 break
395                         }
396                 }
397         }
398         tilelib.mtx.Lock()
399         if tilelib.variant == nil && tilelib.taglib != nil {
400                 tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len())
401         }
402         if int(tag) >= len(tilelib.variant) {
403                 // If we haven't seen the tag library yet (as in a
404                 // merge), tilelib.taglib.Len() is zero. We can still
405                 // behave correctly, we just need to expand the
406                 // tilelib.variant slice as needed.
407                 if int(tag) >= cap(tilelib.variant) {
408                         // Allocate 2x capacity.
409                         newslice := make([][][blake2b.Size256]byte, int(tag)+1, (int(tag)+1)*2)
410                         copy(newslice, tilelib.variant)
411                         tilelib.variant = newslice[:int(tag)+1]
412                 } else {
413                         // Use previously allocated capacity, avoiding
414                         // copy.
415                         tilelib.variant = tilelib.variant[:int(tag)+1]
416                 }
417         }
418         seqhash := blake2b.Sum256(seq)
419         for i, varhash := range tilelib.variant[tag] {
420                 if varhash == seqhash {
421                         tilelib.mtx.Unlock()
422                         return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)}
423                 }
424         }
425         tilelib.variants++
426         tilelib.variant[tag] = append(tilelib.variant[tag], seqhash)
427         if tilelib.retainTileSequences && !dropSeq {
428                 if tilelib.seq == nil {
429                         tilelib.seq = map[[blake2b.Size256]byte][]byte{}
430                 }
431                 tilelib.seq[seqhash] = append([]byte(nil), seq...)
432         }
433         variant := tileVariantID(len(tilelib.variant[tag]))
434         tilelib.mtx.Unlock()
435
436         if tilelib.encoder != nil {
437                 saveSeq := seq
438                 if dropSeq {
439                         // Save the hash, but not the sequence
440                         saveSeq = nil
441                 }
442                 tilelib.encoder.Encode(LibraryEntry{
443                         TileVariants: []TileVariant{{
444                                 Tag:      tag,
445                                 Variant:  variant,
446                                 Blake2b:  seqhash,
447                                 Sequence: saveSeq,
448                         }},
449                 })
450         }
451         return tileLibRef{Tag: tag, Variant: variant}
452 }
453
454 func (tilelib *tileLibrary) TileVariantSequence(libref tileLibRef) []byte {
455         if libref.Variant == 0 || len(tilelib.variant) <= int(libref.Tag) || len(tilelib.variant[libref.Tag]) < int(libref.Variant) {
456                 return nil
457         }
458         return tilelib.seq[tilelib.variant[libref.Tag][libref.Variant-1]]
459 }
460
461 // Tidy deletes unreferenced tile variants and renumbers variants so
462 // more common variants have smaller IDs.
463 func (tilelib *tileLibrary) Tidy() {
464         log.Print("Tidy: compute inref")
465         inref := map[tileLibRef]bool{}
466         for _, refseq := range tilelib.refseqs {
467                 for _, librefs := range refseq {
468                         for _, libref := range librefs {
469                                 inref[libref] = true
470                         }
471                 }
472         }
473         log.Print("Tidy: compute remap")
474         remap := make([][]tileVariantID, len(tilelib.variant))
475         throttle := throttle{Max: runtime.NumCPU() + 1}
476         for tag, oldvariants := range tilelib.variant {
477                 tag, oldvariants := tagID(tag), oldvariants
478                 if tag%10000 == 0 {
479                         log.Printf("Tidy: tag %d", tag)
480                 }
481                 throttle.Acquire()
482                 go func() {
483                         defer throttle.Release()
484                         uses := make([]int, len(oldvariants))
485                         for _, cg := range tilelib.compactGenomes {
486                                 for phase := 0; phase < 2; phase++ {
487                                         cgi := int(tag)*2 + phase
488                                         if cgi < len(cg) && cg[cgi] > 0 {
489                                                 uses[cg[cgi]-1]++
490                                         }
491                                 }
492                         }
493
494                         // Compute desired order of variants:
495                         // neworder[x] == index in oldvariants that
496                         // should move to position x.
497                         neworder := make([]int, len(oldvariants))
498                         for i := range neworder {
499                                 neworder[i] = i
500                         }
501                         sort.Slice(neworder, func(i, j int) bool {
502                                 if cmp := uses[neworder[i]] - uses[neworder[j]]; cmp != 0 {
503                                         return cmp > 0
504                                 } else {
505                                         return bytes.Compare(oldvariants[neworder[i]][:], oldvariants[neworder[j]][:]) < 0
506                                 }
507                         })
508
509                         // Replace tilelib.variants[tag] with a new
510                         // re-ordered slice of hashes, and make a
511                         // mapping from old to new variant IDs.
512                         remaptag := make([]tileVariantID, len(oldvariants)+1)
513                         newvariants := make([][blake2b.Size256]byte, 0, len(neworder))
514                         for _, oldi := range neworder {
515                                 if uses[oldi] > 0 || inref[tileLibRef{Tag: tag, Variant: tileVariantID(oldi + 1)}] {
516                                         newvariants = append(newvariants, oldvariants[oldi])
517                                         remaptag[oldi+1] = tileVariantID(len(newvariants))
518                                 }
519                         }
520                         tilelib.variant[tag] = newvariants
521                         remap[tag] = remaptag
522                 }()
523         }
524         throttle.Wait()
525
526         // Apply remap to genomes and reference sequences, so they
527         // refer to the same tile variants using the changed IDs.
528         log.Print("Tidy: apply remap")
529         for _, cg := range tilelib.compactGenomes {
530                 for idx, variant := range cg {
531                         cg[idx] = remap[tagID(idx/2)][variant]
532                 }
533         }
534         for _, refcs := range tilelib.refseqs {
535                 for _, refseq := range refcs {
536                         for i, tv := range refseq {
537                                 refseq[i].Variant = remap[tv.Tag][tv.Variant]
538                         }
539                 }
540         }
541         log.Print("Tidy: done")
542 }
543
544 func countBases(seq []byte) int {
545         n := 0
546         for _, c := range seq {
547                 if isbase[c] {
548                         n++
549                 }
550         }
551         return n
552 }