X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/1a76177ea6911896e6e6196f5975aefc2988950a..76fbc75a359348e2a91546a70b8d2c738865cce2:/merge.go diff --git a/merge.go b/merge.go index a55f47228d..904db8e4d2 100644 --- a/merge.go +++ b/merge.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "compress/gzip" "context" "encoding/gob" "errors" @@ -12,6 +13,7 @@ import ( "net/http" _ "net/http/pprof" "os" + "strings" "sync" "git.arvados.org/arvados.git/sdk/go/arvados" @@ -65,7 +67,7 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou return 1 } runner := arvadosContainerRunner{ - Name: "lightning filter", + Name: "lightning merge", Client: arvados.NewClientFromEnv(), ProjectUUID: *projectUUID, RAM: 64000000000, @@ -79,35 +81,47 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou } } runner.Args = append([]string{"merge", "-local=true", - "-o", "/mnt/output/library.gob", + "-o", "/mnt/output/library.gob.gz", }, cmd.inputs...) var output string output, err = runner.Run() if err != nil { return 1 } - fmt.Fprintln(stdout, output+"/library.gob") + fmt.Fprintln(stdout, output+"/library.gob.gz") return 0 } + var outf, outw io.WriteCloser if *outputFilename == "-" { - cmd.output = nopCloser{stdout} + outw = nopCloser{stdout} } else { - cmd.output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777) + outf, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777) if err != nil { return 1 } - defer cmd.output.Close() + defer outf.Close() + if strings.HasSuffix(*outputFilename, ".gz") { + outw = gzip.NewWriter(outf) + } else { + outw = outf + } } - + cmd.output = outw err = cmd.doMerge() if err != nil { return 1 } - err = cmd.output.Close() + err = outw.Close() if err != nil { return 1 } + if outf != nil && outf != outw { + err = outf.Close() + if err != nil { + return 1 + } + } return 0 } @@ -127,15 +141,8 @@ func (cmd *merger) doMerge() error { cmd.errs = make(chan error, 1) cmd.tilelib = &tileLibrary{ - encoder: encoder, - includeNoCalls: true, - onLoadGenome: func(cg CompactGenome) { - err := encoder.Encode(LibraryEntry{CompactGenomes: []CompactGenome{cg}}) - if err != nil { - cmd.setError(err) - cancel() - } - }, + encoder: encoder, + retainNoCalls: true, } cmd.mapped = map[string]map[tileLibRef]tileVariantID{} @@ -160,18 +167,12 @@ func (cmd *merger) doMerge() error { go func(input string) { defer wg.Done() log.Printf("%s: reading", input) - err := cmd.tilelib.LoadGob(ctx, infile, nil) + err := cmd.tilelib.LoadGob(ctx, infile, strings.HasSuffix(input, ".gz"), nil) if err != nil { cmd.setError(fmt.Errorf("%s: load failed: %w", input, err)) cancel() return } - err = infile.Close() - if err != nil { - cmd.setError(fmt.Errorf("%s: error closing input file: %w", input, err)) - cancel() - return - } log.Printf("%s: done", input) }(input) }