b4ecd4a3afcfb122908fabf4f9631b623cd20cc5
[lightning.git] / tilelib.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "encoding/gob"
8         "fmt"
9         "io"
10         "strings"
11         "sync"
12
13         log "github.com/sirupsen/logrus"
14         "golang.org/x/crypto/blake2b"
15 )
16
17 type tileVariantID uint16 // 1-based
18
19 type tileLibRef struct {
20         tag     tagID
21         variant tileVariantID
22 }
23
24 type tileSeq map[string][]tileLibRef
25
26 func (tseq tileSeq) Variants() ([]tileVariantID, int, int) {
27         maxtag := 0
28         for _, refs := range tseq {
29                 for _, ref := range refs {
30                         if maxtag < int(ref.tag) {
31                                 maxtag = int(ref.tag)
32                         }
33                 }
34         }
35         vars := make([]tileVariantID, maxtag+1)
36         var kept, dropped int
37         for _, refs := range tseq {
38                 for _, ref := range refs {
39                         if vars[int(ref.tag)] != 0 {
40                                 dropped++
41                         } else {
42                                 kept++
43                         }
44                         vars[int(ref.tag)] = ref.variant
45                 }
46         }
47         return vars, kept, dropped
48 }
49
50 type tileLibrary struct {
51         includeNoCalls bool
52         skipOOO        bool
53         taglib         *tagLibrary
54         variant        [][][blake2b.Size256]byte
55         // count [][]int
56         // seq map[[blake2b.Size]byte][]byte
57         variants int
58         // if non-nil, write out any tile variants added while tiling
59         encoder *gob.Encoder
60         // if non-nil, call this func upon loading a genome
61         onLoadGenome func(CompactGenome)
62
63         mtx sync.Mutex
64 }
65
66 func (tilelib *tileLibrary) loadTagSet(newtagset [][]byte) error {
67         // Loading a tagset means either passing it through to the
68         // output (if it's the first one we've seen), or just ensuring
69         // it doesn't disagree with what we already have.
70         if len(newtagset) == 0 {
71                 return nil
72         }
73         tilelib.mtx.Lock()
74         defer tilelib.mtx.Unlock()
75         if tilelib.taglib == nil || tilelib.taglib.Len() == 0 {
76                 tilelib.taglib = &tagLibrary{}
77                 err := tilelib.taglib.setTags(newtagset)
78                 if err != nil {
79                         return err
80                 }
81                 if tilelib.encoder != nil {
82                         err = tilelib.encoder.Encode(LibraryEntry{
83                                 TagSet: newtagset,
84                         })
85                         if err != nil {
86                                 return err
87                         }
88                 }
89         } else if tilelib.taglib.Len() != len(newtagset) {
90                 return fmt.Errorf("cannot merge libraries with differing tagsets")
91         } else {
92                 current := tilelib.taglib.Tags()
93                 for i := range newtagset {
94                         if !bytes.Equal(newtagset[i], current[i]) {
95                                 return fmt.Errorf("cannot merge libraries with differing tagsets")
96                         }
97                 }
98         }
99         return nil
100 }
101
102 func (tilelib *tileLibrary) loadTileVariants(tvs []TileVariant, variantmap map[tileLibRef]tileVariantID) error {
103         for _, tv := range tvs {
104                 // Assign a new variant ID (unique across all inputs)
105                 // for each input variant.
106                 variantmap[tileLibRef{tag: tv.Tag, variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).variant
107         }
108         return nil
109 }
110
111 func (tilelib *tileLibrary) loadGenomes(genomes map[string][]tileVariantID, variantmap map[tileLibRef]tileVariantID, onLoadGenome func(CompactGenome)) error {
112         var wg sync.WaitGroup
113         errs := make(chan error, 1)
114         for name, variants := range genomes {
115                 name, variants := name, variants
116                 wg.Add(1)
117                 go func() {
118                         defer wg.Done()
119                         for i, variant := range variants {
120                                 if len(errs) > 0 {
121                                         return
122                                 }
123                                 if variant == 0 {
124                                         continue
125                                 }
126                                 tag := tagID(i / 2)
127                                 newvariant, ok := variantmap[tileLibRef{tag: tag, variant: variant}]
128                                 if !ok {
129                                         err := fmt.Errorf("oops: genome %q has variant %d for tag %d, but that variant was not in its library", name, variant, tag)
130                                         select {
131                                         case errs <- err:
132                                         default:
133                                         }
134                                         return
135                                 }
136                                 variants[i] = newvariant
137                         }
138                         if tilelib.encoder != nil {
139                                 for name, variants := range genomes {
140                                         cg := CompactGenome{
141                                                 Name:     name,
142                                                 Variants: variants,
143                                         }
144                                         if onLoadGenome != nil {
145                                                 onLoadGenome(cg)
146                                         }
147                                         err := tilelib.encoder.Encode(LibraryEntry{
148                                                 CompactGenomes: []CompactGenome{cg},
149                                         })
150                                         if err != nil {
151                                                 select {
152                                                 case errs <- err:
153                                                 default:
154                                                 }
155                                                 return
156                                         }
157                                 }
158                         }
159                 }()
160         }
161         wg.Wait()
162         go close(errs)
163         return <-errs
164 }
165
166 func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, onLoadGenome func(CompactGenome)) error {
167         genomes := map[string][]tileVariantID{}
168         variantmap := map[tileLibRef]tileVariantID{}
169         err := DecodeLibrary(rdr, func(ent *LibraryEntry) error {
170                 if ctx.Err() != nil {
171                         return ctx.Err()
172                 }
173                 if err := tilelib.loadTagSet(ent.TagSet); err != nil {
174                         return err
175                 } else if err = tilelib.loadTileVariants(ent.TileVariants, variantmap); err != nil {
176                         return err
177                 } else {
178                         for _, cg := range ent.CompactGenomes {
179                                 genomes[cg.Name] = cg.Variants
180                         }
181                         return nil
182                 }
183         })
184         if err != nil {
185                 return err
186         }
187         if ctx.Err() != nil {
188                 return ctx.Err()
189         }
190         err = tilelib.loadGenomes(genomes, variantmap, onLoadGenome)
191         if err != nil {
192                 return err
193         }
194         return nil
195 }
196
197 func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, error) {
198         ret := tileSeq{}
199         type jobT struct {
200                 label string
201                 fasta []byte
202         }
203         todo := make(chan jobT)
204         scanner := bufio.NewScanner(rdr)
205         go func() {
206                 defer close(todo)
207                 var fasta []byte
208                 var seqlabel string
209                 for scanner.Scan() {
210                         buf := scanner.Bytes()
211                         if len(buf) > 0 && buf[0] == '>' {
212                                 todo <- jobT{seqlabel, fasta}
213                                 seqlabel, fasta = string(buf[1:]), nil
214                                 log.Debugf("%s %s reading fasta", filelabel, seqlabel)
215                         } else {
216                                 fasta = append(fasta, bytes.ToLower(buf)...)
217                         }
218                 }
219                 todo <- jobT{seqlabel, fasta}
220         }()
221         type foundtag struct {
222                 pos    int
223                 tagid  tagID
224                 taglen int
225         }
226         found := make([]foundtag, 2000000)
227         path := make([]tileLibRef, 2000000)
228         totalFoundTags := 0
229         totalPathLen := 0
230         skippedSequences := 0
231         for job := range todo {
232                 if len(job.fasta) == 0 {
233                         continue
234                 } else if strings.Contains(job.label, "_") {
235                         skippedSequences++
236                         continue
237                 }
238                 log.Debugf("%s %s tiling", filelabel, job.label)
239
240                 found = found[:0]
241                 tilelib.taglib.FindAll(job.fasta, func(tagid tagID, pos, taglen int) {
242                         found = append(found, foundtag{pos: pos, tagid: tagid, taglen: taglen})
243                 })
244                 totalFoundTags += len(found)
245
246                 skipped := 0
247                 path = path[:0]
248                 last := foundtag{tagid: -1}
249                 if tilelib.skipOOO {
250                         keep := longestIncreasingSubsequence(len(found), func(i int) int { return int(found[i].tagid) })
251                         for i, x := range keep {
252                                 found[i] = found[x]
253                         }
254                         skipped = len(found) - len(keep)
255                         found = found[:len(keep)]
256                 }
257                 for i, f := range found {
258                         log.Tracef("%s %s found[%d] == %#v", filelabel, job.label, i, f)
259                         if last.taglen > 0 {
260                                 path = append(path, tilelib.getRef(last.tagid, job.fasta[last.pos:f.pos+f.taglen]))
261                         }
262                         last = f
263                 }
264                 if last.taglen > 0 {
265                         path = append(path, tilelib.getRef(last.tagid, job.fasta[last.pos:]))
266                 }
267
268                 pathcopy := make([]tileLibRef, len(path))
269                 copy(pathcopy, path)
270                 ret[job.label] = pathcopy
271                 log.Debugf("%s %s tiled with path len %d, skipped %d", filelabel, job.label, len(path), skipped)
272                 totalPathLen += len(path)
273         }
274         log.Printf("%s tiled with total path len %d in %d sequences (skipped %d sequences with '_' in name, skipped %d out-of-order tags)", filelabel, totalPathLen, len(ret), skippedSequences, totalFoundTags-totalPathLen)
275         return ret, scanner.Err()
276 }
277
278 func (tilelib *tileLibrary) Len() int {
279         tilelib.mtx.Lock()
280         defer tilelib.mtx.Unlock()
281         return tilelib.variants
282 }
283
284 // Return a tileLibRef for a tile with the given tag and sequence,
285 // adding the sequence to the library if needed.
286 func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
287         if !tilelib.includeNoCalls {
288                 for _, b := range seq {
289                         if b != 'a' && b != 'c' && b != 'g' && b != 't' {
290                                 // return "tile not found" if seq has any
291                                 // no-calls
292                                 return tileLibRef{tag: tag}
293                         }
294                 }
295         }
296         tilelib.mtx.Lock()
297         // if tilelib.seq == nil {
298         //      tilelib.seq = map[[blake2b.Size]byte][]byte{}
299         // }
300         if tilelib.variant == nil && tilelib.taglib != nil {
301                 tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len())
302         }
303         if int(tag) >= len(tilelib.variant) {
304                 // If we haven't seen the tag library yet (as in a
305                 // merge), tilelib.taglib.Len() is zero. We can still
306                 // behave correctly, we just need to expand the
307                 // tilelib.variant slice as needed.
308                 if int(tag) >= cap(tilelib.variant) {
309                         // Allocate 2x capacity.
310                         newslice := make([][][blake2b.Size256]byte, int(tag)+1, (int(tag)+1)*2)
311                         copy(newslice, tilelib.variant)
312                         tilelib.variant = newslice[:int(tag)+1]
313                 } else {
314                         // Use previously allocated capacity, avoiding
315                         // copy.
316                         tilelib.variant = tilelib.variant[:int(tag)+1]
317                 }
318         }
319         seqhash := blake2b.Sum256(seq)
320         for i, varhash := range tilelib.variant[tag] {
321                 if varhash == seqhash {
322                         tilelib.mtx.Unlock()
323                         return tileLibRef{tag: tag, variant: tileVariantID(i + 1)}
324                 }
325         }
326         tilelib.variants++
327         tilelib.variant[tag] = append(tilelib.variant[tag], seqhash)
328         // tilelib.seq[seqhash] = append([]byte(nil), seq...)
329         variant := tileVariantID(len(tilelib.variant[tag]))
330         tilelib.mtx.Unlock()
331
332         if tilelib.encoder != nil {
333                 tilelib.encoder.Encode(LibraryEntry{
334                         TileVariants: []TileVariant{{
335                                 Tag:      tag,
336                                 Variant:  variant,
337                                 Blake2b:  seqhash,
338                                 Sequence: seq,
339                         }},
340                 })
341         }
342         return tileLibRef{tag: tag, variant: variant}
343 }