X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/e473f1316af98ae33f3bea221d713049fbc8df81..dcfa4d7541c720b503ed1b0bde689158d162ac5b:/merge.go diff --git a/merge.go b/merge.go index 2e32ddb03e..68560edbd0 100644 --- a/merge.go +++ b/merge.go @@ -1,8 +1,12 @@ -package main +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package lightning import ( "bufio" - "bytes" + "context" "encoding/gob" "errors" "flag" @@ -12,20 +16,21 @@ import ( "net/http" _ "net/http/pprof" "os" + "strings" "sync" "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" ) type merger struct { stdin io.Reader inputs []string - output io.WriteCloser + output io.Writer tagSet [][]byte tilelib *tileLibrary mapped map[string]map[tileLibRef]tileVariantID - todo []liftCompactGenome mtxTags sync.Mutex errs chan error } @@ -66,12 +71,14 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou return 1 } runner := arvadosContainerRunner{ - Name: "lightning filter", + Name: "lightning merge", Client: arvados.NewClientFromEnv(), ProjectUUID: *projectUUID, - RAM: 64000000000, - VCPUs: 2, + RAM: 700000000000, + VCPUs: 16, Priority: *priority, + APIAccess: true, + KeepCache: 1, } for i := range cmd.inputs { err = runner.TranslatePaths(&cmd.inputs[i]) @@ -80,97 +87,53 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou } } runner.Args = append([]string{"merge", "-local=true", - "-o", "/mnt/output/library.gob", + "-o", "/mnt/output/library.gob.gz", }, cmd.inputs...) var output string output, err = runner.Run() if err != nil { return 1 } - fmt.Fprintln(stdout, output+"/library.gob") + fmt.Fprintln(stdout, output+"/library.gob.gz") return 0 } + var outf, outw io.WriteCloser if *outputFilename == "-" { - cmd.output = nopCloser{stdout} + outw = nopCloser{stdout} } else { - cmd.output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777) + outf, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777) if err != nil { return 1 } - defer cmd.output.Close() + defer outf.Close() + if strings.HasSuffix(*outputFilename, ".gz") { + outw = pgzip.NewWriter(outf) + } else { + outw = nopCloser{outf} + } } - + bufw := bufio.NewWriterSize(outw, 64*1024*1024) + cmd.output = bufw err = cmd.doMerge() if err != nil { return 1 } - err = cmd.output.Close() + err = bufw.Flush() if err != nil { return 1 } - return 0 -} - -func (cmd *merger) mergeLibraryEntry(ent *LibraryEntry, src string) error { - mapped := cmd.mapped[src] - if len(cmd.errs) > 0 { - return errors.New("stopping after error in other goroutine") - } - if len(ent.TagSet) > 0 { - // We don't need the tagset to do a merge, but if it - // does appear in the input, we (a) output it once, - // and (b) do a sanity check, erroring out if the - // inputs have different tagsets. - cmd.mtxTags.Lock() - defer cmd.mtxTags.Unlock() - if len(cmd.tagSet) == 0 { - cmd.tagSet = ent.TagSet - if cmd.tilelib.encoder != nil { - go cmd.tilelib.encoder.Encode(LibraryEntry{ - TagSet: cmd.tagSet, - }) - } - } else if len(cmd.tagSet) != len(ent.TagSet) { - return fmt.Errorf("cannot merge libraries with differing tagsets") - } else { - for i := range ent.TagSet { - if !bytes.Equal(ent.TagSet[i], cmd.tagSet[i]) { - return fmt.Errorf("cannot merge libraries with differing tagsets") - } - } - } - } - for _, tv := range ent.TileVariants { - // Assign a new variant ID (unique across all inputs) - // for each input variant. - mapped[tileLibRef{tag: tv.Tag, variant: tv.Variant}] = cmd.tilelib.getRef(tv.Tag, tv.Sequence).variant - } - for _, cg := range ent.CompactGenomes { - cmd.todo = append(cmd.todo, liftCompactGenome{cg, mapped}) + err = outw.Close() + if err != nil { + return 1 } - return nil -} - -type liftCompactGenome struct { - CompactGenome - mapped map[tileLibRef]tileVariantID -} - -// Translate old variant IDs to new (mapped) variant IDs. -func (cg liftCompactGenome) lift() error { - for i, variant := range cg.Variants { - if variant == 0 { - continue - } - tag := tagID(i / 2) - newvariant, ok := cg.mapped[tileLibRef{tag: tag, variant: variant}] - if !ok { - return fmt.Errorf("oops: ent.CompactGenomes[] (%q) refs tag %d variant %d which is not in library", cg.Name, tag, variant) + if outf != nil { + err = outf.Close() + if err != nil { + return 1 } - cg.Variants[tag] = newvariant } - return nil + return 0 } func (cmd *merger) setError(err error) { @@ -182,10 +145,15 @@ func (cmd *merger) setError(err error) { func (cmd *merger) doMerge() error { w := bufio.NewWriter(cmd.output) + encoder := gob.NewEncoder(w) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cmd.errs = make(chan error, 1) cmd.tilelib = &tileLibrary{ - encoder: gob.NewEncoder(w), - includeNoCalls: true, + encoder: encoder, + retainNoCalls: true, } cmd.mapped = map[string]map[tileLibRef]tileVariantID{} @@ -195,31 +163,24 @@ func (cmd *merger) doMerge() error { var wg sync.WaitGroup for _, input := range cmd.inputs { - var infile io.ReadCloser - if input == "-" { - infile = ioutil.NopCloser(cmd.stdin) - } else { + rdr := ioutil.NopCloser(cmd.stdin) + if input != "-" { var err error - infile, err = os.Open(input) + rdr, err = open(input) if err != nil { return err } - defer infile.Close() + defer rdr.Close() } + rdr = ioutil.NopCloser(bufio.NewReaderSize(rdr, 8*1024*1024)) wg.Add(1) go func(input string) { defer wg.Done() log.Printf("%s: reading", input) - err := DecodeLibrary(infile, func(ent *LibraryEntry) error { - return cmd.mergeLibraryEntry(ent, input) - }) - if err != nil { - cmd.setError(fmt.Errorf("%s: decode: %w", input, err)) - return - } - err = infile.Close() + err := cmd.tilelib.LoadGob(ctx, rdr, strings.HasSuffix(input, ".gz")) if err != nil { - cmd.setError(fmt.Errorf("%s: close: %w", input, err)) + cmd.setError(fmt.Errorf("%s: load failed: %w", input, err)) + cancel() return } log.Printf("%s: done", input) @@ -230,24 +191,8 @@ func (cmd *merger) doMerge() error { if err := <-cmd.errs; err != nil { return err } - - var cgs []CompactGenome - for _, cg := range cmd.todo { - err := cg.lift() - if err != nil { - return err - } - cgs = append(cgs, cg.CompactGenome) - } - err := cmd.tilelib.encoder.Encode(LibraryEntry{ - CompactGenomes: cgs, - }) - if err != nil { - return err - } - log.Print("flushing") - err = w.Flush() + err := w.Flush() if err != nil { return err }