18 "git.arvados.org/arvados.git/sdk/go/arvados"
19 "github.com/klauspost/pgzip"
20 log "github.com/sirupsen/logrus"
29 mapped map[string]map[tileLibRef]tileVariantID
34 func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
38 fmt.Fprintf(stderr, "%s\n", err)
41 flags := flag.NewFlagSet("", flag.ContinueOnError)
42 flags.SetOutput(stderr)
43 pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
44 runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
45 projectUUID := flags.String("project", "", "project `UUID` for output data")
46 priority := flags.Int("priority", 500, "container request priority")
47 outputFilename := flags.String("o", "-", "output `file`")
48 err = flags.Parse(args)
49 if err == flag.ErrHelp {
52 } else if err != nil {
56 cmd.inputs = flags.Args()
60 log.Println(http.ListenAndServe(*pprof, nil))
65 if *outputFilename != "-" {
66 err = errors.New("cannot specify output file in container mode: not implemented")
69 runner := arvadosContainerRunner{
70 Name: "lightning merge",
71 Client: arvados.NewClientFromEnv(),
72 ProjectUUID: *projectUUID,
78 for i := range cmd.inputs {
79 err = runner.TranslatePaths(&cmd.inputs[i])
84 runner.Args = append([]string{"merge", "-local=true",
85 "-o", "/mnt/output/library.gob.gz",
88 output, err = runner.Run()
92 fmt.Fprintln(stdout, output+"/library.gob.gz")
96 var outf, outw io.WriteCloser
97 if *outputFilename == "-" {
98 outw = nopCloser{stdout}
100 outf, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
105 if strings.HasSuffix(*outputFilename, ".gz") {
106 outw = pgzip.NewWriter(outf)
108 outw = nopCloser{outf}
111 bufw := bufio.NewWriterSize(outw, 64*1024*1024)
134 func (cmd *merger) setError(err error) {
136 case cmd.errs <- err:
141 func (cmd *merger) doMerge() error {
142 w := bufio.NewWriter(cmd.output)
143 encoder := gob.NewEncoder(w)
145 ctx, cancel := context.WithCancel(context.Background())
148 cmd.errs = make(chan error, 1)
149 cmd.tilelib = &tileLibrary{
154 cmd.mapped = map[string]map[tileLibRef]tileVariantID{}
155 for _, input := range cmd.inputs {
156 cmd.mapped[input] = map[tileLibRef]tileVariantID{}
159 var wg sync.WaitGroup
160 for _, input := range cmd.inputs {
161 rdr := ioutil.NopCloser(cmd.stdin)
164 rdr, err = open(input)
170 rdr = ioutil.NopCloser(bufio.NewReaderSize(rdr, 8*1024*1024))
172 go func(input string) {
174 log.Printf("%s: reading", input)
175 err := cmd.tilelib.LoadGob(ctx, rdr, strings.HasSuffix(input, ".gz"), nil)
177 cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
181 log.Printf("%s: done", input)
186 if err := <-cmd.errs; err != nil {
189 log.Print("flushing")