// Copyright (C) The Lightning Authors. All rights reserved. // // SPDX-License-Identifier: AGPL-3.0 package lightning import ( "bufio" "context" "encoding/gob" "errors" "flag" "fmt" "io" "io/ioutil" "net/http" _ "net/http/pprof" "os" "strings" "sync" "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" ) type merger struct { stdin io.Reader inputs []string output io.Writer tagSet [][]byte tilelib *tileLibrary mapped map[string]map[tileLibRef]tileVariantID mtxTags sync.Mutex errs chan error } func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { var err error defer func() { if err != nil { fmt.Fprintf(stderr, "%s\n", err) } }() flags := flag.NewFlagSet("", flag.ContinueOnError) flags.SetOutput(stderr) pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)") projectUUID := flags.String("project", "", "project `UUID` for output data") priority := flags.Int("priority", 500, "container request priority") outputFilename := flags.String("o", "-", "output `file`") err = flags.Parse(args) if err == flag.ErrHelp { err = nil return 0 } else if err != nil { return 2 } cmd.stdin = stdin cmd.inputs = flags.Args() if *pprof != "" { go func() { log.Println(http.ListenAndServe(*pprof, nil)) }() } if !*runlocal { if *outputFilename != "-" { err = errors.New("cannot specify output file in container mode: not implemented") return 1 } runner := arvadosContainerRunner{ Name: "lightning merge", Client: arvados.NewClientFromEnv(), ProjectUUID: *projectUUID, RAM: 700000000000, VCPUs: 16, Priority: *priority, APIAccess: true, KeepCache: 1, } for i := range cmd.inputs { err = runner.TranslatePaths(&cmd.inputs[i]) if err != nil { return 1 } } runner.Args = append([]string{"merge", "-local=true", "-o", "/mnt/output/library.gob.gz", }, cmd.inputs...) var output string output, err = runner.Run() if err != nil { return 1 } fmt.Fprintln(stdout, output+"/library.gob.gz") return 0 } var outf, outw io.WriteCloser if *outputFilename == "-" { outw = nopCloser{stdout} } else { outf, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777) if err != nil { return 1 } defer outf.Close() if strings.HasSuffix(*outputFilename, ".gz") { outw = pgzip.NewWriter(outf) } else { outw = nopCloser{outf} } } bufw := bufio.NewWriterSize(outw, 64*1024*1024) cmd.output = bufw err = cmd.doMerge() if err != nil { return 1 } err = bufw.Flush() if err != nil { return 1 } err = outw.Close() if err != nil { return 1 } if outf != nil { err = outf.Close() if err != nil { return 1 } } return 0 } func (cmd *merger) setError(err error) { select { case cmd.errs <- err: default: } } func (cmd *merger) doMerge() error { w := bufio.NewWriter(cmd.output) encoder := gob.NewEncoder(w) ctx, cancel := context.WithCancel(context.Background()) defer cancel() cmd.errs = make(chan error, 1) cmd.tilelib = &tileLibrary{ encoder: encoder, retainNoCalls: true, } cmd.mapped = map[string]map[tileLibRef]tileVariantID{} for _, input := range cmd.inputs { cmd.mapped[input] = map[tileLibRef]tileVariantID{} } var wg sync.WaitGroup for _, input := range cmd.inputs { rdr := ioutil.NopCloser(cmd.stdin) if input != "-" { var err error rdr, err = open(input) if err != nil { return err } defer rdr.Close() } rdr = ioutil.NopCloser(bufio.NewReaderSize(rdr, 8*1024*1024)) wg.Add(1) go func(input string) { defer wg.Done() log.Printf("%s: reading", input) err := cmd.tilelib.LoadGob(ctx, rdr, strings.HasSuffix(input, ".gz")) if err != nil { cmd.setError(fmt.Errorf("%s: load failed: %w", input, err)) cancel() return } log.Printf("%s: done", input) }(input) } wg.Wait() go close(cmd.errs) if err := <-cmd.errs; err != nil { return err } log.Print("flushing") err := w.Flush() if err != nil { return err } return nil }