Fix coordinates in hgvs annotations.
[lightning.git] / merge.go
index 2e32ddb03e51313dae0d0eb5fbb9aa0e377db186..68560edbd0ae62af9fd203ba9aa6d2867a1fe883 100644 (file)
--- a/merge.go
+++ b/merge.go
@@ -1,8 +1,12 @@
-package main
+// Copyright (C) The Lightning Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lightning
 
 import (
        "bufio"
-       "bytes"
+       "context"
        "encoding/gob"
        "errors"
        "flag"
@@ -12,20 +16,21 @@ import (
        "net/http"
        _ "net/http/pprof"
        "os"
+       "strings"
        "sync"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/klauspost/pgzip"
        log "github.com/sirupsen/logrus"
 )
 
 type merger struct {
        stdin   io.Reader
        inputs  []string
-       output  io.WriteCloser
+       output  io.Writer // RunCommand points this at a bufio.Writer; doMerge's gob encoder writes through it
        tagSet  [][]byte
        tilelib *tileLibrary
        mapped  map[string]map[tileLibRef]tileVariantID
-       todo    []liftCompactGenome
        mtxTags sync.Mutex
        errs    chan error
 }
@@ -66,12 +71,14 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou
                        return 1
                }
                runner := arvadosContainerRunner{
-                       Name:        "lightning filter",
+                       Name:        "lightning merge",
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: *projectUUID,
-                       RAM:         64000000000,
-                       VCPUs:       2,
+                       RAM:         700000000000, // 700e9 bytes (~652 GiB)
+                       VCPUs:       16,
                        Priority:    *priority,
+                       APIAccess:   true,
+                       KeepCache:   1, // NOTE(review): presumably a Keep cache size; unit not visible here — confirm
                }
                for i := range cmd.inputs {
                        err = runner.TranslatePaths(&cmd.inputs[i])
@@ -80,97 +87,53 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou
                        }
                }
                runner.Args = append([]string{"merge", "-local=true",
-                       "-o", "/mnt/output/library.gob",
+                       "-o", "/mnt/output/library.gob.gz",
                }, cmd.inputs...)
                var output string
                output, err = runner.Run()
                if err != nil {
                        return 1
                }
-               fmt.Fprintln(stdout, output+"/library.gob")
+               fmt.Fprintln(stdout, output+"/library.gob.gz")
                return 0
        }
 
+       var outf, outw io.WriteCloser // outf: output file (nil for "-"); outw: sink wrapping outf or stdout
        if *outputFilename == "-" {
-               cmd.output = nopCloser{stdout}
+               outw = nopCloser{stdout}
        } else {
-               cmd.output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
+               outf, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) // O_TRUNC: drop stale bytes from an older, longer file
                if err != nil {
                        return 1
                }
-               defer cmd.output.Close()
+               defer outf.Close()
+               if strings.HasSuffix(*outputFilename, ".gz") {
+                       outw = pgzip.NewWriter(outf)
+               } else {
+                       outw = nopCloser{outf}
+               }
        }
-
+       bufw := bufio.NewWriterSize(outw, 64*1024*1024)
+       cmd.output = bufw
        err = cmd.doMerge()
        if err != nil {
                return 1
        }
-       err = cmd.output.Close()
+       err = bufw.Flush()
        if err != nil {
                return 1
        }
-       return 0
-}
-
-func (cmd *merger) mergeLibraryEntry(ent *LibraryEntry, src string) error {
-       mapped := cmd.mapped[src]
-       if len(cmd.errs) > 0 {
-               return errors.New("stopping after error in other goroutine")
-       }
-       if len(ent.TagSet) > 0 {
-               // We don't need the tagset to do a merge, but if it
-               // does appear in the input, we (a) output it once,
-               // and (b) do a sanity check, erroring out if the
-               // inputs have different tagsets.
-               cmd.mtxTags.Lock()
-               defer cmd.mtxTags.Unlock()
-               if len(cmd.tagSet) == 0 {
-                       cmd.tagSet = ent.TagSet
-                       if cmd.tilelib.encoder != nil {
-                               go cmd.tilelib.encoder.Encode(LibraryEntry{
-                                       TagSet: cmd.tagSet,
-                               })
-                       }
-               } else if len(cmd.tagSet) != len(ent.TagSet) {
-                       return fmt.Errorf("cannot merge libraries with differing tagsets")
-               } else {
-                       for i := range ent.TagSet {
-                               if !bytes.Equal(ent.TagSet[i], cmd.tagSet[i]) {
-                                       return fmt.Errorf("cannot merge libraries with differing tagsets")
-                               }
-                       }
-               }
-       }
-       for _, tv := range ent.TileVariants {
-               // Assign a new variant ID (unique across all inputs)
-               // for each input variant.
-               mapped[tileLibRef{tag: tv.Tag, variant: tv.Variant}] = cmd.tilelib.getRef(tv.Tag, tv.Sequence).variant
-       }
-       for _, cg := range ent.CompactGenomes {
-               cmd.todo = append(cmd.todo, liftCompactGenome{cg, mapped})
+       err = outw.Close() // finishes the gzip stream when compressing
+       if err != nil {
+               return 1
        }
-       return nil
-}
-
-type liftCompactGenome struct {
-       CompactGenome
-       mapped map[tileLibRef]tileVariantID
-}
-
-// Translate old variant IDs to new (mapped) variant IDs.
-func (cg liftCompactGenome) lift() error {
-       for i, variant := range cg.Variants {
-               if variant == 0 {
-                       continue
-               }
-               tag := tagID(i / 2)
-               newvariant, ok := cg.mapped[tileLibRef{tag: tag, variant: variant}]
-               if !ok {
-                       return fmt.Errorf("oops: ent.CompactGenomes[] (%q) refs tag %d variant %d which is not in library", cg.Name, tag, variant)
+       if outf != nil { // close explicitly so write errors are reported; the deferred Close's result is ignored
+               err = outf.Close()
+               if err != nil {
+                       return 1
                }
-               cg.Variants[tag] = newvariant
        }
-       return nil
+       return 0
 }
 
 func (cmd *merger) setError(err error) {
@@ -182,10 +145,15 @@ func (cmd *merger) setError(err error) {
 
 func (cmd *merger) doMerge() error {
        w := bufio.NewWriter(cmd.output)
+       encoder := gob.NewEncoder(w) // handed to cmd.tilelib below
+
+       ctx, cancel := context.WithCancel(context.Background()) // canceled when a loader goroutine fails, and on return
+       defer cancel()
+
        cmd.errs = make(chan error, 1)
        cmd.tilelib = &tileLibrary{
-               encoder:        gob.NewEncoder(w),
-               includeNoCalls: true,
+               encoder:       encoder,
+               retainNoCalls: true,
        }
 
        cmd.mapped = map[string]map[tileLibRef]tileVariantID{}
@@ -195,31 +163,24 @@ func (cmd *merger) doMerge() error {
 
        var wg sync.WaitGroup
        for _, input := range cmd.inputs {
-               var infile io.ReadCloser
-               if input == "-" {
-                       infile = ioutil.NopCloser(cmd.stdin)
-               } else {
+               rdr := ioutil.NopCloser(cmd.stdin) // "-" means stdin; file inputs replace this below
+               if input != "-" {
                        var err error
-                       infile, err = os.Open(input)
+                       rdr, err = open(input)
                        if err != nil {
                                return err
                        }
-                       defer infile.Close()
+                       defer rdr.Close() // receiver is bound now, so the opened file is still closed after rdr is rewrapped below
                }
+               rdr = ioutil.NopCloser(bufio.NewReaderSize(rdr, 8*1024*1024)) // 8 MiB read buffer
                wg.Add(1)
                go func(input string) {
                        defer wg.Done()
                        log.Printf("%s: reading", input)
-                       err := DecodeLibrary(infile, func(ent *LibraryEntry) error {
-                               return cmd.mergeLibraryEntry(ent, input)
-                       })
-                       if err != nil {
-                               cmd.setError(fmt.Errorf("%s: decode: %w", input, err))
-                               return
-                       }
-                       err = infile.Close()
+                       err := cmd.tilelib.LoadGob(ctx, rdr, strings.HasSuffix(input, ".gz")) // NOTE(review): 3rd arg presumably flags gzip input — confirm
                        if err != nil {
-                               cmd.setError(fmt.Errorf("%s: close: %w", input, err))
+                               cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
+                               cancel() // the other LoadGob calls share ctx; signal them to stop too
                                return
                        }
                        log.Printf("%s: done", input)
@@ -230,24 +191,8 @@ func (cmd *merger) doMerge() error {
        if err := <-cmd.errs; err != nil {
                return err
        }
-
-       var cgs []CompactGenome
-       for _, cg := range cmd.todo {
-               err := cg.lift()
-               if err != nil {
-                       return err
-               }
-               cgs = append(cgs, cg.CompactGenome)
-       }
-       err := cmd.tilelib.encoder.Encode(LibraryEntry{
-               CompactGenomes: cgs,
-       })
-       if err != nil {
-               return err
-       }
-
        log.Print("flushing")
-       err = w.Flush()
+       err := w.Flush() // push what the gob encoder buffered in w out to cmd.output
        if err != nil {
                return err
        }