Write import stats to stats.json.
[lightning.git] / tilelib.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "encoding/gob"
8         "fmt"
9         "io"
10         "strings"
11         "sync"
12
13         log "github.com/sirupsen/logrus"
14         "golang.org/x/crypto/blake2b"
15 )
16
17 type tileVariantID uint16 // 1-based
18
19 type tileLibRef struct {
20         Tag     tagID
21         Variant tileVariantID
22 }
23
24 type tileSeq map[string][]tileLibRef
25
26 func (tseq tileSeq) Variants() ([]tileVariantID, int, int) {
27         maxtag := 0
28         for _, refs := range tseq {
29                 for _, ref := range refs {
30                         if maxtag < int(ref.Tag) {
31                                 maxtag = int(ref.Tag)
32                         }
33                 }
34         }
35         vars := make([]tileVariantID, maxtag+1)
36         var kept, dropped int
37         for _, refs := range tseq {
38                 for _, ref := range refs {
39                         if vars[int(ref.Tag)] != 0 {
40                                 dropped++
41                         } else {
42                                 kept++
43                         }
44                         vars[int(ref.Tag)] = ref.Variant
45                 }
46         }
47         return vars, kept, dropped
48 }
49
50 type tileLibrary struct {
51         includeNoCalls bool
52         skipOOO        bool
53         taglib         *tagLibrary
54         variant        [][][blake2b.Size256]byte
55         refseqs        map[string]map[string][]tileLibRef
56         // count [][]int
57         // seq map[[blake2b.Size]byte][]byte
58         variants int
59         // if non-nil, write out any tile variants added while tiling
60         encoder *gob.Encoder
61
62         mtx sync.Mutex
63 }
64
65 func (tilelib *tileLibrary) loadTagSet(newtagset [][]byte) error {
66         // Loading a tagset means either passing it through to the
67         // output (if it's the first one we've seen), or just ensuring
68         // it doesn't disagree with what we already have.
69         if len(newtagset) == 0 {
70                 return nil
71         }
72         tilelib.mtx.Lock()
73         defer tilelib.mtx.Unlock()
74         if tilelib.taglib == nil || tilelib.taglib.Len() == 0 {
75                 tilelib.taglib = &tagLibrary{}
76                 err := tilelib.taglib.setTags(newtagset)
77                 if err != nil {
78                         return err
79                 }
80                 if tilelib.encoder != nil {
81                         err = tilelib.encoder.Encode(LibraryEntry{
82                                 TagSet: newtagset,
83                         })
84                         if err != nil {
85                                 return err
86                         }
87                 }
88         } else if tilelib.taglib.Len() != len(newtagset) {
89                 return fmt.Errorf("cannot merge libraries with differing tagsets")
90         } else {
91                 current := tilelib.taglib.Tags()
92                 for i := range newtagset {
93                         if !bytes.Equal(newtagset[i], current[i]) {
94                                 return fmt.Errorf("cannot merge libraries with differing tagsets")
95                         }
96                 }
97         }
98         return nil
99 }
100
101 func (tilelib *tileLibrary) loadTileVariants(tvs []TileVariant, variantmap map[tileLibRef]tileVariantID) error {
102         for _, tv := range tvs {
103                 // Assign a new variant ID (unique across all inputs)
104                 // for each input variant.
105                 variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant
106         }
107         return nil
108 }
109
110 func (tilelib *tileLibrary) loadCompactGenomes(cgs []CompactGenome, variantmap map[tileLibRef]tileVariantID, onLoadGenome func(CompactGenome)) error {
111         log.Debugf("loadCompactGenomes: %d", len(cgs))
112         var wg sync.WaitGroup
113         errs := make(chan error, 1)
114         for _, cg := range cgs {
115                 wg.Add(1)
116                 cg := cg
117                 go func() {
118                         defer wg.Done()
119                         for i, variant := range cg.Variants {
120                                 if len(errs) > 0 {
121                                         return
122                                 }
123                                 if variant == 0 {
124                                         continue
125                                 }
126                                 tag := tagID(i / 2)
127                                 newvariant, ok := variantmap[tileLibRef{Tag: tag, Variant: variant}]
128                                 if !ok {
129                                         err := fmt.Errorf("oops: genome %q has variant %d for tag %d, but that variant was not in its library", cg.Name, variant, tag)
130                                         select {
131                                         case errs <- err:
132                                         default:
133                                         }
134                                         return
135                                 }
136                                 log.Tracef("loadCompactGenomes: cg %s tag %d variant %d => %d", cg.Name, tag, variant, newvariant)
137                                 cg.Variants[i] = newvariant
138                         }
139                         if onLoadGenome != nil {
140                                 onLoadGenome(cg)
141                         }
142                         if tilelib.encoder != nil {
143                                 err := tilelib.encoder.Encode(LibraryEntry{
144                                         CompactGenomes: []CompactGenome{cg},
145                                 })
146                                 if err != nil {
147                                         select {
148                                         case errs <- err:
149                                         default:
150                                         }
151                                         return
152                                 }
153                         }
154                 }()
155         }
156         wg.Wait()
157         go close(errs)
158         return <-errs
159 }
160
161 func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, variantmap map[tileLibRef]tileVariantID) error {
162         log.Debugf("loadCompactSequences: %d", len(cseqs))
163         for _, cseq := range cseqs {
164                 for _, tseq := range cseq.TileSequences {
165                         for i, libref := range tseq {
166                                 if libref.Variant == 0 {
167                                         // No variant (e.g., import
168                                         // dropped tiles with
169                                         // no-calls) = no translation.
170                                         continue
171                                 }
172                                 v, ok := variantmap[libref]
173                                 if !ok {
174                                         return fmt.Errorf("oops: CompactSequence %q has variant %d for tag %d, but that variant was not in its library", cseq.Name, libref.Variant, libref.Tag)
175                                 }
176                                 tseq[i].Variant = v
177                         }
178                 }
179                 if tilelib.encoder != nil {
180                         if err := tilelib.encoder.Encode(LibraryEntry{
181                                 CompactSequences: []CompactSequence{cseq},
182                         }); err != nil {
183                                 return err
184                         }
185                 }
186         }
187         tilelib.mtx.Lock()
188         defer tilelib.mtx.Unlock()
189         if tilelib.refseqs == nil {
190                 tilelib.refseqs = map[string]map[string][]tileLibRef{}
191         }
192         for _, cseq := range cseqs {
193                 tilelib.refseqs[cseq.Name] = cseq.TileSequences
194         }
195         return nil
196 }
197
198 // Load library data from rdr. Tile variants might be renumbered in
199 // the process; in that case, genomes variants will be renumbered to
200 // match.
201 //
202 // If onLoadGenome is non-nil, call it on each CompactGenome entry.
203 func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, onLoadGenome func(CompactGenome)) error {
204         cgs := []CompactGenome{}
205         cseqs := []CompactSequence{}
206         variantmap := map[tileLibRef]tileVariantID{}
207         err := DecodeLibrary(rdr, func(ent *LibraryEntry) error {
208                 if ctx.Err() != nil {
209                         return ctx.Err()
210                 }
211                 if err := tilelib.loadTagSet(ent.TagSet); err != nil {
212                         return err
213                 }
214                 if err := tilelib.loadTileVariants(ent.TileVariants, variantmap); err != nil {
215                         return err
216                 }
217                 cgs = append(cgs, ent.CompactGenomes...)
218                 cseqs = append(cseqs, ent.CompactSequences...)
219                 return nil
220         })
221         if err != nil {
222                 return err
223         }
224         if ctx.Err() != nil {
225                 return ctx.Err()
226         }
227         err = tilelib.loadCompactGenomes(cgs, variantmap, onLoadGenome)
228         if err != nil {
229                 return err
230         }
231         err = tilelib.loadCompactSequences(cseqs, variantmap)
232         if err != nil {
233                 return err
234         }
235         return nil
236 }
237
238 type importStats struct {
239         InputFile              string
240         InputLabel             string
241         InputLength            int
242         InputCoverage          int
243         TileCoverage           int
244         PathLength             int
245         DroppedOutOfOrderTiles int
246 }
247
248 func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, []importStats, error) {
249         ret := tileSeq{}
250         type jobT struct {
251                 label string
252                 fasta []byte
253         }
254         todo := make(chan jobT)
255         scanner := bufio.NewScanner(rdr)
256         go func() {
257                 defer close(todo)
258                 var fasta []byte
259                 var seqlabel string
260                 for scanner.Scan() {
261                         buf := scanner.Bytes()
262                         if len(buf) > 0 && buf[0] == '>' {
263                                 todo <- jobT{seqlabel, fasta}
264                                 seqlabel, fasta = strings.SplitN(string(buf[1:]), " ", 2)[0], nil
265                                 log.Debugf("%s %s reading fasta", filelabel, seqlabel)
266                         } else {
267                                 fasta = append(fasta, bytes.ToLower(buf)...)
268                         }
269                 }
270                 todo <- jobT{seqlabel, fasta}
271         }()
272         type foundtag struct {
273                 pos    int
274                 tagid  tagID
275                 taglen int
276         }
277         found := make([]foundtag, 2000000)
278         path := make([]tileLibRef, 2000000)
279         totalFoundTags := 0
280         totalPathLen := 0
281         skippedSequences := 0
282         stats := make([]importStats, 0, len(todo))
283         for job := range todo {
284                 if len(job.fasta) == 0 {
285                         continue
286                 } else if strings.Contains(job.label, "_") {
287                         skippedSequences++
288                         continue
289                 }
290                 log.Debugf("%s %s tiling", filelabel, job.label)
291
292                 found = found[:0]
293                 tilelib.taglib.FindAll(job.fasta, func(tagid tagID, pos, taglen int) {
294                         found = append(found, foundtag{pos: pos, tagid: tagid, taglen: taglen})
295                 })
296                 totalFoundTags += len(found)
297
298                 basesOut := 0
299                 skipped := 0
300                 path = path[:0]
301                 last := foundtag{tagid: -1}
302                 if tilelib.skipOOO {
303                         keep := longestIncreasingSubsequence(len(found), func(i int) int { return int(found[i].tagid) })
304                         for i, x := range keep {
305                                 found[i] = found[x]
306                         }
307                         skipped = len(found) - len(keep)
308                         found = found[:len(keep)]
309                 }
310                 for i, f := range found {
311                         log.Tracef("%s %s found[%d] == %#v", filelabel, job.label, i, f)
312                         if last.tagid < 0 {
313                                 // first tag in sequence
314                                 last = foundtag{tagid: f.tagid}
315                                 continue
316                         }
317                         libref := tilelib.getRef(last.tagid, job.fasta[last.pos:f.pos+f.taglen])
318                         path = append(path, libref)
319                         if libref.Variant > 0 {
320                                 // Count output coverage from
321                                 // the end of the previous tag
322                                 // (if any) to the end of the
323                                 // current tag, IOW don't
324                                 // double-count coverage for
325                                 // the previous tag.
326                                 basesOut += countBases(job.fasta[last.pos+last.taglen : f.pos+f.taglen])
327                         } else {
328                                 // If we dropped this tile
329                                 // (because !includeNoCalls),
330                                 // set taglen=0 so the
331                                 // overlapping tag is counted
332                                 // toward coverage on the
333                                 // following tile.
334                                 f.taglen = 0
335                         }
336                         last = f
337                 }
338                 if last.tagid < 0 {
339                         log.Warnf("%s %s no tags found", filelabel, job.label)
340                 } else {
341                         libref := tilelib.getRef(last.tagid, job.fasta[last.pos:])
342                         path = append(path, libref)
343                         if libref.Variant > 0 {
344                                 basesOut += countBases(job.fasta[last.pos+last.taglen:])
345                         }
346                 }
347
348                 pathcopy := make([]tileLibRef, len(path))
349                 copy(pathcopy, path)
350                 ret[job.label] = pathcopy
351                 log.Debugf("%s %s tiled with path len %d, skipped %d", filelabel, job.label, len(path), skipped)
352
353                 basesIn := countBases(job.fasta)
354                 log.Infof("%s %s fasta in %d coverage in %d coverage out %d", filelabel, job.label, len(job.fasta), basesIn, basesOut)
355                 stats = append(stats, importStats{
356                         InputFile:              filelabel,
357                         InputLabel:             job.label,
358                         InputLength:            len(job.fasta),
359                         InputCoverage:          basesIn,
360                         TileCoverage:           basesOut,
361                         PathLength:             len(path),
362                         DroppedOutOfOrderTiles: skipped,
363                 })
364
365                 totalPathLen += len(path)
366         }
367         log.Printf("%s tiled with total path len %d in %d sequences (skipped %d sequences with '_' in name, skipped %d out-of-order tags)", filelabel, totalPathLen, len(ret), skippedSequences, totalFoundTags-totalPathLen)
368         return ret, stats, scanner.Err()
369 }
370
371 func (tilelib *tileLibrary) Len() int {
372         tilelib.mtx.Lock()
373         defer tilelib.mtx.Unlock()
374         return tilelib.variants
375 }
376
377 // Return a tileLibRef for a tile with the given tag and sequence,
378 // adding the sequence to the library if needed.
379 func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
380         if !tilelib.includeNoCalls {
381                 for _, b := range seq {
382                         if b != 'a' && b != 'c' && b != 'g' && b != 't' {
383                                 // return "tile not found" if seq has any
384                                 // no-calls
385                                 return tileLibRef{Tag: tag}
386                         }
387                 }
388         }
389         tilelib.mtx.Lock()
390         // if tilelib.seq == nil {
391         //      tilelib.seq = map[[blake2b.Size]byte][]byte{}
392         // }
393         if tilelib.variant == nil && tilelib.taglib != nil {
394                 tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len())
395         }
396         if int(tag) >= len(tilelib.variant) {
397                 // If we haven't seen the tag library yet (as in a
398                 // merge), tilelib.taglib.Len() is zero. We can still
399                 // behave correctly, we just need to expand the
400                 // tilelib.variant slice as needed.
401                 if int(tag) >= cap(tilelib.variant) {
402                         // Allocate 2x capacity.
403                         newslice := make([][][blake2b.Size256]byte, int(tag)+1, (int(tag)+1)*2)
404                         copy(newslice, tilelib.variant)
405                         tilelib.variant = newslice[:int(tag)+1]
406                 } else {
407                         // Use previously allocated capacity, avoiding
408                         // copy.
409                         tilelib.variant = tilelib.variant[:int(tag)+1]
410                 }
411         }
412         seqhash := blake2b.Sum256(seq)
413         for i, varhash := range tilelib.variant[tag] {
414                 if varhash == seqhash {
415                         tilelib.mtx.Unlock()
416                         return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)}
417                 }
418         }
419         tilelib.variants++
420         tilelib.variant[tag] = append(tilelib.variant[tag], seqhash)
421         // tilelib.seq[seqhash] = append([]byte(nil), seq...)
422         variant := tileVariantID(len(tilelib.variant[tag]))
423         tilelib.mtx.Unlock()
424
425         if tilelib.encoder != nil {
426                 tilelib.encoder.Encode(LibraryEntry{
427                         TileVariants: []TileVariant{{
428                                 Tag:      tag,
429                                 Variant:  variant,
430                                 Blake2b:  seqhash,
431                                 Sequence: seq,
432                         }},
433                 })
434         }
435         return tileLibRef{Tag: tag, Variant: variant}
436 }
437
438 func countBases(seq []byte) int {
439         n := 0
440         for _, c := range seq {
441                 if isbase[c] {
442                         n++
443                 }
444         }
445         return n
446 }