Faster merge output.
author Tom Clegg <tom@tomclegg.ca>
Mon, 14 Dec 2020 21:00:37 +0000 (16:00 -0500)
committer Tom Clegg <tom@tomclegg.ca>
Mon, 14 Dec 2020 21:00:37 +0000 (16:00 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

merge.go

index 968923711f3353fd0a37d36626a886446244ca93..4198ea88b73d69b99d366320d2a987387a2506f4 100644 (file)
--- a/merge.go
+++ b/merge.go
@@ -2,7 +2,6 @@ package main
 
 import (
        "bufio"
-       "compress/gzip"
        "context"
        "encoding/gob"
        "errors"
@@ -17,13 +16,14 @@ import (
        "sync"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/klauspost/pgzip"
        log "github.com/sirupsen/logrus"
 )
 
 type merger struct {
        stdin   io.Reader
        inputs  []string
-       output  io.WriteCloser
+       output  io.Writer
        tagSet  [][]byte
        tilelib *tileLibrary
        mapped  map[string]map[tileLibRef]tileVariantID
@@ -71,7 +71,7 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: *projectUUID,
                        RAM:         150000000000,
-                       VCPUs:       2,
+                       VCPUs:       16,
                        Priority:    *priority,
                        APIAccess:   true,
                }
@@ -103,21 +103,26 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou
                }
                defer outf.Close()
                if strings.HasSuffix(*outputFilename, ".gz") {
-                       outw = gzip.NewWriter(outf)
+                       outw = pgzip.NewWriter(outf)
                } else {
-                       outw = outf
+                       outw = nopCloser{outf}
                }
        }
-       cmd.output = outw
+       bufw := bufio.NewWriterSize(outw, 64*1024*1024)
+       cmd.output = bufw
        err = cmd.doMerge()
        if err != nil {
                return 1
        }
+       err = bufw.Flush()
+       if err != nil {
+               return 1
+       }
        err = outw.Close()
        if err != nil {
                return 1
        }
-       if outf != nil && outf != outw {
+       if outf != nil {
                err = outf.Close()
                if err != nil {
                        return 1