Fix some tests.
[lightning.git] / merge.go
index 968923711f3353fd0a37d36626a886446244ca93..68560edbd0ae62af9fd203ba9aa6d2867a1fe883 100644 (file)
--- a/merge.go
+++ b/merge.go
@@ -1,8 +1,11 @@
-package main
+// Copyright (C) The Lightning Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lightning
 
 import (
        "bufio"
-       "compress/gzip"
        "context"
        "encoding/gob"
        "errors"
@@ -17,13 +20,14 @@ import (
        "sync"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/klauspost/pgzip"
        log "github.com/sirupsen/logrus"
 )
 
 type merger struct {
        stdin   io.Reader
        inputs  []string
-       output  io.WriteCloser
+       output  io.Writer
        tagSet  [][]byte
        tilelib *tileLibrary
        mapped  map[string]map[tileLibRef]tileVariantID
@@ -70,10 +74,11 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou
                        Name:        "lightning merge",
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: *projectUUID,
-                       RAM:         150000000000,
-                       VCPUs:       2,
+                       RAM:         700000000000,
+                       VCPUs:       16,
                        Priority:    *priority,
                        APIAccess:   true,
+                       KeepCache:   1,
                }
                for i := range cmd.inputs {
                        err = runner.TranslatePaths(&cmd.inputs[i])
@@ -103,21 +108,26 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou
                }
                defer outf.Close()
                if strings.HasSuffix(*outputFilename, ".gz") {
-                       outw = gzip.NewWriter(outf)
+                       outw = pgzip.NewWriter(outf)
                } else {
-                       outw = outf
+                       outw = nopCloser{outf}
                }
        }
-       cmd.output = outw
+       bufw := bufio.NewWriterSize(outw, 64*1024*1024)
+       cmd.output = bufw
        err = cmd.doMerge()
        if err != nil {
                return 1
        }
+       err = bufw.Flush()
+       if err != nil {
+               return 1
+       }
        err = outw.Close()
        if err != nil {
                return 1
        }
-       if outf != nil && outf != outw {
+       if outf != nil {
                err = outf.Close()
                if err != nil {
                        return 1
@@ -167,7 +177,7 @@ func (cmd *merger) doMerge() error {
                go func(input string) {
                        defer wg.Done()
                        log.Printf("%s: reading", input)
-                       err := cmd.tilelib.LoadGob(ctx, rdr, strings.HasSuffix(input, ".gz"), nil)
+                       err := cmd.tilelib.LoadGob(ctx, rdr, strings.HasSuffix(input, ".gz"))
                        if err != nil {
                                cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
                                cancel()