X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/83983adc7d02cb2b460eede7341313d54379c8c3..0689d0a23c71380de140fd5d89da63eaeea449ff:/merge.go diff --git a/merge.go b/merge.go index 1807393b59..68560edbd0 100644 --- a/merge.go +++ b/merge.go @@ -1,4 +1,8 @@ -package main +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package lightning import ( "bufio" @@ -12,16 +16,18 @@ import ( "net/http" _ "net/http/pprof" "os" + "strings" "sync" "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" ) type merger struct { stdin io.Reader inputs []string - output io.WriteCloser + output io.Writer tagSet [][]byte tilelib *tileLibrary mapped map[string]map[tileLibRef]tileVariantID @@ -65,12 +71,14 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou return 1 } runner := arvadosContainerRunner{ - Name: "lightning filter", + Name: "lightning merge", Client: arvados.NewClientFromEnv(), ProjectUUID: *projectUUID, - RAM: 64000000000, - VCPUs: 2, + RAM: 700000000000, + VCPUs: 16, Priority: *priority, + APIAccess: true, + KeepCache: 1, } for i := range cmd.inputs { err = runner.TranslatePaths(&cmd.inputs[i]) @@ -79,35 +87,52 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou } } runner.Args = append([]string{"merge", "-local=true", - "-o", "/mnt/output/library.gob", + "-o", "/mnt/output/library.gob.gz", }, cmd.inputs...) var output string output, err = runner.Run() if err != nil { return 1 } - fmt.Fprintln(stdout, output+"/library.gob") + fmt.Fprintln(stdout, output+"/library.gob.gz") return 0 } + var outf, outw io.WriteCloser if *outputFilename == "-" { - cmd.output = nopCloser{stdout} + outw = nopCloser{stdout} } else { - cmd.output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777) + outf, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777) if err != nil { return 1 } - defer cmd.output.Close() + defer outf.Close() + if strings.HasSuffix(*outputFilename, ".gz") { + outw = pgzip.NewWriter(outf) + } else { + outw = nopCloser{outf} + } } - + bufw := bufio.NewWriterSize(outw, 64*1024*1024) + cmd.output = bufw err = cmd.doMerge() if err != nil { return 1 } - err = cmd.output.Close() + err = bufw.Flush() if err != nil { return 1 } + err = outw.Close() + if err != nil { + return 1 + } + if outf != nil { + err = outf.Close() + if err != nil { + return 1 + } + } return 0 } @@ -138,33 +163,26 @@ func (cmd *merger) doMerge() error { var wg sync.WaitGroup for _, input := range cmd.inputs { - var infile io.ReadCloser - if input == "-" { - infile = ioutil.NopCloser(cmd.stdin) - } else { + rdr := ioutil.NopCloser(cmd.stdin) + if input != "-" { var err error - infile, err = os.Open(input) + rdr, err = open(input) if err != nil { return err } - defer infile.Close() + defer rdr.Close() } + rdr = ioutil.NopCloser(bufio.NewReaderSize(rdr, 8*1024*1024)) wg.Add(1) go func(input string) { defer wg.Done() log.Printf("%s: reading", input) - err := cmd.tilelib.LoadGob(ctx, infile, nil) + err := cmd.tilelib.LoadGob(ctx, rdr, strings.HasSuffix(input, ".gz")) if err != nil { cmd.setError(fmt.Errorf("%s: load failed: %w", input, err)) cancel() return } - err = infile.Close() - if err != nil { - cmd.setError(fmt.Errorf("%s: error closing input file: %w", input, err)) - cancel() - return - } log.Printf("%s: done", input) }(input) }