Fix some tests.
[lightning.git] / merge.go
index 1807393b593b7d2c4e59ac3b138e3532b977391b..68560edbd0ae62af9fd203ba9aa6d2867a1fe883 100644 (file)
--- a/merge.go
+++ b/merge.go
@@ -1,4 +1,8 @@
-package main
+// Copyright (C) The Lightning Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lightning
 
 import (
        "bufio"
@@ -12,16 +16,18 @@ import (
        "net/http"
        _ "net/http/pprof"
        "os"
+       "strings"
        "sync"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/klauspost/pgzip"
        log "github.com/sirupsen/logrus"
 )
 
 type merger struct {
        stdin   io.Reader
        inputs  []string
-       output  io.WriteCloser
+       output  io.Writer
        tagSet  [][]byte
        tilelib *tileLibrary
        mapped  map[string]map[tileLibRef]tileVariantID
@@ -65,12 +71,14 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou
                        return 1
                }
                runner := arvadosContainerRunner{
-                       Name:        "lightning filter",
+                       Name:        "lightning merge",
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: *projectUUID,
-                       RAM:         64000000000,
-                       VCPUs:       2,
+                       RAM:         700000000000,
+                       VCPUs:       16,
                        Priority:    *priority,
+                       APIAccess:   true,
+                       KeepCache:   1,
                }
                for i := range cmd.inputs {
                        err = runner.TranslatePaths(&cmd.inputs[i])
@@ -79,35 +87,52 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou
                        }
                }
                runner.Args = append([]string{"merge", "-local=true",
-                       "-o", "/mnt/output/library.gob",
+                       "-o", "/mnt/output/library.gob.gz",
                }, cmd.inputs...)
                var output string
                output, err = runner.Run()
                if err != nil {
                        return 1
                }
-               fmt.Fprintln(stdout, output+"/library.gob")
+               fmt.Fprintln(stdout, output+"/library.gob.gz")
                return 0
        }
 
+       var outf, outw io.WriteCloser
        if *outputFilename == "-" {
-               cmd.output = nopCloser{stdout}
+               outw = nopCloser{stdout}
        } else {
-               cmd.output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
+               outf, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
                if err != nil {
                        return 1
                }
-               defer cmd.output.Close()
+               defer outf.Close()
+               if strings.HasSuffix(*outputFilename, ".gz") {
+                       outw = pgzip.NewWriter(outf)
+               } else {
+                       outw = nopCloser{outf}
+               }
        }
-
+       bufw := bufio.NewWriterSize(outw, 64*1024*1024)
+       cmd.output = bufw
        err = cmd.doMerge()
        if err != nil {
                return 1
        }
-       err = cmd.output.Close()
+       err = bufw.Flush()
        if err != nil {
                return 1
        }
+       err = outw.Close()
+       if err != nil {
+               return 1
+       }
+       if outf != nil {
+               err = outf.Close()
+               if err != nil {
+                       return 1
+               }
+       }
        return 0
 }
 
@@ -138,33 +163,26 @@ func (cmd *merger) doMerge() error {
 
        var wg sync.WaitGroup
        for _, input := range cmd.inputs {
-               var infile io.ReadCloser
-               if input == "-" {
-                       infile = ioutil.NopCloser(cmd.stdin)
-               } else {
+               rdr := ioutil.NopCloser(cmd.stdin)
+               if input != "-" {
                        var err error
-                       infile, err = os.Open(input)
+                       rdr, err = open(input)
                        if err != nil {
                                return err
                        }
-                       defer infile.Close()
+                       defer rdr.Close()
                }
+               rdr = ioutil.NopCloser(bufio.NewReaderSize(rdr, 8*1024*1024))
                wg.Add(1)
                go func(input string) {
                        defer wg.Done()
                        log.Printf("%s: reading", input)
-                       err := cmd.tilelib.LoadGob(ctx, infile, nil)
+                       err := cmd.tilelib.LoadGob(ctx, rdr, strings.HasSuffix(input, ".gz"))
                        if err != nil {
                                cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
                                cancel()
                                return
                        }
-                       err = infile.Close()
-                       if err != nil {
-                               cmd.setError(fmt.Errorf("%s: error closing input file: %w", input, err))
-                               cancel()
-                               return
-                       }
                        log.Printf("%s: done", input)
                }(input)
        }