Refactor merge.
author Tom Clegg <tom@tomclegg.ca>
Mon, 12 Oct 2020 19:53:14 +0000 (15:53 -0400)
committer Tom Clegg <tom@tomclegg.ca>
Mon, 12 Oct 2020 19:53:14 +0000 (15:53 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

merge.go
tilelib.go

index 2e32ddb03e51313dae0d0eb5fbb9aa0e377db186..a55f47228db43b99c124a949c06f1642afa9df45 100644 (file)
--- a/merge.go
+++ b/merge.go
@@ -2,7 +2,7 @@ package main
 
 import (
        "bufio"
-       "bytes"
+       "context"
        "encoding/gob"
        "errors"
        "flag"
@@ -25,7 +25,6 @@ type merger struct {
        tagSet  [][]byte
        tilelib *tileLibrary
        mapped  map[string]map[tileLibRef]tileVariantID
-       todo    []liftCompactGenome
        mtxTags sync.Mutex
        errs    chan error
 }
@@ -112,67 +111,6 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou
        return 0
 }
 
-func (cmd *merger) mergeLibraryEntry(ent *LibraryEntry, src string) error {
-       mapped := cmd.mapped[src]
-       if len(cmd.errs) > 0 {
-               return errors.New("stopping after error in other goroutine")
-       }
-       if len(ent.TagSet) > 0 {
-               // We don't need the tagset to do a merge, but if it
-               // does appear in the input, we (a) output it once,
-               // and (b) do a sanity check, erroring out if the
-               // inputs have different tagsets.
-               cmd.mtxTags.Lock()
-               defer cmd.mtxTags.Unlock()
-               if len(cmd.tagSet) == 0 {
-                       cmd.tagSet = ent.TagSet
-                       if cmd.tilelib.encoder != nil {
-                               go cmd.tilelib.encoder.Encode(LibraryEntry{
-                                       TagSet: cmd.tagSet,
-                               })
-                       }
-               } else if len(cmd.tagSet) != len(ent.TagSet) {
-                       return fmt.Errorf("cannot merge libraries with differing tagsets")
-               } else {
-                       for i := range ent.TagSet {
-                               if !bytes.Equal(ent.TagSet[i], cmd.tagSet[i]) {
-                                       return fmt.Errorf("cannot merge libraries with differing tagsets")
-                               }
-                       }
-               }
-       }
-       for _, tv := range ent.TileVariants {
-               // Assign a new variant ID (unique across all inputs)
-               // for each input variant.
-               mapped[tileLibRef{tag: tv.Tag, variant: tv.Variant}] = cmd.tilelib.getRef(tv.Tag, tv.Sequence).variant
-       }
-       for _, cg := range ent.CompactGenomes {
-               cmd.todo = append(cmd.todo, liftCompactGenome{cg, mapped})
-       }
-       return nil
-}
-
-type liftCompactGenome struct {
-       CompactGenome
-       mapped map[tileLibRef]tileVariantID
-}
-
-// Translate old variant IDs to new (mapped) variant IDs.
-func (cg liftCompactGenome) lift() error {
-       for i, variant := range cg.Variants {
-               if variant == 0 {
-                       continue
-               }
-               tag := tagID(i / 2)
-               newvariant, ok := cg.mapped[tileLibRef{tag: tag, variant: variant}]
-               if !ok {
-                       return fmt.Errorf("oops: ent.CompactGenomes[] (%q) refs tag %d variant %d which is not in library", cg.Name, tag, variant)
-               }
-               cg.Variants[tag] = newvariant
-       }
-       return nil
-}
-
 func (cmd *merger) setError(err error) {
        select {
        case cmd.errs <- err:
@@ -182,10 +120,22 @@ func (cmd *merger) setError(err error) {
 
 func (cmd *merger) doMerge() error {
        w := bufio.NewWriter(cmd.output)
+       encoder := gob.NewEncoder(w)
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
        cmd.errs = make(chan error, 1)
        cmd.tilelib = &tileLibrary{
-               encoder:        gob.NewEncoder(w),
+               encoder:        encoder,
                includeNoCalls: true,
+               onLoadGenome: func(cg CompactGenome) {
+                       err := encoder.Encode(LibraryEntry{CompactGenomes: []CompactGenome{cg}})
+                       if err != nil {
+                               cmd.setError(err)
+                               cancel()
+                       }
+               },
        }
 
        cmd.mapped = map[string]map[tileLibRef]tileVariantID{}
@@ -210,16 +160,16 @@ func (cmd *merger) doMerge() error {
                go func(input string) {
                        defer wg.Done()
                        log.Printf("%s: reading", input)
-                       err := DecodeLibrary(infile, func(ent *LibraryEntry) error {
-                               return cmd.mergeLibraryEntry(ent, input)
-                       })
+                       err := cmd.tilelib.LoadGob(ctx, infile, nil)
                        if err != nil {
-                               cmd.setError(fmt.Errorf("%s: decode: %w", input, err))
+                               cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
+                               cancel()
                                return
                        }
                        err = infile.Close()
                        if err != nil {
-                               cmd.setError(fmt.Errorf("%s: close: %w", input, err))
+                               cmd.setError(fmt.Errorf("%s: error closing input file: %w", input, err))
+                               cancel()
                                return
                        }
                        log.Printf("%s: done", input)
@@ -230,24 +180,8 @@ func (cmd *merger) doMerge() error {
        if err := <-cmd.errs; err != nil {
                return err
        }
-
-       var cgs []CompactGenome
-       for _, cg := range cmd.todo {
-               err := cg.lift()
-               if err != nil {
-                       return err
-               }
-               cgs = append(cgs, cg.CompactGenome)
-       }
-       err := cmd.tilelib.encoder.Encode(LibraryEntry{
-               CompactGenomes: cgs,
-       })
-       if err != nil {
-               return err
-       }
-
        log.Print("flushing")
-       err = w.Flush()
+       err := w.Flush()
        if err != nil {
                return err
        }
index 0a8d857cf1226e2d8146c48b64819b549807ec4e..b4ecd4a3afcfb122908fabf4f9631b623cd20cc5 100644 (file)
--- a/tilelib.go
+++ b/tilelib.go
@@ -3,7 +3,9 @@ package main
 import (
        "bufio"
        "bytes"
+       "context"
        "encoding/gob"
+       "fmt"
        "io"
        "strings"
        "sync"
@@ -55,10 +57,167 @@ type tileLibrary struct {
        variants int
        // if non-nil, write out any tile variants added while tiling
        encoder *gob.Encoder
+       // if non-nil, call this func upon loading a genome
+       // NOTE(review): LoadGob does not read this field; it calls
+       // only the onLoadGenome func passed as its own argument.
+       onLoadGenome func(CompactGenome)
 
        mtx sync.Mutex
 }
 
+// loadTagSet merges an input library's tagset into tilelib.
+//
+// Loading a tagset means either passing it through to the output
+// (if it's the first one we've seen), or just ensuring it doesn't
+// disagree with what we already have. An empty tagset is ignored.
+// It returns an error if newtagset differs from the tagset already
+// loaded, or if storing or encoding the first tagset fails.
+func (tilelib *tileLibrary) loadTagSet(newtagset [][]byte) error {
+       if len(newtagset) == 0 {
+               return nil
+       }
+       tilelib.mtx.Lock()
+       defer tilelib.mtx.Unlock()
+       if tilelib.taglib == nil || tilelib.taglib.Len() == 0 {
+               // First tagset seen: adopt it and pass it through.
+               tilelib.taglib = &tagLibrary{}
+               if err := tilelib.taglib.setTags(newtagset); err != nil {
+                       return err
+               }
+               if tilelib.encoder == nil {
+                       return nil
+               }
+               return tilelib.encoder.Encode(LibraryEntry{TagSet: newtagset})
+       }
+       if tilelib.taglib.Len() != len(newtagset) {
+               return fmt.Errorf("cannot merge libraries with differing tagsets")
+       }
+       current := tilelib.taglib.Tags()
+       for i := range newtagset {
+               if !bytes.Equal(newtagset[i], current[i]) {
+                       return fmt.Errorf("cannot merge libraries with differing tagsets")
+               }
+       }
+       return nil
+}
+
+// loadTileVariants adds each tile variant in tvs to tilelib (via
+// getRef) and records, in variantmap, the variant ID tilelib uses
+// for each input (tag, variant) pair. It currently always returns nil.
+func (tilelib *tileLibrary) loadTileVariants(tvs []TileVariant, variantmap map[tileLibRef]tileVariantID) error {
+       for _, tv := range tvs {
+               // Assign a new variant ID (unique across all inputs)
+               // for each input variant.
+               variantmap[tileLibRef{tag: tv.Tag, variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).variant
+       }
+       return nil
+}
+
+// loadGenomes translates each genome's variant IDs from its input
+// library's numbering to tilelib's numbering using variantmap, then
+// passes each translated genome to onLoadGenome (if non-nil) and
+// writes it to tilelib.encoder (if non-nil).
+//
+// The slices in genomes are translated in place, one goroutine per
+// genome; zero entries are left unchanged. It returns the first
+// error encountered: a variant missing from variantmap, or an
+// encoding failure.
+func (tilelib *tileLibrary) loadGenomes(genomes map[string][]tileVariantID, variantmap map[tileLibRef]tileVariantID, onLoadGenome func(CompactGenome)) error {
+       var wg sync.WaitGroup
+       errs := make(chan error, 1)
+       // setErr records err unless an error was already recorded,
+       // so only the first error is reported.
+       setErr := func(err error) {
+               select {
+               case errs <- err:
+               default:
+               }
+       }
+       for name, variants := range genomes {
+               name, variants := name, variants
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for i, variant := range variants {
+                               if len(errs) > 0 {
+                                       // Another goroutine already failed.
+                                       return
+                               }
+                               if variant == 0 {
+                                       continue
+                               }
+                               // Entries 2t and 2t+1 both refer to tag t.
+                               tag := tagID(i / 2)
+                               newvariant, ok := variantmap[tileLibRef{tag: tag, variant: variant}]
+                               if !ok {
+                                       setErr(fmt.Errorf("oops: genome %q has variant %d for tag %d, but that variant was not in its library", name, variant, tag))
+                                       return
+                               }
+                               variants[i] = newvariant
+                       }
+                       // Emit only this goroutine's genome: ranging over all
+                       // of genomes here would emit every genome once per
+                       // goroutine, and read slices that other goroutines are
+                       // still translating.
+                       cg := CompactGenome{Name: name, Variants: variants}
+                       if onLoadGenome != nil {
+                               onLoadGenome(cg)
+                       }
+                       if tilelib.encoder != nil {
+                               // gob.Encoder is documented as safe for concurrent use.
+                               err := tilelib.encoder.Encode(LibraryEntry{CompactGenomes: []CompactGenome{cg}})
+                               if err != nil {
+                                       setErr(err)
+                               }
+                       }
+               }()
+       }
+       wg.Wait()
+       // Every sender has finished, so errs can be closed here;
+       // receiving then yields the first recorded error, or nil.
+       close(errs)
+       return <-errs
+}
+
+// LoadGob reads a library from rdr (via DecodeLibrary) and merges it
+// into tilelib: its tagset is checked against (or becomes) tilelib's
+// tagset, its tile variants are added to tilelib, and its genomes are
+// translated to tilelib's variant IDs, passed to onLoadGenome (if
+// non-nil), and written to tilelib.encoder (if non-nil).
+//
+// Genomes are collected while decoding and translated only after the
+// whole input has been read, when variantmap is complete. If ctx is
+// canceled before then, LoadGob stops and returns an error.
+func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, onLoadGenome func(CompactGenome)) error {
+       genomes := map[string][]tileVariantID{}
+       // variantmap maps this input's (tag, variant) IDs to tilelib's.
+       variantmap := map[tileLibRef]tileVariantID{}
+       err := DecodeLibrary(rdr, func(ent *LibraryEntry) error {
+               if ctx.Err() != nil {
+                       return ctx.Err()
+               }
+               if err := tilelib.loadTagSet(ent.TagSet); err != nil {
+                       return err
+               }
+               if err := tilelib.loadTileVariants(ent.TileVariants, variantmap); err != nil {
+                       return err
+               }
+               for _, cg := range ent.CompactGenomes {
+                       // NOTE(review): a repeated genome name silently
+                       // replaces the earlier entry.
+                       genomes[cg.Name] = cg.Variants
+               }
+               return nil
+       })
+       if err != nil {
+               return err
+       }
+       if ctx.Err() != nil {
+               return ctx.Err()
+       }
+       return tilelib.loadGenomes(genomes, variantmap, onLoadGenome)
+}
+
 func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, error) {
        ret := tileSeq{}
        type jobT struct {