Use callbacks in struct instead of args to Load*().
[lightning.git] / tilelib.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "bytes"
10         "context"
11         "encoding/gob"
12         "fmt"
13         "io"
14         "os"
15         "regexp"
16         "runtime"
17         "sort"
18         "strings"
19         "sync"
20         "sync/atomic"
21
22         "github.com/klauspost/pgzip"
23         log "github.com/sirupsen/logrus"
24         "golang.org/x/crypto/blake2b"
25 )
26
27 type tileVariantID uint16 // 1-based
28
29 type tileLibRef struct {
30         Tag     tagID
31         Variant tileVariantID
32 }
33
34 type tileSeq map[string][]tileLibRef
35
36 func (tseq tileSeq) Variants() ([]tileVariantID, int, int) {
37         maxtag := 0
38         for _, refs := range tseq {
39                 for _, ref := range refs {
40                         if maxtag < int(ref.Tag) {
41                                 maxtag = int(ref.Tag)
42                         }
43                 }
44         }
45         vars := make([]tileVariantID, maxtag+1)
46         var kept, dropped int
47         for _, refs := range tseq {
48                 for _, ref := range refs {
49                         if vars[int(ref.Tag)] != 0 {
50                                 dropped++
51                         } else {
52                                 kept++
53                         }
54                         vars[int(ref.Tag)] = ref.Variant
55                 }
56         }
57         return vars, kept, dropped
58 }
59
60 type tileLibrary struct {
61         retainNoCalls       bool
62         skipOOO             bool
63         retainTileSequences bool
64
65         taglib         *tagLibrary
66         variant        [][][blake2b.Size256]byte
67         refseqs        map[string]map[string][]tileLibRef
68         compactGenomes map[string][]tileVariantID
69         seq2           map[[2]byte]map[[blake2b.Size256]byte][]byte
70         seq2lock       map[[2]byte]sync.Locker
71         variants       int64
72         // if non-nil, write out any tile variants added while tiling
73         encoder *gob.Encoder
74
75         onAddTileVariant func(libref tileLibRef, hash [blake2b.Size256]byte, seq []byte) error
76         onAddGenome      func(CompactGenome) error
77         onAddRefseq      func(CompactSequence) error
78
79         mtx   sync.RWMutex
80         vlock []sync.Locker
81 }
82
83 func (tilelib *tileLibrary) loadTagSet(newtagset [][]byte) error {
84         // Loading a tagset means either passing it through to the
85         // output (if it's the first one we've seen), or just ensuring
86         // it doesn't disagree with what we already have.
87         if len(newtagset) == 0 {
88                 return nil
89         }
90         tilelib.mtx.Lock()
91         defer tilelib.mtx.Unlock()
92         if tilelib.taglib == nil || tilelib.taglib.Len() == 0 {
93                 tilelib.taglib = &tagLibrary{}
94                 err := tilelib.taglib.setTags(newtagset)
95                 if err != nil {
96                         return err
97                 }
98                 if tilelib.encoder != nil {
99                         err = tilelib.encoder.Encode(LibraryEntry{
100                                 TagSet: newtagset,
101                         })
102                         if err != nil {
103                                 return err
104                         }
105                 }
106         } else if tilelib.taglib.Len() != len(newtagset) {
107                 return fmt.Errorf("cannot merge libraries with differing tagsets")
108         } else {
109                 current := tilelib.taglib.Tags()
110                 for i := range newtagset {
111                         if !bytes.Equal(newtagset[i], current[i]) {
112                                 return fmt.Errorf("cannot merge libraries with differing tagsets")
113                         }
114                 }
115         }
116         return nil
117 }
118
119 func (tilelib *tileLibrary) loadTileVariants(tvs []TileVariant, variantmap map[tileLibRef]tileVariantID) error {
120         for _, tv := range tvs {
121                 // Assign a new variant ID (unique across all inputs)
122                 // for each input variant.
123                 variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant
124         }
125         return nil
126 }
127
128 func (tilelib *tileLibrary) loadCompactGenomes(cgs []CompactGenome, variantmap map[tileLibRef]tileVariantID) error {
129         log.Debugf("loadCompactGenomes: %d", len(cgs))
130         var wg sync.WaitGroup
131         errs := make(chan error, 1)
132         for _, cg := range cgs {
133                 wg.Add(1)
134                 cg := cg
135                 go func() {
136                         defer wg.Done()
137                         for i, variant := range cg.Variants {
138                                 if len(errs) > 0 {
139                                         return
140                                 }
141                                 if variant == 0 {
142                                         continue
143                                 }
144                                 tag := tagID(i / 2)
145                                 newvariant, ok := variantmap[tileLibRef{Tag: tag, Variant: variant}]
146                                 if !ok {
147                                         err := fmt.Errorf("oops: genome %q has variant %d for tag %d, but that variant was not in its library", cg.Name, variant, tag)
148                                         select {
149                                         case errs <- err:
150                                         default:
151                                         }
152                                         return
153                                 }
154                                 // log.Tracef("loadCompactGenomes: cg %s tag %d variant %d => %d", cg.Name, tag, variant, newvariant)
155                                 cg.Variants[i] = newvariant
156                         }
157                         if tilelib.onAddGenome != nil {
158                                 err := tilelib.onAddGenome(cg)
159                                 if err != nil {
160                                         select {
161                                         case errs <- err:
162                                         default:
163                                         }
164                                         return
165                                 }
166                         }
167                         if tilelib.encoder != nil {
168                                 err := tilelib.encoder.Encode(LibraryEntry{
169                                         CompactGenomes: []CompactGenome{cg},
170                                 })
171                                 if err != nil {
172                                         select {
173                                         case errs <- err:
174                                         default:
175                                         }
176                                         return
177                                 }
178                         }
179                         if tilelib.compactGenomes != nil {
180                                 tilelib.mtx.Lock()
181                                 defer tilelib.mtx.Unlock()
182                                 tilelib.compactGenomes[cg.Name] = cg.Variants
183                         }
184                 }()
185         }
186         wg.Wait()
187         go close(errs)
188         return <-errs
189 }
190
191 func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, variantmap map[tileLibRef]tileVariantID) error {
192         log.Infof("loadCompactSequences: %d todo", len(cseqs))
193         for _, cseq := range cseqs {
194                 log.Infof("loadCompactSequences: checking %s", cseq.Name)
195                 for _, tseq := range cseq.TileSequences {
196                         for i, libref := range tseq {
197                                 if libref.Variant == 0 {
198                                         // No variant (e.g., import
199                                         // dropped tiles with
200                                         // no-calls) = no translation.
201                                         continue
202                                 }
203                                 v, ok := variantmap[libref]
204                                 if !ok {
205                                         return fmt.Errorf("oops: CompactSequence %q has variant %d for tag %d, but that variant was not in its library", cseq.Name, libref.Variant, libref.Tag)
206                                 }
207                                 tseq[i].Variant = v
208                         }
209                 }
210                 if tilelib.encoder != nil {
211                         if err := tilelib.encoder.Encode(LibraryEntry{
212                                 CompactSequences: []CompactSequence{cseq},
213                         }); err != nil {
214                                 return err
215                         }
216                 }
217                 if tilelib.onAddRefseq != nil {
218                         err := tilelib.onAddRefseq(cseq)
219                         if err != nil {
220                                 return err
221                         }
222                 }
223                 log.Infof("loadCompactSequences: checking %s done", cseq.Name)
224         }
225         tilelib.mtx.Lock()
226         defer tilelib.mtx.Unlock()
227         if tilelib.refseqs == nil {
228                 tilelib.refseqs = map[string]map[string][]tileLibRef{}
229         }
230         for _, cseq := range cseqs {
231                 tilelib.refseqs[cseq.Name] = cseq.TileSequences
232         }
233         log.Info("loadCompactSequences: done")
234         return nil
235 }
236
237 func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string) error {
238         var files []string
239         var walk func(string) error
240         walk = func(path string) error {
241                 f, err := open(path)
242                 if err != nil {
243                         return err
244                 }
245                 defer f.Close()
246                 fis, err := f.Readdir(-1)
247                 if err != nil {
248                         files = append(files, path)
249                         return nil
250                 }
251                 for _, fi := range fis {
252                         if fi.Name() == "." || fi.Name() == ".." {
253                                 continue
254                         } else if child := path + "/" + fi.Name(); fi.IsDir() {
255                                 err = walk(child)
256                                 if err != nil {
257                                         return err
258                                 }
259                         } else if strings.HasSuffix(child, ".gob") || strings.HasSuffix(child, ".gob.gz") {
260                                 files = append(files, child)
261                         }
262                 }
263                 return nil
264         }
265         log.Infof("LoadDir: walk dir %s", path)
266         err := walk(path)
267         if err != nil {
268                 return err
269         }
270         ctx, cancel := context.WithCancel(ctx)
271         defer cancel()
272         var mtx sync.Mutex
273         allcgs := make([][]CompactGenome, len(files))
274         allcseqs := make([][]CompactSequence, len(files))
275         allvariantmap := map[tileLibRef]tileVariantID{}
276         errs := make(chan error, len(files))
277         log.Infof("LoadDir: read %d files", len(files))
278         for fileno, path := range files {
279                 fileno, path := fileno, path
280                 go func() {
281                         f, err := open(path)
282                         if err != nil {
283                                 errs <- err
284                                 return
285                         }
286                         defer f.Close()
287                         defer log.Infof("LoadDir: finished reading %s", path)
288
289                         var variantmap = map[tileLibRef]tileVariantID{}
290                         var cgs []CompactGenome
291                         var cseqs []CompactSequence
292                         err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error {
293                                 if ctx.Err() != nil {
294                                         return ctx.Err()
295                                 }
296                                 if len(ent.TagSet) > 0 {
297                                         mtx.Lock()
298                                         if tilelib.taglib == nil || tilelib.taglib.Len() != len(ent.TagSet) {
299                                                 // load first set of tags, or
300                                                 // report mismatch if 2 sets
301                                                 // have different #tags.
302                                                 if err := tilelib.loadTagSet(ent.TagSet); err != nil {
303                                                         mtx.Unlock()
304                                                         return err
305                                                 }
306                                         }
307                                         mtx.Unlock()
308                                 }
309                                 for _, tv := range ent.TileVariants {
310                                         variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant
311                                 }
312                                 cgs = append(cgs, ent.CompactGenomes...)
313                                 cseqs = append(cseqs, ent.CompactSequences...)
314                                 return nil
315                         })
316                         allcgs[fileno] = cgs
317                         allcseqs[fileno] = cseqs
318                         mtx.Lock()
319                         defer mtx.Unlock()
320                         for k, v := range variantmap {
321                                 allvariantmap[k] = v
322                         }
323                         errs <- err
324                 }()
325         }
326         for range files {
327                 err := <-errs
328                 if err != nil {
329                         return err
330                 }
331         }
332
333         log.Info("LoadDir: loadCompactGenomes")
334         var flatcgs []CompactGenome
335         for _, cgs := range allcgs {
336                 flatcgs = append(flatcgs, cgs...)
337         }
338         err = tilelib.loadCompactGenomes(flatcgs, allvariantmap)
339         if err != nil {
340                 return err
341         }
342
343         log.Info("LoadDir: loadCompactSequences")
344         var flatcseqs []CompactSequence
345         for _, cseqs := range allcseqs {
346                 flatcseqs = append(flatcseqs, cseqs...)
347         }
348         err = tilelib.loadCompactSequences(flatcseqs, allvariantmap)
349         if err != nil {
350                 return err
351         }
352
353         log.Info("LoadDir done")
354         return nil
355 }
356
357 func (tilelib *tileLibrary) WriteDir(dir string) error {
358         ntilefiles := 128
359         nfiles := ntilefiles + len(tilelib.refseqs)
360         files := make([]*os.File, nfiles)
361         for i := range files {
362                 f, err := os.OpenFile(fmt.Sprintf("%s/library.%04d.gob.gz", dir, i), os.O_CREATE|os.O_WRONLY, 0666)
363                 if err != nil {
364                         return err
365                 }
366                 defer f.Close()
367                 files[i] = f
368         }
369         bufws := make([]*bufio.Writer, nfiles)
370         for i := range bufws {
371                 bufws[i] = bufio.NewWriterSize(files[i], 1<<26)
372         }
373         zws := make([]*pgzip.Writer, nfiles)
374         for i := range zws {
375                 zws[i] = pgzip.NewWriter(bufws[i])
376                 defer zws[i].Close()
377         }
378         encoders := make([]*gob.Encoder, nfiles)
379         for i := range encoders {
380                 encoders[i] = gob.NewEncoder(zws[i])
381         }
382
383         cgnames := make([]string, 0, len(tilelib.compactGenomes))
384         for name := range tilelib.compactGenomes {
385                 cgnames = append(cgnames, name)
386         }
387         sort.Strings(cgnames)
388
389         refnames := make([]string, 0, len(tilelib.refseqs))
390         for name := range tilelib.refseqs {
391                 refnames = append(refnames, name)
392         }
393         sort.Strings(refnames)
394
395         log.Infof("WriteDir: writing %d files", nfiles)
396         ctx, cancel := context.WithCancel(context.Background())
397         defer cancel()
398         errs := make(chan error, nfiles)
399         for start := range files {
400                 start := start
401                 go func() {
402                         err := encoders[start].Encode(LibraryEntry{TagSet: tilelib.taglib.Tags()})
403                         if err != nil {
404                                 errs <- err
405                                 return
406                         }
407                         if refidx := start - ntilefiles; refidx >= 0 {
408                                 // write each ref to its own file
409                                 // (they seem to load very slowly)
410                                 name := refnames[refidx]
411                                 errs <- encoders[start].Encode(LibraryEntry{CompactSequences: []CompactSequence{{
412                                         Name:          name,
413                                         TileSequences: tilelib.refseqs[name],
414                                 }}})
415                                 return
416                         }
417                         for i := start; i < len(cgnames); i += ntilefiles {
418                                 err := encoders[start].Encode(LibraryEntry{CompactGenomes: []CompactGenome{{
419                                         Name:     cgnames[i],
420                                         Variants: tilelib.compactGenomes[cgnames[i]],
421                                 }}})
422                                 if err != nil {
423                                         errs <- err
424                                         return
425                                 }
426                         }
427                         tvs := []TileVariant{}
428                         for tag := start; tag < len(tilelib.variant) && ctx.Err() == nil; tag += ntilefiles {
429                                 tvs = tvs[:0]
430                                 for idx, hash := range tilelib.variant[tag] {
431                                         tvs = append(tvs, TileVariant{
432                                                 Tag:      tagID(tag),
433                                                 Variant:  tileVariantID(idx + 1),
434                                                 Blake2b:  hash,
435                                                 Sequence: tilelib.hashSequence(hash),
436                                         })
437                                 }
438                                 err := encoders[start].Encode(LibraryEntry{TileVariants: tvs})
439                                 if err != nil {
440                                         errs <- err
441                                         return
442                                 }
443                         }
444                         errs <- nil
445                 }()
446         }
447         for range files {
448                 err := <-errs
449                 if err != nil {
450                         return err
451                 }
452         }
453         log.Info("WriteDir: flushing")
454         for i := range zws {
455                 err := zws[i].Close()
456                 if err != nil {
457                         return err
458                 }
459                 err = bufws[i].Flush()
460                 if err != nil {
461                         return err
462                 }
463                 err = files[i].Close()
464                 if err != nil {
465                         return err
466                 }
467         }
468         log.Info("WriteDir: done")
469         return nil
470 }
471
472 // Load library data from rdr. Tile variants might be renumbered in
473 // the process; in that case, genomes variants will be renumbered to
474 // match.
475 func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, gz bool) error {
476         cgs := []CompactGenome{}
477         cseqs := []CompactSequence{}
478         variantmap := map[tileLibRef]tileVariantID{}
479         err := DecodeLibrary(rdr, gz, func(ent *LibraryEntry) error {
480                 if ctx.Err() != nil {
481                         return ctx.Err()
482                 }
483                 if err := tilelib.loadTagSet(ent.TagSet); err != nil {
484                         return err
485                 }
486                 if err := tilelib.loadTileVariants(ent.TileVariants, variantmap); err != nil {
487                         return err
488                 }
489                 cgs = append(cgs, ent.CompactGenomes...)
490                 cseqs = append(cseqs, ent.CompactSequences...)
491                 return nil
492         })
493         if err != nil {
494                 return err
495         }
496         if ctx.Err() != nil {
497                 return ctx.Err()
498         }
499         err = tilelib.loadCompactGenomes(cgs, variantmap)
500         if err != nil {
501                 return err
502         }
503         err = tilelib.loadCompactSequences(cseqs, variantmap)
504         if err != nil {
505                 return err
506         }
507         return nil
508 }
509
510 func (tilelib *tileLibrary) dump(out io.Writer) {
511         printTV := func(tag int, variant tileVariantID) {
512                 if variant < 1 {
513                         fmt.Fprintf(out, " -")
514                 } else if tag >= len(tilelib.variant) {
515                         fmt.Fprintf(out, " (!tag=%d)", tag)
516                 } else if int(variant) > len(tilelib.variant[tag]) {
517                         fmt.Fprintf(out, " (tag=%d,!variant=%d)", tag, variant)
518                 } else {
519                         fmt.Fprintf(out, " %x", tilelib.variant[tag][variant-1][:8])
520                 }
521         }
522         for refname, refseqs := range tilelib.refseqs {
523                 for seqname, seq := range refseqs {
524                         fmt.Fprintf(out, "ref %s %s", refname, seqname)
525                         for _, libref := range seq {
526                                 printTV(int(libref.Tag), libref.Variant)
527                         }
528                         fmt.Fprintf(out, "\n")
529                 }
530         }
531         for name, cg := range tilelib.compactGenomes {
532                 fmt.Fprintf(out, "cg %s", name)
533                 for tag, variant := range cg {
534                         printTV(tag/2, variant)
535                 }
536                 fmt.Fprintf(out, "\n")
537         }
538 }
539
540 type importStats struct {
541         InputFile              string
542         InputLabel             string
543         InputLength            int
544         InputCoverage          int
545         PathLength             int
546         DroppedOutOfOrderTiles int
547 }
548
549 func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChromosome *regexp.Regexp) (tileSeq, []importStats, error) {
550         ret := tileSeq{}
551         type jobT struct {
552                 label string
553                 fasta []byte
554         }
555         todo := make(chan jobT, 1)
556         scanner := bufio.NewScanner(rdr)
557         go func() {
558                 defer close(todo)
559                 var fasta []byte
560                 var seqlabel string
561                 for scanner.Scan() {
562                         buf := scanner.Bytes()
563                         if len(buf) > 0 && buf[0] == '>' {
564                                 todo <- jobT{seqlabel, append([]byte(nil), fasta...)}
565                                 seqlabel, fasta = strings.SplitN(string(buf[1:]), " ", 2)[0], fasta[:0]
566                                 log.Debugf("%s %s reading fasta", filelabel, seqlabel)
567                         } else {
568                                 fasta = append(fasta, bytes.ToLower(buf)...)
569                         }
570                 }
571                 todo <- jobT{seqlabel, fasta}
572         }()
573         type foundtag struct {
574                 pos   int
575                 tagid tagID
576         }
577         found := make([]foundtag, 2000000)
578         path := make([]tileLibRef, 2000000)
579         totalFoundTags := 0
580         totalPathLen := 0
581         skippedSequences := 0
582         taglen := tilelib.taglib.TagLen()
583         var stats []importStats
584         for job := range todo {
585                 if len(job.fasta) == 0 {
586                         continue
587                 } else if !matchChromosome.MatchString(job.label) {
588                         skippedSequences++
589                         continue
590                 }
591                 log.Debugf("%s %s tiling", filelabel, job.label)
592
593                 found = found[:0]
594                 tilelib.taglib.FindAll(job.fasta, func(tagid tagID, pos, taglen int) {
595                         found = append(found, foundtag{pos: pos, tagid: tagid})
596                 })
597                 totalFoundTags += len(found)
598                 if len(found) == 0 {
599                         log.Warnf("%s %s no tags found", filelabel, job.label)
600                 }
601
602                 skipped := 0
603                 if tilelib.skipOOO {
604                         log.Infof("%s %s keeping longest increasing subsequence", filelabel, job.label)
605                         keep := longestIncreasingSubsequence(len(found), func(i int) int { return int(found[i].tagid) })
606                         for i, x := range keep {
607                                 found[i] = found[x]
608                         }
609                         skipped = len(found) - len(keep)
610                         found = found[:len(keep)]
611                 }
612
613                 log.Infof("%s %s getting %d librefs", filelabel, job.label, len(found))
614                 throttle := &throttle{Max: runtime.NumCPU()}
615                 path = path[:len(found)]
616                 var lowquality int64
617                 for i, f := range found {
618                         i, f := i, f
619                         throttle.Acquire()
620                         go func() {
621                                 defer throttle.Release()
622                                 var startpos, endpos int
623                                 if i == 0 {
624                                         startpos = 0
625                                 } else {
626                                         startpos = f.pos
627                                 }
628                                 if i == len(found)-1 {
629                                         endpos = len(job.fasta)
630                                 } else {
631                                         endpos = found[i+1].pos + taglen
632                                 }
633                                 path[i] = tilelib.getRef(f.tagid, job.fasta[startpos:endpos])
634                                 if countBases(job.fasta[startpos:endpos]) != endpos-startpos {
635                                         atomic.AddInt64(&lowquality, 1)
636                                 }
637                         }()
638                 }
639                 throttle.Wait()
640
641                 log.Infof("%s %s copying path", filelabel, job.label)
642
643                 pathcopy := make([]tileLibRef, len(path))
644                 copy(pathcopy, path)
645                 ret[job.label] = pathcopy
646
647                 basesIn := countBases(job.fasta)
648                 log.Infof("%s %s fasta in %d coverage in %d path len %d low-quality %d skipped-out-of-order %d", filelabel, job.label, len(job.fasta), basesIn, len(path), lowquality, skipped)
649                 stats = append(stats, importStats{
650                         InputFile:              filelabel,
651                         InputLabel:             job.label,
652                         InputLength:            len(job.fasta),
653                         InputCoverage:          basesIn,
654                         PathLength:             len(path),
655                         DroppedOutOfOrderTiles: skipped,
656                 })
657
658                 totalPathLen += len(path)
659         }
660         log.Printf("%s tiled with total path len %d in %d sequences (skipped %d sequences that did not match chromosome regexp, skipped %d out-of-order tags)", filelabel, totalPathLen, len(ret), skippedSequences, totalFoundTags-totalPathLen)
661         return ret, stats, scanner.Err()
662 }
663
664 func (tilelib *tileLibrary) Len() int64 {
665         return atomic.LoadInt64(&tilelib.variants)
666 }
667
668 // Return a tileLibRef for a tile with the given tag and sequence,
669 // adding the sequence to the library if needed.
670 func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
671         dropSeq := false
672         if !tilelib.retainNoCalls {
673                 for _, b := range seq {
674                         if b != 'a' && b != 'c' && b != 'g' && b != 't' {
675                                 dropSeq = true
676                                 break
677                         }
678                 }
679         }
680         seqhash := blake2b.Sum256(seq)
681         var vlock sync.Locker
682
683         tilelib.mtx.RLock()
684         if len(tilelib.vlock) > int(tag) {
685                 vlock = tilelib.vlock[tag]
686         }
687         tilelib.mtx.RUnlock()
688
689         if vlock != nil {
690                 vlock.Lock()
691                 for i, varhash := range tilelib.variant[tag] {
692                         if varhash == seqhash {
693                                 vlock.Unlock()
694                                 return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)}
695                         }
696                 }
697                 vlock.Unlock()
698         } else {
699                 tilelib.mtx.Lock()
700                 if tilelib.variant == nil && tilelib.taglib != nil {
701                         tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len())
702                         tilelib.vlock = make([]sync.Locker, tilelib.taglib.Len())
703                         for i := range tilelib.vlock {
704                                 tilelib.vlock[i] = new(sync.Mutex)
705                         }
706                 }
707                 if int(tag) >= len(tilelib.variant) {
708                         oldlen := len(tilelib.vlock)
709                         for i := 0; i < oldlen; i++ {
710                                 tilelib.vlock[i].Lock()
711                         }
712                         // If we haven't seen the tag library yet (as
713                         // in a merge), tilelib.taglib.Len() is
714                         // zero. We can still behave correctly, we
715                         // just need to expand the tilelib.variant and
716                         // tilelib.vlock slices as needed.
717                         if int(tag) >= cap(tilelib.variant) {
718                                 // Allocate 2x capacity.
719                                 newslice := make([][][blake2b.Size256]byte, int(tag)+1, (int(tag)+1)*2)
720                                 copy(newslice, tilelib.variant)
721                                 tilelib.variant = newslice[:int(tag)+1]
722                                 newvlock := make([]sync.Locker, int(tag)+1, (int(tag)+1)*2)
723                                 copy(newvlock, tilelib.vlock)
724                                 tilelib.vlock = newvlock[:int(tag)+1]
725                         } else {
726                                 // Use previously allocated capacity,
727                                 // avoiding copy.
728                                 tilelib.variant = tilelib.variant[:int(tag)+1]
729                                 tilelib.vlock = tilelib.vlock[:int(tag)+1]
730                         }
731                         for i := oldlen; i < len(tilelib.vlock); i++ {
732                                 tilelib.vlock[i] = new(sync.Mutex)
733                         }
734                         for i := 0; i < oldlen; i++ {
735                                 tilelib.vlock[i].Unlock()
736                         }
737                 }
738                 vlock = tilelib.vlock[tag]
739                 tilelib.mtx.Unlock()
740         }
741
742         vlock.Lock()
743         for i, varhash := range tilelib.variant[tag] {
744                 if varhash == seqhash {
745                         vlock.Unlock()
746                         return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)}
747                 }
748         }
749         atomic.AddInt64(&tilelib.variants, 1)
750         tilelib.variant[tag] = append(tilelib.variant[tag], seqhash)
751         variant := tileVariantID(len(tilelib.variant[tag]))
752         vlock.Unlock()
753
754         if tilelib.retainTileSequences && !dropSeq {
755                 seqCopy := append([]byte(nil), seq...)
756                 if tilelib.seq2 == nil {
757                         tilelib.mtx.Lock()
758                         if tilelib.seq2 == nil {
759                                 tilelib.seq2lock = map[[2]byte]sync.Locker{}
760                                 m := map[[2]byte]map[[blake2b.Size256]byte][]byte{}
761                                 var k [2]byte
762                                 for i := 0; i < 256; i++ {
763                                         k[0] = byte(i)
764                                         for j := 0; j < 256; j++ {
765                                                 k[1] = byte(j)
766                                                 m[k] = map[[blake2b.Size256]byte][]byte{}
767                                                 tilelib.seq2lock[k] = &sync.Mutex{}
768                                         }
769                                 }
770                                 tilelib.seq2 = m
771                         }
772                         tilelib.mtx.Unlock()
773                 }
774                 var k [2]byte
775                 copy(k[:], seqhash[:])
776                 locker := tilelib.seq2lock[k]
777                 locker.Lock()
778                 tilelib.seq2[k][seqhash] = seqCopy
779                 locker.Unlock()
780         }
781
782         saveSeq := seq
783         if dropSeq {
784                 // Save the hash, but not the sequence
785                 saveSeq = nil
786         }
787         if tilelib.encoder != nil {
788                 tilelib.encoder.Encode(LibraryEntry{
789                         TileVariants: []TileVariant{{
790                                 Tag:      tag,
791                                 Variant:  variant,
792                                 Blake2b:  seqhash,
793                                 Sequence: saveSeq,
794                         }},
795                 })
796         }
797         if tilelib.onAddTileVariant != nil {
798                 tilelib.onAddTileVariant(tileLibRef{tag, variant}, seqhash, saveSeq)
799         }
800         return tileLibRef{Tag: tag, Variant: variant}
801 }
802
803 func (tilelib *tileLibrary) hashSequence(hash [blake2b.Size256]byte) []byte {
804         var partition [2]byte
805         copy(partition[:], hash[:])
806         return tilelib.seq2[partition][hash]
807 }
808
809 func (tilelib *tileLibrary) TileVariantSequence(libref tileLibRef) []byte {
810         if libref.Variant == 0 || len(tilelib.variant) <= int(libref.Tag) || len(tilelib.variant[libref.Tag]) < int(libref.Variant) {
811                 return nil
812         }
813         return tilelib.hashSequence(tilelib.variant[libref.Tag][libref.Variant-1])
814 }
815
816 // Tidy deletes unreferenced tile variants and renumbers variants so
817 // more common variants have smaller IDs.
818 func (tilelib *tileLibrary) Tidy() {
819         log.Print("Tidy: compute inref")
820         inref := map[tileLibRef]bool{}
821         for _, refseq := range tilelib.refseqs {
822                 for _, librefs := range refseq {
823                         for _, libref := range librefs {
824                                 inref[libref] = true
825                         }
826                 }
827         }
828         log.Print("Tidy: compute remap")
829         remap := make([][]tileVariantID, len(tilelib.variant))
830         throttle := throttle{Max: runtime.NumCPU() + 1}
831         for tag, oldvariants := range tilelib.variant {
832                 tag, oldvariants := tagID(tag), oldvariants
833                 if tag%1000000 == 0 {
834                         log.Printf("Tidy: tag %d", tag)
835                 }
836                 throttle.Acquire()
837                 go func() {
838                         defer throttle.Release()
839                         uses := make([]int, len(oldvariants))
840                         for _, cg := range tilelib.compactGenomes {
841                                 for phase := 0; phase < 2; phase++ {
842                                         cgi := int(tag)*2 + phase
843                                         if cgi < len(cg) && cg[cgi] > 0 {
844                                                 uses[cg[cgi]-1]++
845                                         }
846                                 }
847                         }
848
849                         // Compute desired order of variants:
850                         // neworder[x] == index in oldvariants that
851                         // should move to position x.
852                         neworder := make([]int, len(oldvariants))
853                         for i := range neworder {
854                                 neworder[i] = i
855                         }
856                         sort.Slice(neworder, func(i, j int) bool {
857                                 if cmp := uses[neworder[i]] - uses[neworder[j]]; cmp != 0 {
858                                         return cmp > 0
859                                 } else {
860                                         return bytes.Compare(oldvariants[neworder[i]][:], oldvariants[neworder[j]][:]) < 0
861                                 }
862                         })
863
864                         // Replace tilelib.variant[tag] with a new
865                         // re-ordered slice of hashes, and make a
866                         // mapping from old to new variant IDs.
867                         remaptag := make([]tileVariantID, len(oldvariants)+1)
868                         newvariants := make([][blake2b.Size256]byte, 0, len(neworder))
869                         for _, oldi := range neworder {
870                                 if uses[oldi] > 0 || inref[tileLibRef{Tag: tag, Variant: tileVariantID(oldi + 1)}] {
871                                         newvariants = append(newvariants, oldvariants[oldi])
872                                         remaptag[oldi+1] = tileVariantID(len(newvariants))
873                                 }
874                         }
875                         tilelib.variant[tag] = newvariants
876                         remap[tag] = remaptag
877                 }()
878         }
879         throttle.Wait()
880
881         // Apply remap to genomes and reference sequences, so they
882         // refer to the same tile variants using the changed IDs.
883         log.Print("Tidy: apply remap")
884         var wg sync.WaitGroup
885         for _, cg := range tilelib.compactGenomes {
886                 cg := cg
887                 wg.Add(1)
888                 go func() {
889                         defer wg.Done()
890                         for idx, variant := range cg {
891                                 cg[idx] = remap[tagID(idx/2)][variant]
892                         }
893                 }()
894         }
895         for _, refcs := range tilelib.refseqs {
896                 for _, refseq := range refcs {
897                         refseq := refseq
898                         wg.Add(1)
899                         go func() {
900                                 defer wg.Done()
901                                 for i, tv := range refseq {
902                                         refseq[i].Variant = remap[tv.Tag][tv.Variant]
903                                 }
904                         }()
905                 }
906         }
907         wg.Wait()
908         log.Print("Tidy: done")
909 }
910
911 func countBases(seq []byte) int {
912         n := 0
913         for _, c := range seq {
914                 if isbase[c] {
915                         n++
916                 }
917         }
918         return n
919 }