1 // Copyright (C) The Lightning Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "github.com/klauspost/pgzip"
24 log "github.com/sirupsen/logrus"
33 mapped map[string]map[tileLibRef]tileVariantID
// RunCommand is the entry point for the "merge" subcommand. It parses
// command-line flags from args, stores the remaining positional
// arguments in cmd.inputs, and then either submits the work to an
// Arvados container or sets up a local output writer.
//
// NOTE(review): only an excerpt of this function is visible here.
// Which visible statements belong to the container-mode branch and
// which to the local branch is inferred from the -local flag's help
// text and the error message below; confirm against the full source.
// The int result is presumably a process exit status — confirm.
38 func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
// NOTE(review): this prints err to stderr before flags are even parsed,
// so it is presumably inside a deferred error reporter — confirm.
42 fmt.Fprintf(stderr, "%s\n", err)
// ContinueOnError makes Parse return an error instead of exiting the
// process; usage and parse errors are written to stderr.
45 flags := flag.NewFlagSet("", flag.ContinueOnError)
46 flags.SetOutput(stderr)
47 pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
48 runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
49 projectUUID := flags.String("project", "", "project `UUID` for output data")
50 priority := flags.Int("priority", 500, "container request priority")
// "-" (the default) selects stdout as the output; see the writer setup
// further down.
51 outputFilename := flags.String("o", "-", "output `file`")
52 err = flags.Parse(args)
// flag.ErrHelp is what Parse returns when -h/-help is given but no such
// flag is defined, so it is distinguished from real parse errors.
53 if err == flag.ErrHelp {
56 } else if err != nil {
// Whatever is left after the flags is treated as the list of inputs.
60 cmd.inputs = flags.Args()
// http.ListenAndServe always returns a non-nil error, so this logs the
// reason the profiling listener stopped (or failed to start).
64 log.Println(http.ListenAndServe(*pprof, nil))
// An explicit output file is rejected with a "container mode: not
// implemented" error.
69 if *outputFilename != "-" {
70 err = errors.New("cannot specify output file in container mode: not implemented")
73 runner := arvadosContainerRunner{
74 Name: "lightning merge",
75 Client: arvados.NewClientFromEnv(),
76 ProjectUUID: *projectUUID,
// TranslatePaths receives a pointer to each entry, so it can rewrite the
// input path in place.
// NOTE(review): presumably maps local paths to paths usable inside the
// container — confirm against arvadosContainerRunner.
83 for i := range cmd.inputs {
84 err = runner.TranslatePaths(&cmd.inputs[i])
// The container is given the same "merge" subcommand with -local=true
// and a fixed output path under /mnt/output.
89 runner.Args = append([]string{"merge", "-local=true",
90 "-o", "/mnt/output/library.gob.gz",
93 output, err = runner.Run()
// Report where the result ended up: the value returned by runner.Run
// with the fixed output filename appended.
97 fmt.Fprintln(stdout, output+"/library.gob.gz")
// outf is the raw destination file (if any); outw is the writer the
// rest of the function uses, possibly a compressing layer over outf.
101 var outf, outw io.WriteCloser
102 if *outputFilename == "-" {
// NOTE(review): nopCloser is declared elsewhere; presumably it adds a
// no-op Close so that stdout satisfies io.WriteCloser — confirm.
103 outw = nopCloser{stdout}
// NOTE(review): the flags do not include os.O_TRUNC, so writing to an
// existing, longer file would leave stale bytes after the new content.
// Mode 0777 (before umask) is also unusually permissive for a data
// file. Confirm whether both are intentional.
105 outf, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
// Compress the output only when the filename ends in ".gz".
110 if strings.HasSuffix(*outputFilename, ".gz") {
111 outw = pgzip.NewWriter(outf)
113 outw = nopCloser{outf}
// 64 MiB write buffer in front of the (possibly compressing) writer.
116 bufw := bufio.NewWriterSize(outw, 64*1024*1024)
// setError offers err to the cmd.errs channel from inside a select
// statement.
//
// NOTE(review): the rest of the select is not visible in this excerpt.
// doMerge creates cmd.errs with capacity 1, so this looks like a
// "record only the first error, never block" helper — confirm that the
// select has a default (or cancellation) case.
139 func (cmd *merger) setError(err error) {
141 case cmd.errs <- err:
146 func (cmd *merger) doMerge() error {
147 w := bufio.NewWriter(cmd.output)
148 encoder := gob.NewEncoder(w)
150 ctx, cancel := context.WithCancel(context.Background())
153 cmd.errs = make(chan error, 1)
154 cmd.tilelib = &tileLibrary{
159 cmd.mapped = map[string]map[tileLibRef]tileVariantID{}
160 for _, input := range cmd.inputs {
161 cmd.mapped[input] = map[tileLibRef]tileVariantID{}
164 var wg sync.WaitGroup
165 for _, input := range cmd.inputs {
166 rdr := ioutil.NopCloser(cmd.stdin)
169 rdr, err = open(input)
175 rdr = ioutil.NopCloser(bufio.NewReaderSize(rdr, 8*1024*1024))
177 go func(input string) {
179 log.Printf("%s: reading", input)
180 err := cmd.tilelib.LoadGob(ctx, rdr, strings.HasSuffix(input, ".gz"))
182 cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
186 log.Printf("%s: done", input)
191 if err := <-cmd.errs; err != nil {
194 log.Print("flushing")