limit concurrency
[lightning.git] / tilelib.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/md5"
7         "io"
8         "log"
9         "sync"
10 )
11
// tileVariantID identifies one variant of a tile among the variants
// recorded for the same tag. IDs are assigned by getRef in order of
// first appearance, starting at 1.
type tileVariantID int32 // 1-based
13
// tileLibRef refers to a single tile variant in a tileLibrary: the tag
// that starts the tile, plus the 1-based variant number under that tag.
type tileLibRef struct {
	tag     tagID         // tag at which the tile starts
	variant tileVariantID // 1-based variant number within that tag
}
18
// tileSeq maps a sequence label (as produced by TileFasta: the text
// after '>' on a FASTA header line) to that sequence's path of tiles.
type tileSeq map[string][]tileLibRef
20
// tileLibrary records the distinct tile variants seen for each tag.
type tileLibrary struct {
	// taglib is used by TileFasta to find tags in each sequence.
	taglib  *tagLibrary
	// variant is indexed by tag ID; variant[tag] holds the MD5 digests
	// of the distinct tile sequences recorded for that tag, in order of
	// first appearance. Entry i corresponds to tileVariantID i+1.
	variant [][][md5.Size]byte
	// count [][]int
	// seq map[[md5.Size]byte][]byte

	// mtx is held by getRef while it reads and updates variant.
	mtx sync.Mutex
}
29
30 func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, error) {
31         ret := tileSeq{}
32         type jobT struct {
33                 label string
34                 fasta []byte
35         }
36         todo := make(chan jobT)
37         scanner := bufio.NewScanner(rdr)
38         go func() {
39                 defer close(todo)
40                 var fasta []byte
41                 var seqlabel string
42                 for scanner.Scan() {
43                         buf := scanner.Bytes()
44                         if len(buf) == 0 || buf[0] == '>' {
45                                 todo <- jobT{seqlabel, fasta}
46                                 seqlabel, fasta = string(buf[1:]), nil
47                                 log.Printf("%s %s reading fasta", filelabel, seqlabel)
48                         } else {
49                                 fasta = append(fasta, bytes.ToLower(buf)...)
50                         }
51                 }
52                 todo <- jobT{seqlabel, fasta}
53         }()
54         for job := range todo {
55                 if len(job.fasta) == 0 {
56                         continue
57                 }
58                 log.Printf("%s %s tiling", filelabel, job.label)
59                 var path []tileLibRef
60                 tilestart := -1        // position in fasta of tile that ends here
61                 tiletagid := tagID(-1) // tag id starting tile that ends here
62                 tilelib.taglib.FindAll(job.fasta, func(id tagID, pos int) {
63                         if tilestart >= 0 {
64                                 path = append(path, tilelib.getRef(tiletagid, job.fasta[tilestart:pos]))
65                         }
66                         tilestart = pos
67                         tiletagid = id
68                 })
69                 if tiletagid >= 0 {
70                         path = append(path, tilelib.getRef(tiletagid, job.fasta[tilestart:]))
71                 }
72                 ret[job.label] = path
73                 log.Printf("%s %s tiled with path len %d", filelabel, job.label, len(path))
74         }
75         return ret, scanner.Err()
76 }
77
78 // Return a tileLibRef for a tile with the given tag and sequence,
79 // adding the sequence to the library if needed.
80 func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
81         tilelib.mtx.Lock()
82         defer tilelib.mtx.Unlock()
83         // if tilelib.seq == nil {
84         //      tilelib.seq = map[[md5.Size]byte][]byte{}
85         // }
86         if len(tilelib.variant) <= int(tag) {
87                 tilelib.variant = append(tilelib.variant, make([][][md5.Size]byte, int(tag)-len(tilelib.variant)+1)...)
88         }
89         seqhash := md5.Sum(seq)
90         for i, varhash := range tilelib.variant[tag] {
91                 if varhash == seqhash {
92                         return tileLibRef{tag: tag, variant: tileVariantID(i + 1)}
93                 }
94         }
95         tilelib.variant[tag] = append(tilelib.variant[tag], seqhash)
96         // tilelib.seq[seqhash] = append([]byte(nil), seq...)
97         return tileLibRef{tag: tag, variant: tileVariantID(len(tilelib.variant[tag]))}
98 }