Export HGVS.
[lightning.git] / tilelib.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "encoding/gob"
8         "fmt"
9         "io"
10         "strings"
11         "sync"
12
13         log "github.com/sirupsen/logrus"
14         "golang.org/x/crypto/blake2b"
15 )
16
17 type tileVariantID uint16 // 1-based
18
19 type tileLibRef struct {
20         Tag     tagID
21         Variant tileVariantID
22 }
23
24 type tileSeq map[string][]tileLibRef
25
26 func (tseq tileSeq) Variants() ([]tileVariantID, int, int) {
27         maxtag := 0
28         for _, refs := range tseq {
29                 for _, ref := range refs {
30                         if maxtag < int(ref.Tag) {
31                                 maxtag = int(ref.Tag)
32                         }
33                 }
34         }
35         vars := make([]tileVariantID, maxtag+1)
36         var kept, dropped int
37         for _, refs := range tseq {
38                 for _, ref := range refs {
39                         if vars[int(ref.Tag)] != 0 {
40                                 dropped++
41                         } else {
42                                 kept++
43                         }
44                         vars[int(ref.Tag)] = ref.Variant
45                 }
46         }
47         return vars, kept, dropped
48 }
49
50 type tileLibrary struct {
51         includeNoCalls bool
52         skipOOO        bool
53         taglib         *tagLibrary
54         variant        [][][blake2b.Size256]byte
55         refseqs        map[string]map[string][]tileLibRef
56         // count [][]int
57         // seq map[[blake2b.Size]byte][]byte
58         variants int
59         // if non-nil, write out any tile variants added while tiling
60         encoder *gob.Encoder
61
62         mtx sync.Mutex
63 }
64
65 func (tilelib *tileLibrary) loadTagSet(newtagset [][]byte) error {
66         // Loading a tagset means either passing it through to the
67         // output (if it's the first one we've seen), or just ensuring
68         // it doesn't disagree with what we already have.
69         if len(newtagset) == 0 {
70                 return nil
71         }
72         tilelib.mtx.Lock()
73         defer tilelib.mtx.Unlock()
74         if tilelib.taglib == nil || tilelib.taglib.Len() == 0 {
75                 tilelib.taglib = &tagLibrary{}
76                 err := tilelib.taglib.setTags(newtagset)
77                 if err != nil {
78                         return err
79                 }
80                 if tilelib.encoder != nil {
81                         err = tilelib.encoder.Encode(LibraryEntry{
82                                 TagSet: newtagset,
83                         })
84                         if err != nil {
85                                 return err
86                         }
87                 }
88         } else if tilelib.taglib.Len() != len(newtagset) {
89                 return fmt.Errorf("cannot merge libraries with differing tagsets")
90         } else {
91                 current := tilelib.taglib.Tags()
92                 for i := range newtagset {
93                         if !bytes.Equal(newtagset[i], current[i]) {
94                                 return fmt.Errorf("cannot merge libraries with differing tagsets")
95                         }
96                 }
97         }
98         return nil
99 }
100
101 func (tilelib *tileLibrary) loadTileVariants(tvs []TileVariant, variantmap map[tileLibRef]tileVariantID) error {
102         for _, tv := range tvs {
103                 // Assign a new variant ID (unique across all inputs)
104                 // for each input variant.
105                 variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant
106         }
107         return nil
108 }
109
110 func (tilelib *tileLibrary) loadCompactGenomes(cgs []CompactGenome, variantmap map[tileLibRef]tileVariantID, onLoadGenome func(CompactGenome)) error {
111         log.Debugf("loadCompactGenomes: %d", len(cgs))
112         var wg sync.WaitGroup
113         errs := make(chan error, 1)
114         for _, cg := range cgs {
115                 name, variants := cg.Name, cg.Variants
116                 wg.Add(1)
117                 go func() {
118                         defer wg.Done()
119                         for i, variant := range variants {
120                                 if len(errs) > 0 {
121                                         return
122                                 }
123                                 if variant == 0 {
124                                         continue
125                                 }
126                                 tag := tagID(i / 2)
127                                 newvariant, ok := variantmap[tileLibRef{Tag: tag, Variant: variant}]
128                                 if !ok {
129                                         err := fmt.Errorf("oops: genome %q has variant %d for tag %d, but that variant was not in its library", name, variant, tag)
130                                         select {
131                                         case errs <- err:
132                                         default:
133                                         }
134                                         return
135                                 }
136                                 log.Tracef("loadCompactGenomes: cg %s tag %d variant %d => %d", name, tag, variant, newvariant)
137                                 variants[i] = newvariant
138                         }
139                         if onLoadGenome != nil {
140                                 onLoadGenome(cg)
141                         }
142                         if tilelib.encoder != nil {
143                                 err := tilelib.encoder.Encode(LibraryEntry{
144                                         CompactGenomes: []CompactGenome{cg},
145                                 })
146                                 if err != nil {
147                                         select {
148                                         case errs <- err:
149                                         default:
150                                         }
151                                         return
152                                 }
153                         }
154                 }()
155         }
156         wg.Wait()
157         go close(errs)
158         return <-errs
159 }
160
161 func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, variantmap map[tileLibRef]tileVariantID) error {
162         log.Debugf("loadCompactSequences: %d", len(cseqs))
163         for _, cseq := range cseqs {
164                 for _, tseq := range cseq.TileSequences {
165                         for i, libref := range tseq {
166                                 v, ok := variantmap[libref]
167                                 if !ok {
168                                         return fmt.Errorf("oops: CompactSequence %q has variant %d for tag %d, but that variant was not in its library", cseq.Name, libref.Variant, libref.Tag)
169                                 }
170                                 tseq[i].Variant = v
171                         }
172                 }
173                 if tilelib.encoder != nil {
174                         if err := tilelib.encoder.Encode(LibraryEntry{
175                                 CompactSequences: []CompactSequence{cseq},
176                         }); err != nil {
177                                 return err
178                         }
179                 }
180         }
181         tilelib.mtx.Lock()
182         defer tilelib.mtx.Unlock()
183         if tilelib.refseqs == nil {
184                 tilelib.refseqs = map[string]map[string][]tileLibRef{}
185         }
186         for _, cseq := range cseqs {
187                 tilelib.refseqs[cseq.Name] = cseq.TileSequences
188         }
189         return nil
190 }
191
192 // Load library data from rdr. Tile variants might be renumbered in
193 // the process; in that case, genomes variants will be renumbered to
194 // match.
195 //
196 // If onLoadGenome is non-nil, call it on each CompactGenome entry.
197 func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, onLoadGenome func(CompactGenome)) error {
198         cgs := []CompactGenome{}
199         cseqs := []CompactSequence{}
200         variantmap := map[tileLibRef]tileVariantID{}
201         err := DecodeLibrary(rdr, func(ent *LibraryEntry) error {
202                 if ctx.Err() != nil {
203                         return ctx.Err()
204                 }
205                 if err := tilelib.loadTagSet(ent.TagSet); err != nil {
206                         return err
207                 }
208                 if err := tilelib.loadTileVariants(ent.TileVariants, variantmap); err != nil {
209                         return err
210                 }
211                 cgs = append(cgs, ent.CompactGenomes...)
212                 cseqs = append(cseqs, ent.CompactSequences...)
213                 return nil
214         })
215         if err != nil {
216                 return err
217         }
218         if ctx.Err() != nil {
219                 return ctx.Err()
220         }
221         err = tilelib.loadCompactGenomes(cgs, variantmap, onLoadGenome)
222         if err != nil {
223                 return err
224         }
225         err = tilelib.loadCompactSequences(cseqs, variantmap)
226         if err != nil {
227                 return err
228         }
229         return nil
230 }
231
232 func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, error) {
233         ret := tileSeq{}
234         type jobT struct {
235                 label string
236                 fasta []byte
237         }
238         todo := make(chan jobT)
239         scanner := bufio.NewScanner(rdr)
240         go func() {
241                 defer close(todo)
242                 var fasta []byte
243                 var seqlabel string
244                 for scanner.Scan() {
245                         buf := scanner.Bytes()
246                         if len(buf) > 0 && buf[0] == '>' {
247                                 todo <- jobT{seqlabel, fasta}
248                                 seqlabel, fasta = string(buf[1:]), nil
249                                 log.Debugf("%s %s reading fasta", filelabel, seqlabel)
250                         } else {
251                                 fasta = append(fasta, bytes.ToLower(buf)...)
252                         }
253                 }
254                 todo <- jobT{seqlabel, fasta}
255         }()
256         type foundtag struct {
257                 pos    int
258                 tagid  tagID
259                 taglen int
260         }
261         found := make([]foundtag, 2000000)
262         path := make([]tileLibRef, 2000000)
263         totalFoundTags := 0
264         totalPathLen := 0
265         skippedSequences := 0
266         for job := range todo {
267                 if len(job.fasta) == 0 {
268                         continue
269                 } else if strings.Contains(job.label, "_") {
270                         skippedSequences++
271                         continue
272                 }
273                 log.Debugf("%s %s tiling", filelabel, job.label)
274
275                 found = found[:0]
276                 tilelib.taglib.FindAll(job.fasta, func(tagid tagID, pos, taglen int) {
277                         found = append(found, foundtag{pos: pos, tagid: tagid, taglen: taglen})
278                 })
279                 totalFoundTags += len(found)
280
281                 skipped := 0
282                 path = path[:0]
283                 last := foundtag{tagid: -1}
284                 if tilelib.skipOOO {
285                         keep := longestIncreasingSubsequence(len(found), func(i int) int { return int(found[i].tagid) })
286                         for i, x := range keep {
287                                 found[i] = found[x]
288                         }
289                         skipped = len(found) - len(keep)
290                         found = found[:len(keep)]
291                 }
292                 for i, f := range found {
293                         log.Tracef("%s %s found[%d] == %#v", filelabel, job.label, i, f)
294                         if last.taglen > 0 {
295                                 path = append(path, tilelib.getRef(last.tagid, job.fasta[last.pos:f.pos+f.taglen]))
296                         } else {
297                                 f.pos = 0
298                         }
299                         last = f
300                 }
301                 if last.taglen > 0 {
302                         path = append(path, tilelib.getRef(last.tagid, job.fasta[last.pos:]))
303                 } else {
304                         log.Warnf("%s %s no tags found", filelabel, job.label)
305                 }
306
307                 pathcopy := make([]tileLibRef, len(path))
308                 copy(pathcopy, path)
309                 ret[job.label] = pathcopy
310                 log.Debugf("%s %s tiled with path len %d, skipped %d", filelabel, job.label, len(path), skipped)
311                 totalPathLen += len(path)
312         }
313         log.Printf("%s tiled with total path len %d in %d sequences (skipped %d sequences with '_' in name, skipped %d out-of-order tags)", filelabel, totalPathLen, len(ret), skippedSequences, totalFoundTags-totalPathLen)
314         return ret, scanner.Err()
315 }
316
317 func (tilelib *tileLibrary) Len() int {
318         tilelib.mtx.Lock()
319         defer tilelib.mtx.Unlock()
320         return tilelib.variants
321 }
322
323 // Return a tileLibRef for a tile with the given tag and sequence,
324 // adding the sequence to the library if needed.
325 func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
326         if !tilelib.includeNoCalls {
327                 for _, b := range seq {
328                         if b != 'a' && b != 'c' && b != 'g' && b != 't' {
329                                 // return "tile not found" if seq has any
330                                 // no-calls
331                                 return tileLibRef{Tag: tag}
332                         }
333                 }
334         }
335         tilelib.mtx.Lock()
336         // if tilelib.seq == nil {
337         //      tilelib.seq = map[[blake2b.Size]byte][]byte{}
338         // }
339         if tilelib.variant == nil && tilelib.taglib != nil {
340                 tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len())
341         }
342         if int(tag) >= len(tilelib.variant) {
343                 // If we haven't seen the tag library yet (as in a
344                 // merge), tilelib.taglib.Len() is zero. We can still
345                 // behave correctly, we just need to expand the
346                 // tilelib.variant slice as needed.
347                 if int(tag) >= cap(tilelib.variant) {
348                         // Allocate 2x capacity.
349                         newslice := make([][][blake2b.Size256]byte, int(tag)+1, (int(tag)+1)*2)
350                         copy(newslice, tilelib.variant)
351                         tilelib.variant = newslice[:int(tag)+1]
352                 } else {
353                         // Use previously allocated capacity, avoiding
354                         // copy.
355                         tilelib.variant = tilelib.variant[:int(tag)+1]
356                 }
357         }
358         seqhash := blake2b.Sum256(seq)
359         for i, varhash := range tilelib.variant[tag] {
360                 if varhash == seqhash {
361                         tilelib.mtx.Unlock()
362                         return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)}
363                 }
364         }
365         tilelib.variants++
366         tilelib.variant[tag] = append(tilelib.variant[tag], seqhash)
367         // tilelib.seq[seqhash] = append([]byte(nil), seq...)
368         variant := tileVariantID(len(tilelib.variant[tag]))
369         tilelib.mtx.Unlock()
370
371         if tilelib.encoder != nil {
372                 tilelib.encoder.Encode(LibraryEntry{
373                         TileVariants: []TileVariant{{
374                                 Tag:      tag,
375                                 Variant:  variant,
376                                 Blake2b:  seqhash,
377                                 Sequence: seq,
378                         }},
379                 })
380         }
381         return tileLibRef{Tag: tag, Variant: variant}
382 }