Add merge command.
authorTom Clegg <tom@tomclegg.ca>
Mon, 12 Oct 2020 13:48:18 +0000 (09:48 -0400)
committerTom Clegg <tom@tomclegg.ca>
Mon, 12 Oct 2020 13:48:18 +0000 (09:48 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

gob.go
merge.go [new file with mode: 0644]
pipeline_test.go
tilelib.go

diff --git a/gob.go b/gob.go
index 7619ebd44f2b6b5fb351b59e534809de6056b040..2fb061c5b1fc1770982dbeb9400d35b33ef562fb 100644 (file)
--- a/gob.go
+++ b/gob.go
@@ -28,16 +28,27 @@ type LibraryEntry struct {
 }
 
+// ReadCompactGenomes reads a gob-encoded library from rdr and returns
+// the CompactGenomes of all its entries, concatenated in stream order.
+// On error it returns a nil slice rather than a partial result.
 func ReadCompactGenomes(rdr io.Reader) ([]CompactGenome, error) {
-       dec := gob.NewDecoder(bufio.NewReaderSize(rdr, 1<<26))
        var ret []CompactGenome
-       for {
+       err := DecodeLibrary(rdr, func(ent *LibraryEntry) error {
+               ret = append(ret, ent.CompactGenomes...)
+               return nil
+       })
+       if err != nil {
+               return nil, err
+       }
+       return ret, nil
+}
+
+// DecodeLibrary reads a gob-encoded stream of LibraryEntry values from
+// rdr and calls cb on each decoded entry, in stream order. It returns
+// nil when the stream ends with io.EOF; otherwise it returns the first
+// decode error or the first non-nil error returned by cb.
+func DecodeLibrary(rdr io.Reader, cb func(*LibraryEntry) error) error {
+       // 1<<26 bytes = 64 MiB read buffer.
+       dec := gob.NewDecoder(bufio.NewReaderSize(rdr, 1<<26))
+       var err error
+       for err == nil {
+               // ent is declared inside the loop, so each call to cb
+               // receives a pointer to a distinct value.
                var ent LibraryEntry
-               err := dec.Decode(&ent)
-               if err == io.EOF {
-                       return ret, nil
-               } else if err != nil {
-                       return nil, err
+               err = dec.Decode(&ent)
+               // Only successfully decoded entries are passed to cb.
+               if err == nil {
+                       err = cb(&ent)
                }
-               ret = append(ret, ent.CompactGenomes...)
+       }
+       // io.EOF marks a clean end of stream. Note that an io.EOF
+       // returned by cb is also treated as success here.
+       if err == io.EOF {
+               return nil
+       } else {
+               return err
        }
 }
diff --git a/merge.go b/merge.go
new file mode 100644 (file)
index 0000000..2e32ddb
--- /dev/null
+++ b/merge.go
@@ -0,0 +1,255 @@
+package main
+
+import (
+       "bufio"
+       "bytes"
+       "encoding/gob"
+       "errors"
+       "flag"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "net/http"
+       _ "net/http/pprof"
+       "os"
+       "sync"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       log "github.com/sirupsen/logrus"
+)
+
+// merger implements the "merge" command, which combines several tile
+// libraries into a single library with re-numbered tile variants.
+type merger struct {
+       // stdin is read for an input named "-".
+       stdin   io.Reader
+       // inputs are the library files to merge (positional args).
+       inputs  []string
+       output  io.WriteCloser
+       // tagSet is the first non-empty tagset seen in any input;
+       // guarded by mtxTags.
+       tagSet  [][]byte
+       // tilelib is the merged library; it assigns the new variant IDs.
+       tilelib *tileLibrary
+       // mapped[input] maps an input's (tag, old variant) to the
+       // variant ID assigned in tilelib.
+       mapped  map[string]map[tileLibRef]tileVariantID
+       // todo holds compact genomes waiting for lift() once all
+       // inputs have been read.
+       todo    []liftCompactGenome
+       mtxTags sync.Mutex
+       // errs receives the first error from a reader goroutine
+       // (created with a buffer of 1 in doMerge; see setError).
+       errs    chan error
+}
+
+// RunCommand implements the "merge" subcommand. It combines the
+// libraries named as positional arguments ("-" means stdin) into one
+// library, written to -o (default stdout). Without -local, it instead
+// submits a container request that runs "merge -local=true" on the
+// translated input paths, and prints the resulting output path.
+// It returns an exit code: 0 on success, 2 on a flag-parsing error,
+// and 1 on other failures. Any error is printed to stderr.
+func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       var err error
+       defer func() {
+               if err != nil {
+                       fmt.Fprintf(stderr, "%s\n", err)
+               }
+       }()
+       flags := flag.NewFlagSet("", flag.ContinueOnError)
+       flags.SetOutput(stderr)
+       pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
+       runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
+       projectUUID := flags.String("project", "", "project `UUID` for output data")
+       priority := flags.Int("priority", 500, "container request priority")
+       outputFilename := flags.String("o", "-", "output `file`")
+       err = flags.Parse(args)
+       if err == flag.ErrHelp {
+               err = nil
+               return 0
+       } else if err != nil {
+               return 2
+       }
+       cmd.stdin = stdin
+       cmd.inputs = flags.Args()
+
+       if *pprof != "" {
+               go func() {
+                       log.Println(http.ListenAndServe(*pprof, nil))
+               }()
+       }
+
+       if !*runlocal {
+               if *outputFilename != "-" {
+                       err = errors.New("cannot specify output file in container mode: not implemented")
+                       return 1
+               }
+               runner := arvadosContainerRunner{
+                       Name:        "lightning merge",
+                       Client:      arvados.NewClientFromEnv(),
+                       ProjectUUID: *projectUUID,
+                       RAM:         64000000000,
+                       VCPUs:       2,
+                       Priority:    *priority,
+               }
+               for i := range cmd.inputs {
+                       err = runner.TranslatePaths(&cmd.inputs[i])
+                       if err != nil {
+                               return 1
+                       }
+               }
+               runner.Args = append([]string{"merge", "-local=true",
+                       "-o", "/mnt/output/library.gob",
+               }, cmd.inputs...)
+               var output string
+               output, err = runner.Run()
+               if err != nil {
+                       return 1
+               }
+               fmt.Fprintln(stdout, output+"/library.gob")
+               return 0
+       }
+
+       if *outputFilename == "-" {
+               cmd.output = nopCloser{stdout}
+       } else {
+               // os.Create truncates an existing file, so stale
+               // trailing bytes from a longer previous output can't
+               // corrupt the gob stream. It also uses a non-executable
+               // mode (0666 before umask) for this data file.
+               cmd.output, err = os.Create(*outputFilename)
+               if err != nil {
+                       return 1
+               }
+               // Covers early-return paths; on success the explicit
+               // Close below reports any error, and this second
+               // Close's error is ignored.
+               defer cmd.output.Close()
+       }
+
+       err = cmd.doMerge()
+       if err != nil {
+               return 1
+       }
+       err = cmd.output.Close()
+       if err != nil {
+               return 1
+       }
+       return 0
+}
+
+// mergeLibraryEntry merges one decoded library entry from input src
+// into the merged library. doMerge calls it concurrently from one
+// goroutine per input, so any state shared across inputs is guarded.
+//
+// Tile variants are registered in cmd.tilelib, and the old-to-new
+// variant ID mapping is recorded in cmd.mapped[src]. Compact genomes
+// are queued in cmd.todo and translated by lift() after all inputs
+// have been read.
+func (cmd *merger) mergeLibraryEntry(ent *LibraryEntry, src string) error {
+       mapped := cmd.mapped[src]
+       if len(cmd.errs) > 0 {
+               return errors.New("stopping after error in other goroutine")
+       }
+       if len(ent.TagSet) > 0 {
+               if err := cmd.mergeTagSet(ent.TagSet); err != nil {
+                       return err
+               }
+       }
+       for _, tv := range ent.TileVariants {
+               // Assign a new variant ID (unique across all inputs)
+               // for each input variant.
+               mapped[tileLibRef{tag: tv.Tag, variant: tv.Variant}] = cmd.tilelib.getRef(tv.Tag, tv.Sequence).variant
+       }
+       if len(ent.CompactGenomes) > 0 {
+               // cmd.todo is shared by all input goroutines, so
+               // appends must be serialized. mtxTags is reused for
+               // this; it is never held while calling getRef.
+               cmd.mtxTags.Lock()
+               for _, cg := range ent.CompactGenomes {
+                       cmd.todo = append(cmd.todo, liftCompactGenome{cg, mapped})
+               }
+               cmd.mtxTags.Unlock()
+       }
+       return nil
+}
+
+// mergeTagSet records the first tagset seen in any input and writes it
+// to the output once. Every later tagset must match it exactly;
+// otherwise an error is returned.
+func (cmd *merger) mergeTagSet(tagset [][]byte) error {
+       // We don't need the tagset to do a merge, but if it
+       // does appear in the input, we (a) output it once,
+       // and (b) do a sanity check, erroring out if the
+       // inputs have different tagsets.
+       cmd.mtxTags.Lock()
+       defer cmd.mtxTags.Unlock()
+       if len(cmd.tagSet) == 0 {
+               cmd.tagSet = tagset
+               if cmd.tilelib.encoder != nil {
+                       // Encode synchronously. This guarantees the
+                       // tagset is written before doMerge flushes
+                       // the output, and write errors are reported
+                       // instead of lost in a background goroutine.
+                       err := cmd.tilelib.encoder.Encode(LibraryEntry{
+                               TagSet: cmd.tagSet,
+                       })
+                       if err != nil {
+                               return fmt.Errorf("encode tagset: %w", err)
+                       }
+               }
+               return nil
+       }
+       if len(cmd.tagSet) != len(tagset) {
+               return fmt.Errorf("cannot merge libraries with differing tagsets")
+       }
+       for i := range tagset {
+               if !bytes.Equal(tagset[i], cmd.tagSet[i]) {
+                       return fmt.Errorf("cannot merge libraries with differing tagsets")
+               }
+       }
+       return nil
+}
+
+// liftCompactGenome pairs a CompactGenome read from one input with
+// that input's old-to-new variant ID mapping (cmd.mapped[src]), so
+// lift() can translate the genome once all inputs have been read.
+// Field order matters: it is constructed with a positional literal.
+type liftCompactGenome struct {
+       CompactGenome
+       mapped map[tileLibRef]tileVariantID
+}
+
+// lift translates cg's variant IDs in place, from the IDs used in its
+// source library to the IDs assigned in the merged library, using
+// cg.mapped. Variants[i] refers to tag i/2 (two entries per tag).
+// Zero entries are left unchanged. It returns an error if a referenced
+// (tag, variant) pair is missing from the mapping.
+func (cg liftCompactGenome) lift() error {
+       for i, variant := range cg.Variants {
+               if variant == 0 {
+                       continue
+               }
+               tag := tagID(i / 2)
+               newvariant, ok := cg.mapped[tileLibRef{tag: tag, variant: variant}]
+               if !ok {
+                       return fmt.Errorf("oops: ent.CompactGenomes[] (%q) refs tag %d variant %d which is not in library", cg.Name, tag, variant)
+               }
+               // Write back at index i, not tag: each tag has two
+               // entries. cg is a value receiver, but Variants shares
+               // its backing array with the caller's copy, so the
+               // caller sees the update.
+               cg.Variants[i] = newvariant
+       }
+       return nil
+}
+
+// setError offers err as the merge's failure without blocking.
+// doMerge creates cmd.errs with a buffer of one, so only the first
+// error is kept. If that slot is already taken, err is discarded.
+func (cmd *merger) setError(err error) {
+       select {
+       default:
+               // Slot already occupied: an earlier error wins.
+       case cmd.errs <- err:
+       }
+}
+
+// doMerge reads all of cmd.inputs concurrently (one goroutine per
+// input), merging their tile variants into a fresh tileLibrary whose
+// encoder writes to cmd.output. tileLibrary.getRef appears to emit
+// newly assigned variants through that encoder. After all inputs are
+// read, it translates every queued compact genome to the new variant
+// IDs, writes them as one final LibraryEntry, and flushes the output.
+// It returns the first error encountered.
+func (cmd *merger) doMerge() error {
+       w := bufio.NewWriter(cmd.output)
+       cmd.errs = make(chan error, 1)
+       cmd.tilelib = &tileLibrary{
+               encoder:        gob.NewEncoder(w),
+               includeNoCalls: true,
+       }
+
+       cmd.mapped = map[string]map[tileLibRef]tileVariantID{}
+       for _, input := range cmd.inputs {
+               cmd.mapped[input] = map[tileLibRef]tileVariantID{}
+       }
+
+       // Open every input before starting any reader goroutine. An
+       // open failure then can't return (and close files) while other
+       // goroutines are still reading.
+       infiles := make([]io.ReadCloser, 0, len(cmd.inputs))
+       for _, input := range cmd.inputs {
+               if input == "-" {
+                       infiles = append(infiles, ioutil.NopCloser(cmd.stdin))
+                       continue
+               }
+               f, err := os.Open(input)
+               if err != nil {
+                       // Best-effort cleanup; the open error is the
+                       // one worth reporting.
+                       for _, opened := range infiles {
+                               opened.Close()
+                       }
+                       return err
+               }
+               infiles = append(infiles, f)
+       }
+
+       var wg sync.WaitGroup
+       for i, input := range cmd.inputs {
+               wg.Add(1)
+               go func(input string, infile io.ReadCloser) {
+                       defer wg.Done()
+                       log.Printf("%s: reading", input)
+                       err := DecodeLibrary(infile, func(ent *LibraryEntry) error {
+                               return cmd.mergeLibraryEntry(ent, input)
+                       })
+                       if err != nil {
+                               // The decode error takes precedence
+                               // over any close error.
+                               infile.Close()
+                               cmd.setError(fmt.Errorf("%s: decode: %w", input, err))
+                               return
+                       }
+                       err = infile.Close()
+                       if err != nil {
+                               cmd.setError(fmt.Errorf("%s: close: %w", input, err))
+                               return
+                       }
+                       log.Printf("%s: done", input)
+               }(input, infiles[i])
+       }
+       wg.Wait()
+       // Every sender has finished. Closing makes the receive below
+       // yield nil when no error was recorded.
+       close(cmd.errs)
+       if err := <-cmd.errs; err != nil {
+               return err
+       }
+
+       cgs := make([]CompactGenome, 0, len(cmd.todo))
+       for _, cg := range cmd.todo {
+               err := cg.lift()
+               if err != nil {
+                       return err
+               }
+               cgs = append(cgs, cg.CompactGenome)
+       }
+       err := cmd.tilelib.encoder.Encode(LibraryEntry{
+               CompactGenomes: cgs,
+       })
+       if err != nil {
+               return err
+       }
+
+       log.Print("flushing")
+       return w.Flush()
+}
diff --git a/pipeline_test.go b/pipeline_test.go
index 96659a29776340c8a0b4d270e37d7576246997bf..eb61d9672367b92e412a1c2be54f69b1a9aee342 100644 (file)
--- a/pipeline_test.go
+++ b/pipeline_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
        "bytes"
+       "fmt"
        "io"
        "os"
        "sync"
@@ -37,6 +38,39 @@ func (s *pipelineSuite) TestImport(c *check.C) {
                        c.Check(code, check.Equals, 0)
                }()
                wg.Wait()
-               os.Stdout.Write(statsout.Bytes())
+               c.Logf("%s", statsout.String())
        }
 }
+
+// TestImportMerge imports two inputs into separate library files
+// concurrently, merges those libraries with the merge command, and
+// checks that the stats command can read the merged output.
+func (s *pipelineSuite) TestImportMerge(c *check.C) {
+       inputs := []string{
+               "testdata/pipeline1/",
+               "testdata/ref.fasta",
+       }
+       tmpdir := c.MkDir()
+       libfile := make([]string, len(inputs))
+
+       var wg sync.WaitGroup
+       wg.Add(len(inputs))
+       for idx := range inputs {
+               c.Logf("TestImportMerge: %s", inputs[idx])
+               libfile[idx] = fmt.Sprintf("%s/%d.gob", tmpdir, idx)
+               go func(infile, outfile string) {
+                       defer wg.Done()
+                       importArgs := []string{"-local=true", "-o=" + outfile, "-skip-ooo=true", "-output-tiles", "-tag-library", "testdata/tags", infile}
+                       importCode := (&importer{}).RunCommand("lightning import", importArgs, bytes.NewReader(nil), &bytes.Buffer{}, os.Stderr)
+                       c.Check(importCode, check.Equals, 0)
+               }(inputs[idx], libfile[idx])
+       }
+       wg.Wait()
+
+       var merged bytes.Buffer
+       mergeCode := (&merger{}).RunCommand("lightning merge", []string{"-local", libfile[0], libfile[1]}, bytes.NewReader(nil), &merged, os.Stderr)
+       c.Check(mergeCode, check.Equals, 0)
+       c.Logf("len(merged) %d", merged.Len())
+
+       var statsout bytes.Buffer
+       statsCode := (&stats{}).RunCommand("lightning stats", []string{"-local"}, &merged, &statsout, os.Stderr)
+       c.Check(statsCode, check.Equals, 0)
+       c.Check(statsout.Len() > 0, check.Equals, true)
+       c.Logf("%s", statsout.String())
+}
diff --git a/tilelib.go b/tilelib.go
index 857b56683095dc9ddd1c7169fa3b2b1487c0edda..0a8d857cf1226e2d8146c48b64819b549807ec4e 100644 (file)
--- a/tilelib.go
+++ b/tilelib.go
@@ -162,9 +162,25 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
        // if tilelib.seq == nil {
        //      tilelib.seq = map[[blake2b.Size]byte][]byte{}
        // }
-       if tilelib.variant == nil {
+       if tilelib.variant == nil && tilelib.taglib != nil {
                tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len())
        }
+       if int(tag) >= len(tilelib.variant) {
+               // If we haven't seen the tag library yet (as in a
+               // merge), tilelib.taglib.Len() is zero. We can still
+               // behave correctly, we just need to expand the
+               // tilelib.variant slice as needed.
+               if int(tag) >= cap(tilelib.variant) {
+                       // Allocate 2x capacity.
+                       newslice := make([][][blake2b.Size256]byte, int(tag)+1, (int(tag)+1)*2)
+                       copy(newslice, tilelib.variant)
+                       tilelib.variant = newslice[:int(tag)+1]
+               } else {
+                       // Use previously allocated capacity, avoiding
+                       // copy.
+                       tilelib.variant = tilelib.variant[:int(tag)+1]
+               }
+       }
        seqhash := blake2b.Sum256(seq)
        for i, varhash := range tilelib.variant[tag] {
                if varhash == seqhash {
@@ -176,7 +192,6 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
        tilelib.variant[tag] = append(tilelib.variant[tag], seqhash)
        // tilelib.seq[seqhash] = append([]byte(nil), seq...)
        variant := tileVariantID(len(tilelib.variant[tag]))
-       ret := tileLibRef{tag: tag, variant: variant}
        tilelib.mtx.Unlock()
 
        if tilelib.encoder != nil {
@@ -189,5 +204,5 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
                        }},
                })
        }
-       return ret
+       return tileLibRef{tag: tag, variant: variant}
 }