-package main
+// Copyright (C) The Lightning Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lightning
import (
"bufio"
- "compress/gzip"
"context"
"encoding/gob"
"errors"
"sync"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/klauspost/pgzip"
log "github.com/sirupsen/logrus"
)
type merger struct {
stdin io.Reader
inputs []string
- output io.WriteCloser
+ output io.Writer
tagSet [][]byte
tilelib *tileLibrary
mapped map[string]map[tileLibRef]tileVariantID
Name: "lightning merge",
Client: arvados.NewClientFromEnv(),
ProjectUUID: *projectUUID,
- RAM: 150000000000,
- VCPUs: 2,
+ RAM: 700000000000,
+ VCPUs: 16,
Priority: *priority,
APIAccess: true,
+ KeepCache: 1,
}
for i := range cmd.inputs {
err = runner.TranslatePaths(&cmd.inputs[i])
}
defer outf.Close()
if strings.HasSuffix(*outputFilename, ".gz") {
- outw = gzip.NewWriter(outf)
+ outw = pgzip.NewWriter(outf)
} else {
- outw = outf
+ outw = nopCloser{outf}
}
}
- cmd.output = outw
+ bufw := bufio.NewWriterSize(outw, 64*1024*1024)
+ cmd.output = bufw
err = cmd.doMerge()
if err != nil {
return 1
}
+ err = bufw.Flush()
+ if err != nil {
+ return 1
+ }
err = outw.Close()
if err != nil {
return 1
}
- if outf != nil && outf != outw {
+ if outf != nil {
err = outf.Close()
if err != nil {
return 1
go func(input string) {
defer wg.Done()
log.Printf("%s: reading", input)
- err := cmd.tilelib.LoadGob(ctx, rdr, strings.HasSuffix(input, ".gz"), nil)
+ err := cmd.tilelib.LoadGob(ctx, rdr, strings.HasSuffix(input, ".gz"))
if err != nil {
cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
cancel()