-package main
+// Copyright (C) The Lightning Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lightning
import (
"bufio"
- "bytes"
+ "context"
"encoding/gob"
"errors"
"flag"
"net/http"
_ "net/http/pprof"
"os"
+ "strings"
"sync"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/klauspost/pgzip"
log "github.com/sirupsen/logrus"
)
+// merger holds the state of the "lightning merge" command: it loads
+// each input library into one tileLibrary (one goroutine per input,
+// see doMerge) and writes library entries through a gob encoder on
+// output.
type merger struct {
+ // stdin is the reader used for any input named "-".
stdin io.Reader
+ // inputs lists the input paths ("-" means stdin). Inputs whose
+ // name ends in ".gz" are loaded as gzip-compressed.
inputs []string
- output io.WriteCloser
+ // output receives the encoded merged library. The command points it
+ // at a bufio.Writer wrapping stdout or the output file (pgzip-wrapped
+ // when the file name ends in ".gz"). Flushing/closing those
+ // underlying writers is done by the command, not by doMerge.
+ output io.Writer
+ // tagSet holds the tag set shared by the inputs.
+ // NOTE(review): no post-patch reader/writer of this field is visible
+ // here; it may be dead after this change — confirm.
tagSet [][]byte
+ // tilelib accumulates tiles from all inputs. doMerge creates it with
+ // an encoder writing to output and retainNoCalls enabled.
tilelib *tileLibrary
+ // mapped is reset to an empty map by doMerge.
+ // NOTE(review): presumably keyed by input name (as in the removed
+ // mergeLibraryEntry code); verify it is still used after this patch.
mapped map[string]map[tileLibRef]tileVariantID
- todo []liftCompactGenome
+ // mtxTags guarded tagSet in the pre-patch code; TODO confirm any
+ // remaining users still lock it.
mtxTags sync.Mutex
+ // errs is created by doMerge with capacity 1; doMerge receives from
+ // it after the loaders finish and returns any non-nil error.
+ // NOTE(review): assumes setError only records the first error.
errs chan error
}
return 1
}
runner := arvadosContainerRunner{
- Name: "lightning filter",
+ Name: "lightning merge",
Client: arvados.NewClientFromEnv(),
ProjectUUID: *projectUUID,
- RAM: 64000000000,
- VCPUs: 2,
+ RAM: 700000000000,
+ VCPUs: 16,
Priority: *priority,
+ APIAccess: true,
+ KeepCache: 1,
}
for i := range cmd.inputs {
err = runner.TranslatePaths(&cmd.inputs[i])
}
}
runner.Args = append([]string{"merge", "-local=true",
- "-o", "/mnt/output/library.gob",
+ "-o", "/mnt/output/library.gob.gz",
}, cmd.inputs...)
var output string
output, err = runner.Run()
if err != nil {
return 1
}
- fmt.Fprintln(stdout, output+"/library.gob")
+ fmt.Fprintln(stdout, output+"/library.gob.gz")
return 0
}
+ var outf, outw io.WriteCloser
if *outputFilename == "-" {
- cmd.output = nopCloser{stdout}
+ outw = nopCloser{stdout}
} else {
- cmd.output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
+ outf, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
if err != nil {
return 1
}
- defer cmd.output.Close()
+ defer outf.Close()
+ if strings.HasSuffix(*outputFilename, ".gz") {
+ outw = pgzip.NewWriter(outf)
+ } else {
+ outw = nopCloser{outf}
+ }
}
-
+ bufw := bufio.NewWriterSize(outw, 64*1024*1024)
+ cmd.output = bufw
err = cmd.doMerge()
if err != nil {
return 1
}
- err = cmd.output.Close()
+ err = bufw.Flush()
if err != nil {
return 1
}
- return 0
-}
-
-func (cmd *merger) mergeLibraryEntry(ent *LibraryEntry, src string) error {
- mapped := cmd.mapped[src]
- if len(cmd.errs) > 0 {
- return errors.New("stopping after error in other goroutine")
- }
- if len(ent.TagSet) > 0 {
- // We don't need the tagset to do a merge, but if it
- // does appear in the input, we (a) output it once,
- // and (b) do a sanity check, erroring out if the
- // inputs have different tagsets.
- cmd.mtxTags.Lock()
- defer cmd.mtxTags.Unlock()
- if len(cmd.tagSet) == 0 {
- cmd.tagSet = ent.TagSet
- if cmd.tilelib.encoder != nil {
- go cmd.tilelib.encoder.Encode(LibraryEntry{
- TagSet: cmd.tagSet,
- })
- }
- } else if len(cmd.tagSet) != len(ent.TagSet) {
- return fmt.Errorf("cannot merge libraries with differing tagsets")
- } else {
- for i := range ent.TagSet {
- if !bytes.Equal(ent.TagSet[i], cmd.tagSet[i]) {
- return fmt.Errorf("cannot merge libraries with differing tagsets")
- }
- }
- }
- }
- for _, tv := range ent.TileVariants {
- // Assign a new variant ID (unique across all inputs)
- // for each input variant.
- mapped[tileLibRef{tag: tv.Tag, variant: tv.Variant}] = cmd.tilelib.getRef(tv.Tag, tv.Sequence).variant
- }
- for _, cg := range ent.CompactGenomes {
- cmd.todo = append(cmd.todo, liftCompactGenome{cg, mapped})
+ err = outw.Close()
+ if err != nil {
+ return 1
}
- return nil
-}
-
-type liftCompactGenome struct {
- CompactGenome
- mapped map[tileLibRef]tileVariantID
-}
-
-// Translate old variant IDs to new (mapped) variant IDs.
-func (cg liftCompactGenome) lift() error {
- for i, variant := range cg.Variants {
- if variant == 0 {
- continue
- }
- tag := tagID(i / 2)
- newvariant, ok := cg.mapped[tileLibRef{tag: tag, variant: variant}]
- if !ok {
- return fmt.Errorf("oops: ent.CompactGenomes[] (%q) refs tag %d variant %d which is not in library", cg.Name, tag, variant)
+ if outf != nil {
+ err = outf.Close()
+ if err != nil {
+ return 1
}
- cg.Variants[tag] = newvariant
}
- return nil
+ return 0
}
func (cmd *merger) setError(err error) {
func (cmd *merger) doMerge() error {
w := bufio.NewWriter(cmd.output)
+ encoder := gob.NewEncoder(w)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
cmd.errs = make(chan error, 1)
cmd.tilelib = &tileLibrary{
- encoder: gob.NewEncoder(w),
- includeNoCalls: true,
+ encoder: encoder,
+ retainNoCalls: true,
}
cmd.mapped = map[string]map[tileLibRef]tileVariantID{}
var wg sync.WaitGroup
for _, input := range cmd.inputs {
- var infile io.ReadCloser
- if input == "-" {
- infile = ioutil.NopCloser(cmd.stdin)
- } else {
+ rdr := ioutil.NopCloser(cmd.stdin)
+ if input != "-" {
var err error
- infile, err = os.Open(input)
+ rdr, err = open(input)
if err != nil {
return err
}
- defer infile.Close()
+ defer rdr.Close()
}
+ rdr = ioutil.NopCloser(bufio.NewReaderSize(rdr, 8*1024*1024))
wg.Add(1)
go func(input string) {
defer wg.Done()
log.Printf("%s: reading", input)
- err := DecodeLibrary(infile, func(ent *LibraryEntry) error {
- return cmd.mergeLibraryEntry(ent, input)
- })
- if err != nil {
- cmd.setError(fmt.Errorf("%s: decode: %w", input, err))
- return
- }
- err = infile.Close()
+ err := cmd.tilelib.LoadGob(ctx, rdr, strings.HasSuffix(input, ".gz"))
if err != nil {
- cmd.setError(fmt.Errorf("%s: close: %w", input, err))
+ cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
+ cancel()
return
}
log.Printf("%s: done", input)
if err := <-cmd.errs; err != nil {
return err
}
-
- var cgs []CompactGenome
- for _, cg := range cmd.todo {
- err := cg.lift()
- if err != nil {
- return err
- }
- cgs = append(cgs, cg.CompactGenome)
- }
- err := cmd.tilelib.encoder.Encode(LibraryEntry{
- CompactGenomes: cgs,
- })
- if err != nil {
- return err
- }
-
log.Print("flushing")
- err = w.Flush()
+ err := w.Flush()
if err != nil {
return err
}