projects
/
lightning.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Change zygosity column info from het=1 to hom=1.
[lightning.git]
/
merge.go
diff --git a/merge.go b/merge.go
index 1807393b593b7d2c4e59ac3b138e3532b977391b..68560edbd0ae62af9fd203ba9aa6d2867a1fe883 100644
(file)
--- a/merge.go
+++ b/merge.go
@@ -1,4 +1,8 @@
-package main
+// Copyright (C) The Lightning Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lightning
import (
"bufio"
import (
"bufio"
@@ -12,16 +16,18 @@
import (
"net/http"
_ "net/http/pprof"
"os"
"net/http"
_ "net/http/pprof"
"os"
+ "strings"
"sync"
"git.arvados.org/arvados.git/sdk/go/arvados"
"sync"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/klauspost/pgzip"
log "github.com/sirupsen/logrus"
)
type merger struct {
stdin io.Reader
inputs []string
log "github.com/sirupsen/logrus"
)
type merger struct {
stdin io.Reader
inputs []string
- output io.WriteCloser
+ output io.Writer
tagSet [][]byte
tilelib *tileLibrary
mapped map[string]map[tileLibRef]tileVariantID
tagSet [][]byte
tilelib *tileLibrary
mapped map[string]map[tileLibRef]tileVariantID
@@ -65,12 +71,14 @@
func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou
return 1
}
runner := arvadosContainerRunner{
return 1
}
runner := arvadosContainerRunner{
- Name: "lightning filter",
+ Name: "lightning merge",
Client: arvados.NewClientFromEnv(),
ProjectUUID: *projectUUID,
Client: arvados.NewClientFromEnv(),
ProjectUUID: *projectUUID,
- RAM: 64000000000,
- VCPUs: 2,
+ RAM: 700000000000,
+ VCPUs: 16,
Priority: *priority,
Priority: *priority,
+ APIAccess: true,
+ KeepCache: 1,
}
for i := range cmd.inputs {
err = runner.TranslatePaths(&cmd.inputs[i])
}
for i := range cmd.inputs {
err = runner.TranslatePaths(&cmd.inputs[i])
@@ -79,35 +87,52 @@
func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou
}
}
runner.Args = append([]string{"merge", "-local=true",
}
}
runner.Args = append([]string{"merge", "-local=true",
- "-o", "/mnt/output/library.gob",
+ "-o", "/mnt/output/library.gob.gz",
}, cmd.inputs...)
var output string
output, err = runner.Run()
if err != nil {
return 1
}
}, cmd.inputs...)
var output string
output, err = runner.Run()
if err != nil {
return 1
}
- fmt.Fprintln(stdout, output+"/library.gob")
+ fmt.Fprintln(stdout, output+"/library.gob.gz")
return 0
}
return 0
}
+ var outf, outw io.WriteCloser
if *outputFilename == "-" {
if *outputFilename == "-" {
- cmd.output = nopCloser{stdout}
+ outw = nopCloser{stdout}
} else {
} else {
- cmd.output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
+ outf, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
if err != nil {
return 1
}
if err != nil {
return 1
}
- defer cmd.output.Close()
+ defer outf.Close()
+ if strings.HasSuffix(*outputFilename, ".gz") {
+ outw = pgzip.NewWriter(outf)
+ } else {
+ outw = nopCloser{outf}
+ }
}
}
-
+ bufw := bufio.NewWriterSize(outw, 64*1024*1024)
+ cmd.output = bufw
err = cmd.doMerge()
if err != nil {
return 1
}
err = cmd.doMerge()
if err != nil {
return 1
}
- err = cmd.output.Close()
+ err = bufw.Flush()
if err != nil {
return 1
}
if err != nil {
return 1
}
+ err = outw.Close()
+ if err != nil {
+ return 1
+ }
+ if outf != nil {
+ err = outf.Close()
+ if err != nil {
+ return 1
+ }
+ }
return 0
}
return 0
}
@@ -138,33 +163,26 @@
func (cmd *merger) doMerge() error {
var wg sync.WaitGroup
for _, input := range cmd.inputs {
var wg sync.WaitGroup
for _, input := range cmd.inputs {
- var infile io.ReadCloser
- if input == "-" {
- infile = ioutil.NopCloser(cmd.stdin)
- } else {
+ rdr := ioutil.NopCloser(cmd.stdin)
+ if input != "-" {
var err error
var err error
- infile, err = os.Open(input)
+ rdr, err = open(input)
if err != nil {
return err
}
if err != nil {
return err
}
- defer infile.Close()
+ defer rdr.Close()
}
}
+ rdr = ioutil.NopCloser(bufio.NewReaderSize(rdr, 8*1024*1024))
wg.Add(1)
go func(input string) {
defer wg.Done()
log.Printf("%s: reading", input)
wg.Add(1)
go func(input string) {
defer wg.Done()
log.Printf("%s: reading", input)
- err := cmd.tilelib.LoadGob(ctx, infile, nil)
+ err := cmd.tilelib.LoadGob(ctx, rdr, strings.HasSuffix(input, ".gz"))
if err != nil {
cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
cancel()
return
}
if err != nil {
cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
cancel()
return
}
- err = infile.Close()
- if err != nil {
- cmd.setError(fmt.Errorf("%s: error closing input file: %w", input, err))
- cancel()
- return
- }
log.Printf("%s: done", input)
}(input)
}
log.Printf("%s: done", input)
}(input)
}