import (
"bufio"
- "bytes"
+ "context"
"encoding/gob"
"errors"
"flag"
tagSet [][]byte
tilelib *tileLibrary
mapped map[string]map[tileLibRef]tileVariantID
- todo []liftCompactGenome
mtxTags sync.Mutex
errs chan error
}
return 0
}
-func (cmd *merger) mergeLibraryEntry(ent *LibraryEntry, src string) error {
- mapped := cmd.mapped[src]
- if len(cmd.errs) > 0 {
- return errors.New("stopping after error in other goroutine")
- }
- if len(ent.TagSet) > 0 {
- // We don't need the tagset to do a merge, but if it
- // does appear in the input, we (a) output it once,
- // and (b) do a sanity check, erroring out if the
- // inputs have different tagsets.
- cmd.mtxTags.Lock()
- defer cmd.mtxTags.Unlock()
- if len(cmd.tagSet) == 0 {
- cmd.tagSet = ent.TagSet
- if cmd.tilelib.encoder != nil {
- go cmd.tilelib.encoder.Encode(LibraryEntry{
- TagSet: cmd.tagSet,
- })
- }
- } else if len(cmd.tagSet) != len(ent.TagSet) {
- return fmt.Errorf("cannot merge libraries with differing tagsets")
- } else {
- for i := range ent.TagSet {
- if !bytes.Equal(ent.TagSet[i], cmd.tagSet[i]) {
- return fmt.Errorf("cannot merge libraries with differing tagsets")
- }
- }
- }
- }
- for _, tv := range ent.TileVariants {
- // Assign a new variant ID (unique across all inputs)
- // for each input variant.
- mapped[tileLibRef{tag: tv.Tag, variant: tv.Variant}] = cmd.tilelib.getRef(tv.Tag, tv.Sequence).variant
- }
- for _, cg := range ent.CompactGenomes {
- cmd.todo = append(cmd.todo, liftCompactGenome{cg, mapped})
- }
- return nil
-}
-
-type liftCompactGenome struct {
- CompactGenome
- mapped map[tileLibRef]tileVariantID
-}
-
-// Translate old variant IDs to new (mapped) variant IDs.
-func (cg liftCompactGenome) lift() error {
- for i, variant := range cg.Variants {
- if variant == 0 {
- continue
- }
- tag := tagID(i / 2)
- newvariant, ok := cg.mapped[tileLibRef{tag: tag, variant: variant}]
- if !ok {
- return fmt.Errorf("oops: ent.CompactGenomes[] (%q) refs tag %d variant %d which is not in library", cg.Name, tag, variant)
- }
- cg.Variants[tag] = newvariant
- }
- return nil
-}
-
func (cmd *merger) setError(err error) {
select {
case cmd.errs <- err:
func (cmd *merger) doMerge() error {
w := bufio.NewWriter(cmd.output)
+ encoder := gob.NewEncoder(w)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
cmd.errs = make(chan error, 1)
cmd.tilelib = &tileLibrary{
- encoder: gob.NewEncoder(w),
+ encoder: encoder,
includeNoCalls: true,
+ onLoadGenome: func(cg CompactGenome) {
+ err := encoder.Encode(LibraryEntry{CompactGenomes: []CompactGenome{cg}})
+ if err != nil {
+ cmd.setError(err)
+ cancel()
+ }
+ },
}
cmd.mapped = map[string]map[tileLibRef]tileVariantID{}
go func(input string) {
defer wg.Done()
log.Printf("%s: reading", input)
- err := DecodeLibrary(infile, func(ent *LibraryEntry) error {
- return cmd.mergeLibraryEntry(ent, input)
- })
+ err := cmd.tilelib.LoadGob(ctx, infile, nil)
if err != nil {
- cmd.setError(fmt.Errorf("%s: decode: %w", input, err))
+ cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
+ cancel()
return
}
err = infile.Close()
if err != nil {
- cmd.setError(fmt.Errorf("%s: close: %w", input, err))
+ cmd.setError(fmt.Errorf("%s: error closing input file: %w", input, err))
+ cancel()
return
}
log.Printf("%s: done", input)
if err := <-cmd.errs; err != nil {
return err
}
-
- var cgs []CompactGenome
- for _, cg := range cmd.todo {
- err := cg.lift()
- if err != nil {
- return err
- }
- cgs = append(cgs, cg.CompactGenome)
- }
- err := cmd.tilelib.encoder.Encode(LibraryEntry{
- CompactGenomes: cgs,
- })
- if err != nil {
- return err
- }
-
log.Print("flushing")
- err = w.Flush()
+ err := w.Flush()
if err != nil {
return err
}
import (
"bufio"
"bytes"
+ "context"
"encoding/gob"
+ "fmt"
"io"
"strings"
"sync"
variants int
// if non-nil, write out any tile variants added while tiling
encoder *gob.Encoder
+ // if non-nil, call this func upon loading a genome
+ onLoadGenome func(CompactGenome)
mtx sync.Mutex
}
+// loadTagSet reconciles a tagset read from an input with the tagset
+// the library already holds.
+//
+// An empty newtagset is ignored. If the library has no tags yet
+// (taglib is nil or reports Len() == 0), newtagset is installed via
+// setTags and, when an encoder is configured, written to the output
+// as a LibraryEntry. Otherwise newtagset must have the same number
+// of tags as the current tagset, and every tag must be bytewise
+// equal to its counterpart; any difference is reported as an error.
+//
+// tilelib.mtx is held for the whole comparison/installation.
+func (tilelib *tileLibrary) loadTagSet(newtagset [][]byte) error {
+	if len(newtagset) == 0 {
+		// Nothing to install or compare.
+		return nil
+	}
+
+	tilelib.mtx.Lock()
+	defer tilelib.mtx.Unlock()
+
+	if tilelib.taglib == nil || tilelib.taglib.Len() == 0 {
+		// First tagset we have seen: adopt it and pass it
+		// through to the output.
+		tilelib.taglib = &tagLibrary{}
+		if serr := tilelib.taglib.setTags(newtagset); serr != nil {
+			return serr
+		}
+		if tilelib.encoder == nil {
+			return nil
+		}
+		return tilelib.encoder.Encode(LibraryEntry{
+			TagSet: newtagset,
+		})
+	}
+
+	// We already have a tagset: the new one must agree with it.
+	if tilelib.taglib.Len() != len(newtagset) {
+		return fmt.Errorf("cannot merge libraries with differing tagsets")
+	}
+	have := tilelib.taglib.Tags()
+	for idx, tag := range newtagset {
+		if !bytes.Equal(tag, have[idx]) {
+			return fmt.Errorf("cannot merge libraries with differing tagsets")
+		}
+	}
+	return nil
+}
+
+// loadTileVariants registers each input tile variant with the
+// library via getRef and records, in variantmap, which library
+// variant ID the input's (tag, variant) pair now corresponds to.
+// It always returns nil.
+func (tilelib *tileLibrary) loadTileVariants(tvs []TileVariant, variantmap map[tileLibRef]tileVariantID) error {
+	for idx := range tvs {
+		// Key is the variant's identity in the input; value
+		// is the ID assigned by this library, which is what
+		// makes IDs unique across all inputs.
+		inputRef := tileLibRef{tag: tvs[idx].Tag, variant: tvs[idx].Variant}
+		variantmap[inputRef] = tilelib.getRef(tvs[idx].Tag, tvs[idx].Sequence).variant
+	}
+	return nil
+}
+
+// loadGenomes rewrites the variant IDs of each genome in genomes
+// from the input's numbering to this library's numbering, using
+// variantmap, and then hands each translated genome to onLoadGenome
+// (if non-nil) and to tilelib.encoder (if non-nil).
+//
+// genomes maps a genome name to its variant slice; the slices are
+// modified in place. Index i of a variant slice belongs to tag i/2.
+// A variant ID of 0 is left untouched and needs no variantmap entry.
+//
+// One goroutine is started per genome, so onLoadGenome and the
+// encoder may be invoked concurrently from several goroutines.
+// Each genome is processed and emitted exactly once, by the
+// goroutine that translated it.
+//
+// The first error encountered (a variant missing from variantmap,
+// or an encoding failure) is returned once all goroutines have
+// finished; other goroutines stop translating as soon as they
+// notice an error has been recorded.
+func (tilelib *tileLibrary) loadGenomes(genomes map[string][]tileVariantID, variantmap map[tileLibRef]tileVariantID, onLoadGenome func(CompactGenome)) error {
+	var wg sync.WaitGroup
+	errs := make(chan error, 1)
+	// setErr records err if no error has been recorded yet, and
+	// never blocks.
+	setErr := func(err error) {
+		select {
+		case errs <- err:
+		default:
+		}
+	}
+	for name, variants := range genomes {
+		name, variants := name, variants
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for i, variant := range variants {
+				if len(errs) > 0 {
+					// Another goroutine failed; give up early.
+					return
+				}
+				if variant == 0 {
+					continue
+				}
+				tag := tagID(i / 2)
+				newvariant, ok := variantmap[tileLibRef{tag: tag, variant: variant}]
+				if !ok {
+					setErr(fmt.Errorf("oops: genome %q has variant %d for tag %d, but that variant was not in its library", name, variant, tag))
+					return
+				}
+				variants[i] = newvariant
+			}
+			// Emit only the genome this goroutine translated.
+			// Touching other entries of genomes here would
+			// race with the goroutines still rewriting them
+			// and would output every genome repeatedly.
+			cg := CompactGenome{
+				Name:     name,
+				Variants: variants,
+			}
+			if onLoadGenome != nil {
+				onLoadGenome(cg)
+			}
+			if tilelib.encoder != nil {
+				err := tilelib.encoder.Encode(LibraryEntry{
+					CompactGenomes: []CompactGenome{cg},
+				})
+				if err != nil {
+					setErr(err)
+				}
+			}
+		}()
+	}
+	wg.Wait()
+	// All senders are done, so closing is safe. A buffered error
+	// (if any) is received before the zero value from the closed
+	// channel, so this yields the first error or nil.
+	close(errs)
+	return <-errs
+}
+
+// LoadGob reads library entries from rdr using DecodeLibrary and
+// merges them into tilelib.
+//
+// For every entry, the tagset is reconciled with loadTagSet, tile
+// variants are registered with loadTileVariants, and compact
+// genomes are collected by name (a later genome with the same name
+// replaces an earlier one). After the whole stream has been
+// decoded, the collected genomes are passed to loadGenomes along
+// with onLoadGenome.
+//
+// ctx is checked before each entry is processed and again before
+// the genomes are loaded; if it is done, ctx.Err() is returned.
+// Any error from decoding or from the load steps is returned as is.
+func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, onLoadGenome func(CompactGenome)) error {
+	variantmap := make(map[tileLibRef]tileVariantID)
+	genomes := make(map[string][]tileVariantID)
+
+	// handleEntry merges a single decoded entry.
+	handleEntry := func(ent *LibraryEntry) error {
+		if cerr := ctx.Err(); cerr != nil {
+			return cerr
+		}
+		if lerr := tilelib.loadTagSet(ent.TagSet); lerr != nil {
+			return lerr
+		}
+		if lerr := tilelib.loadTileVariants(ent.TileVariants, variantmap); lerr != nil {
+			return lerr
+		}
+		// Genomes are only remembered here; they are handed
+		// to loadGenomes after decoding has finished.
+		for _, cg := range ent.CompactGenomes {
+			genomes[cg.Name] = cg.Variants
+		}
+		return nil
+	}
+
+	if derr := DecodeLibrary(rdr, handleEntry); derr != nil {
+		return derr
+	}
+	if cerr := ctx.Err(); cerr != nil {
+		return cerr
+	}
+	return tilelib.loadGenomes(genomes, variantmap, onLoadGenome)
+}
+
func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, error) {
ret := tileSeq{}
type jobT struct {