X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/69b71af4136fdaeeb5c2afbc559208dc5f428c48..c5be98ec79b8bd3dbd8d129daa6648b0f8ac5b61:/merge.go diff --git a/merge.go b/merge.go index 27b32308bf..68560edbd0 100644 --- a/merge.go +++ b/merge.go @@ -1,8 +1,11 @@ -package main +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package lightning import ( "bufio" - "compress/gzip" "context" "encoding/gob" "errors" @@ -17,13 +20,14 @@ import ( "sync" "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" ) type merger struct { stdin io.Reader inputs []string - output io.WriteCloser + output io.Writer tagSet [][]byte tilelib *tileLibrary mapped map[string]map[tileLibRef]tileVariantID @@ -67,12 +71,14 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou return 1 } runner := arvadosContainerRunner{ - Name: "lightning filter", + Name: "lightning merge", Client: arvados.NewClientFromEnv(), ProjectUUID: *projectUUID, - RAM: 64000000000, - VCPUs: 2, + RAM: 700000000000, + VCPUs: 16, Priority: *priority, + APIAccess: true, + KeepCache: 1, } for i := range cmd.inputs { err = runner.TranslatePaths(&cmd.inputs[i]) @@ -102,21 +108,26 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou } defer outf.Close() if strings.HasSuffix(*outputFilename, ".gz") { - outw = gzip.NewWriter(outf) + outw = pgzip.NewWriter(outf) } else { - outw = outf + outw = nopCloser{outf} } } - cmd.output = outw + bufw := bufio.NewWriterSize(outw, 64*1024*1024) + cmd.output = bufw err = cmd.doMerge() if err != nil { return 1 } + err = bufw.Flush() + if err != nil { + return 1 + } err = outw.Close() if err != nil { return 1 } - if outf != nil && outf != outw { + if outf != nil { err = outf.Close() if err != nil { return 1 @@ -152,22 +163,21 @@ func (cmd *merger) doMerge() error { var wg sync.WaitGroup for _, input := range cmd.inputs { - var infile io.ReadCloser - if input == "-" { - infile = ioutil.NopCloser(cmd.stdin) - } else { + rdr := ioutil.NopCloser(cmd.stdin) + if input != "-" { var err error - infile, err = os.Open(input) + rdr, err = open(input) if err != nil { return err } - defer infile.Close() + defer rdr.Close() } + rdr = ioutil.NopCloser(bufio.NewReaderSize(rdr, 8*1024*1024)) wg.Add(1) go func(input string) { defer wg.Done() log.Printf("%s: reading", input) - err := cmd.tilelib.LoadGob(ctx, infile, strings.HasSuffix(input, ".gz"), nil) + err := cmd.tilelib.LoadGob(ctx, rdr, strings.HasSuffix(input, ".gz")) if err != nil { cmd.setError(fmt.Errorf("%s: load failed: %w", input, err)) cancel()