17 "git.arvados.org/arvados.git/sdk/go/arvados"
18 log "github.com/sirupsen/logrus"
27 mapped map[string]map[tileLibRef]tileVariantID
32 func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
36 fmt.Fprintf(stderr, "%s\n", err)
39 flags := flag.NewFlagSet("", flag.ContinueOnError)
40 flags.SetOutput(stderr)
41 pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
42 runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
43 projectUUID := flags.String("project", "", "project `UUID` for output data")
44 priority := flags.Int("priority", 500, "container request priority")
45 outputFilename := flags.String("o", "-", "output `file`")
46 err = flags.Parse(args)
47 if err == flag.ErrHelp {
50 } else if err != nil {
54 cmd.inputs = flags.Args()
58 log.Println(http.ListenAndServe(*pprof, nil))
63 if *outputFilename != "-" {
64 err = errors.New("cannot specify output file in container mode: not implemented")
67 runner := arvadosContainerRunner{
68 Name: "lightning filter",
69 Client: arvados.NewClientFromEnv(),
70 ProjectUUID: *projectUUID,
75 for i := range cmd.inputs {
76 err = runner.TranslatePaths(&cmd.inputs[i])
81 runner.Args = append([]string{"merge", "-local=true",
82 "-o", "/mnt/output/library.gob",
85 output, err = runner.Run()
89 fmt.Fprintln(stdout, output+"/library.gob")
93 if *outputFilename == "-" {
94 cmd.output = nopCloser{stdout}
96 cmd.output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
100 defer cmd.output.Close()
107 err = cmd.output.Close()
114 func (cmd *merger) setError(err error) {
116 case cmd.errs <- err:
121 func (cmd *merger) doMerge() error {
122 w := bufio.NewWriter(cmd.output)
123 encoder := gob.NewEncoder(w)
125 ctx, cancel := context.WithCancel(context.Background())
128 cmd.errs = make(chan error, 1)
129 cmd.tilelib = &tileLibrary{
131 includeNoCalls: true,
134 cmd.mapped = map[string]map[tileLibRef]tileVariantID{}
135 for _, input := range cmd.inputs {
136 cmd.mapped[input] = map[tileLibRef]tileVariantID{}
139 var wg sync.WaitGroup
140 for _, input := range cmd.inputs {
141 var infile io.ReadCloser
143 infile = ioutil.NopCloser(cmd.stdin)
146 infile, err = os.Open(input)
153 go func(input string) {
155 log.Printf("%s: reading", input)
156 err := cmd.tilelib.LoadGob(ctx, infile, nil)
158 cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
164 cmd.setError(fmt.Errorf("%s: error closing input file: %w", input, err))
168 log.Printf("%s: done", input)
173 if err := <-cmd.errs; err != nil {
176 log.Print("flushing")