Use only first field of fasta comment as sequence label.
[lightning.git] / tilelib.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "encoding/gob"
8         "fmt"
9         "io"
10         "strings"
11         "sync"
12
13         log "github.com/sirupsen/logrus"
14         "golang.org/x/crypto/blake2b"
15 )
16
17 type tileVariantID uint16 // 1-based
18
19 type tileLibRef struct {
20         Tag     tagID
21         Variant tileVariantID
22 }
23
24 type tileSeq map[string][]tileLibRef
25
26 func (tseq tileSeq) Variants() ([]tileVariantID, int, int) {
27         maxtag := 0
28         for _, refs := range tseq {
29                 for _, ref := range refs {
30                         if maxtag < int(ref.Tag) {
31                                 maxtag = int(ref.Tag)
32                         }
33                 }
34         }
35         vars := make([]tileVariantID, maxtag+1)
36         var kept, dropped int
37         for _, refs := range tseq {
38                 for _, ref := range refs {
39                         if vars[int(ref.Tag)] != 0 {
40                                 dropped++
41                         } else {
42                                 kept++
43                         }
44                         vars[int(ref.Tag)] = ref.Variant
45                 }
46         }
47         return vars, kept, dropped
48 }
49
50 type tileLibrary struct {
51         includeNoCalls bool
52         skipOOO        bool
53         taglib         *tagLibrary
54         variant        [][][blake2b.Size256]byte
55         refseqs        map[string]map[string][]tileLibRef
56         // count [][]int
57         // seq map[[blake2b.Size]byte][]byte
58         variants int
59         // if non-nil, write out any tile variants added while tiling
60         encoder *gob.Encoder
61
62         mtx sync.Mutex
63 }
64
65 func (tilelib *tileLibrary) loadTagSet(newtagset [][]byte) error {
66         // Loading a tagset means either passing it through to the
67         // output (if it's the first one we've seen), or just ensuring
68         // it doesn't disagree with what we already have.
69         if len(newtagset) == 0 {
70                 return nil
71         }
72         tilelib.mtx.Lock()
73         defer tilelib.mtx.Unlock()
74         if tilelib.taglib == nil || tilelib.taglib.Len() == 0 {
75                 tilelib.taglib = &tagLibrary{}
76                 err := tilelib.taglib.setTags(newtagset)
77                 if err != nil {
78                         return err
79                 }
80                 if tilelib.encoder != nil {
81                         err = tilelib.encoder.Encode(LibraryEntry{
82                                 TagSet: newtagset,
83                         })
84                         if err != nil {
85                                 return err
86                         }
87                 }
88         } else if tilelib.taglib.Len() != len(newtagset) {
89                 return fmt.Errorf("cannot merge libraries with differing tagsets")
90         } else {
91                 current := tilelib.taglib.Tags()
92                 for i := range newtagset {
93                         if !bytes.Equal(newtagset[i], current[i]) {
94                                 return fmt.Errorf("cannot merge libraries with differing tagsets")
95                         }
96                 }
97         }
98         return nil
99 }
100
101 func (tilelib *tileLibrary) loadTileVariants(tvs []TileVariant, variantmap map[tileLibRef]tileVariantID) error {
102         for _, tv := range tvs {
103                 // Assign a new variant ID (unique across all inputs)
104                 // for each input variant.
105                 variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant
106         }
107         return nil
108 }
109
110 func (tilelib *tileLibrary) loadCompactGenomes(cgs []CompactGenome, variantmap map[tileLibRef]tileVariantID, onLoadGenome func(CompactGenome)) error {
111         log.Debugf("loadCompactGenomes: %d", len(cgs))
112         var wg sync.WaitGroup
113         errs := make(chan error, 1)
114         for _, cg := range cgs {
115                 name, variants := cg.Name, cg.Variants
116                 wg.Add(1)
117                 go func() {
118                         defer wg.Done()
119                         for i, variant := range variants {
120                                 if len(errs) > 0 {
121                                         return
122                                 }
123                                 if variant == 0 {
124                                         continue
125                                 }
126                                 tag := tagID(i / 2)
127                                 newvariant, ok := variantmap[tileLibRef{Tag: tag, Variant: variant}]
128                                 if !ok {
129                                         err := fmt.Errorf("oops: genome %q has variant %d for tag %d, but that variant was not in its library", name, variant, tag)
130                                         select {
131                                         case errs <- err:
132                                         default:
133                                         }
134                                         return
135                                 }
136                                 log.Tracef("loadCompactGenomes: cg %s tag %d variant %d => %d", name, tag, variant, newvariant)
137                                 variants[i] = newvariant
138                         }
139                         if onLoadGenome != nil {
140                                 onLoadGenome(cg)
141                         }
142                         if tilelib.encoder != nil {
143                                 err := tilelib.encoder.Encode(LibraryEntry{
144                                         CompactGenomes: []CompactGenome{cg},
145                                 })
146                                 if err != nil {
147                                         select {
148                                         case errs <- err:
149                                         default:
150                                         }
151                                         return
152                                 }
153                         }
154                 }()
155         }
156         wg.Wait()
157         go close(errs)
158         return <-errs
159 }
160
161 func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, variantmap map[tileLibRef]tileVariantID) error {
162         log.Debugf("loadCompactSequences: %d", len(cseqs))
163         for _, cseq := range cseqs {
164                 for _, tseq := range cseq.TileSequences {
165                         for i, libref := range tseq {
166                                 if libref.Variant == 0 {
167                                         // No variant (e.g., import
168                                         // dropped tiles with
169                                         // no-calls) = no translation.
170                                         continue
171                                 }
172                                 v, ok := variantmap[libref]
173                                 if !ok {
174                                         return fmt.Errorf("oops: CompactSequence %q has variant %d for tag %d, but that variant was not in its library", cseq.Name, libref.Variant, libref.Tag)
175                                 }
176                                 tseq[i].Variant = v
177                         }
178                 }
179                 if tilelib.encoder != nil {
180                         if err := tilelib.encoder.Encode(LibraryEntry{
181                                 CompactSequences: []CompactSequence{cseq},
182                         }); err != nil {
183                                 return err
184                         }
185                 }
186         }
187         tilelib.mtx.Lock()
188         defer tilelib.mtx.Unlock()
189         if tilelib.refseqs == nil {
190                 tilelib.refseqs = map[string]map[string][]tileLibRef{}
191         }
192         for _, cseq := range cseqs {
193                 tilelib.refseqs[cseq.Name] = cseq.TileSequences
194         }
195         return nil
196 }
197
198 // Load library data from rdr. Tile variants might be renumbered in
199 // the process; in that case, genomes variants will be renumbered to
200 // match.
201 //
202 // If onLoadGenome is non-nil, call it on each CompactGenome entry.
203 func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, onLoadGenome func(CompactGenome)) error {
204         cgs := []CompactGenome{}
205         cseqs := []CompactSequence{}
206         variantmap := map[tileLibRef]tileVariantID{}
207         err := DecodeLibrary(rdr, func(ent *LibraryEntry) error {
208                 if ctx.Err() != nil {
209                         return ctx.Err()
210                 }
211                 if err := tilelib.loadTagSet(ent.TagSet); err != nil {
212                         return err
213                 }
214                 if err := tilelib.loadTileVariants(ent.TileVariants, variantmap); err != nil {
215                         return err
216                 }
217                 cgs = append(cgs, ent.CompactGenomes...)
218                 cseqs = append(cseqs, ent.CompactSequences...)
219                 return nil
220         })
221         if err != nil {
222                 return err
223         }
224         if ctx.Err() != nil {
225                 return ctx.Err()
226         }
227         err = tilelib.loadCompactGenomes(cgs, variantmap, onLoadGenome)
228         if err != nil {
229                 return err
230         }
231         err = tilelib.loadCompactSequences(cseqs, variantmap)
232         if err != nil {
233                 return err
234         }
235         return nil
236 }
237
238 func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, error) {
239         ret := tileSeq{}
240         type jobT struct {
241                 label string
242                 fasta []byte
243         }
244         todo := make(chan jobT)
245         scanner := bufio.NewScanner(rdr)
246         go func() {
247                 defer close(todo)
248                 var fasta []byte
249                 var seqlabel string
250                 for scanner.Scan() {
251                         buf := scanner.Bytes()
252                         if len(buf) > 0 && buf[0] == '>' {
253                                 todo <- jobT{seqlabel, fasta}
254                                 seqlabel, fasta = strings.SplitN(string(buf[1:]), " ", 2)[0], nil
255                                 log.Debugf("%s %s reading fasta", filelabel, seqlabel)
256                         } else {
257                                 fasta = append(fasta, bytes.ToLower(buf)...)
258                         }
259                 }
260                 todo <- jobT{seqlabel, fasta}
261         }()
262         type foundtag struct {
263                 pos    int
264                 tagid  tagID
265                 taglen int
266         }
267         found := make([]foundtag, 2000000)
268         path := make([]tileLibRef, 2000000)
269         totalFoundTags := 0
270         totalPathLen := 0
271         skippedSequences := 0
272         for job := range todo {
273                 if len(job.fasta) == 0 {
274                         continue
275                 } else if strings.Contains(job.label, "_") {
276                         skippedSequences++
277                         continue
278                 }
279                 log.Debugf("%s %s tiling", filelabel, job.label)
280
281                 found = found[:0]
282                 tilelib.taglib.FindAll(job.fasta, func(tagid tagID, pos, taglen int) {
283                         found = append(found, foundtag{pos: pos, tagid: tagid, taglen: taglen})
284                 })
285                 totalFoundTags += len(found)
286
287                 skipped := 0
288                 path = path[:0]
289                 last := foundtag{tagid: -1}
290                 if tilelib.skipOOO {
291                         keep := longestIncreasingSubsequence(len(found), func(i int) int { return int(found[i].tagid) })
292                         for i, x := range keep {
293                                 found[i] = found[x]
294                         }
295                         skipped = len(found) - len(keep)
296                         found = found[:len(keep)]
297                 }
298                 for i, f := range found {
299                         log.Tracef("%s %s found[%d] == %#v", filelabel, job.label, i, f)
300                         if last.taglen > 0 {
301                                 path = append(path, tilelib.getRef(last.tagid, job.fasta[last.pos:f.pos+f.taglen]))
302                         } else {
303                                 f.pos = 0
304                         }
305                         last = f
306                 }
307                 if last.taglen > 0 {
308                         path = append(path, tilelib.getRef(last.tagid, job.fasta[last.pos:]))
309                 } else {
310                         log.Warnf("%s %s no tags found", filelabel, job.label)
311                 }
312
313                 pathcopy := make([]tileLibRef, len(path))
314                 copy(pathcopy, path)
315                 ret[job.label] = pathcopy
316                 log.Debugf("%s %s tiled with path len %d, skipped %d", filelabel, job.label, len(path), skipped)
317                 totalPathLen += len(path)
318         }
319         log.Printf("%s tiled with total path len %d in %d sequences (skipped %d sequences with '_' in name, skipped %d out-of-order tags)", filelabel, totalPathLen, len(ret), skippedSequences, totalFoundTags-totalPathLen)
320         return ret, scanner.Err()
321 }
322
323 func (tilelib *tileLibrary) Len() int {
324         tilelib.mtx.Lock()
325         defer tilelib.mtx.Unlock()
326         return tilelib.variants
327 }
328
329 // Return a tileLibRef for a tile with the given tag and sequence,
330 // adding the sequence to the library if needed.
331 func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
332         if !tilelib.includeNoCalls {
333                 for _, b := range seq {
334                         if b != 'a' && b != 'c' && b != 'g' && b != 't' {
335                                 // return "tile not found" if seq has any
336                                 // no-calls
337                                 return tileLibRef{Tag: tag}
338                         }
339                 }
340         }
341         tilelib.mtx.Lock()
342         // if tilelib.seq == nil {
343         //      tilelib.seq = map[[blake2b.Size]byte][]byte{}
344         // }
345         if tilelib.variant == nil && tilelib.taglib != nil {
346                 tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len())
347         }
348         if int(tag) >= len(tilelib.variant) {
349                 // If we haven't seen the tag library yet (as in a
350                 // merge), tilelib.taglib.Len() is zero. We can still
351                 // behave correctly, we just need to expand the
352                 // tilelib.variant slice as needed.
353                 if int(tag) >= cap(tilelib.variant) {
354                         // Allocate 2x capacity.
355                         newslice := make([][][blake2b.Size256]byte, int(tag)+1, (int(tag)+1)*2)
356                         copy(newslice, tilelib.variant)
357                         tilelib.variant = newslice[:int(tag)+1]
358                 } else {
359                         // Use previously allocated capacity, avoiding
360                         // copy.
361                         tilelib.variant = tilelib.variant[:int(tag)+1]
362                 }
363         }
364         seqhash := blake2b.Sum256(seq)
365         for i, varhash := range tilelib.variant[tag] {
366                 if varhash == seqhash {
367                         tilelib.mtx.Unlock()
368                         return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)}
369                 }
370         }
371         tilelib.variants++
372         tilelib.variant[tag] = append(tilelib.variant[tag], seqhash)
373         // tilelib.seq[seqhash] = append([]byte(nil), seq...)
374         variant := tileVariantID(len(tilelib.variant[tag]))
375         tilelib.mtx.Unlock()
376
377         if tilelib.encoder != nil {
378                 tilelib.encoder.Encode(LibraryEntry{
379                         TileVariants: []TileVariant{{
380                                 Tag:      tag,
381                                 Variant:  variant,
382                                 Blake2b:  seqhash,
383                                 Sequence: seq,
384                         }},
385                 })
386         }
387         return tileLibRef{Tag: tag, Variant: variant}
388 }