}
arv.Retries = 25
+ ctx, cancel := context.WithCancel(context.Background())
+
dispatcher := dispatch.Dispatcher{
Logger: logger,
Arv: arv,
- RunContainer: run,
+ RunContainer: (&LocalRun{startFunc, make(chan bool, 8), ctx}).run,
PollPeriod: time.Duration(*pollInterval) * time.Second,
}
- ctx, cancel := context.WithCancel(context.Background())
err = dispatcher.Run(ctx)
if err != nil {
return err
return cmd.Start()
}
-var startCmd = startFunc
+// LocalRun bundles the state used by its run method, which is installed as
+// the dispatcher's RunContainer callback.
+type LocalRun struct {
+ // startCmd is invoked by run to start the crunch-run command for a
+ // container; a non-nil error makes run log a warning and mark the
+ // container Cancelled.
+ startCmd func(container arvados.Container, cmd *exec.Cmd) error
+ // concurrencyLimit is a buffered channel that run sends on before
+ // starting a command for a Locked container, so the send blocks once the
+ // buffer is full.
+ // NOTE(review): presumably a slot is released by a receive elsewhere in
+ // run; that part is not visible here -- confirm.
+ concurrencyLimit chan bool
+ // ctx is checked while run waits for a concurrencyLimit slot; if it is
+ // done first, run returns without starting anything.
+ ctx context.Context
+}
// Run a container.
//
//
// If the container is in any other state, or is not Complete/Cancelled after
// crunch-run terminates, mark the container as Cancelled.
-func run(dispatcher *dispatch.Dispatcher,
+func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
container arvados.Container,
status <-chan arvados.Container) {
uuid := container.UUID
if container.State == dispatch.Locked {
+
+ select {
+ case lr.concurrencyLimit <- true:
+ break
+ case <-lr.ctx.Done():
+ return
+ }
+
waitGroup.Add(1)
cmd := exec.Command(*crunchRunCommand, uuid)
// succeed in starting crunch-run.
runningCmdsMutex.Lock()
- if err := startCmd(container, cmd); err != nil {
+ if err := lr.startCmd(container, cmd); err != nil {
runningCmdsMutex.Unlock()
dispatcher.Logger.Warnf("error starting %q for %s: %s", *crunchRunCommand, uuid, err)
dispatcher.UpdateState(uuid, dispatch.Cancelled)
dispatcher := dispatch.Dispatcher{
Arv: arv,
PollPeriod: time.Second,
- RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
- run(d, c, s)
- cancel()
- },
}
- startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
+ startCmd := func(container arvados.Container, cmd *exec.Cmd) error {
dispatcher.UpdateState(container.UUID, "Running")
dispatcher.UpdateState(container.UUID, "Complete")
return cmd.Start()
}
+ dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
+ (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
+ cancel()
+ }
+
err = dispatcher.Run(ctx)
c.Assert(err, Equals, context.Canceled)
dispatcher := dispatch.Dispatcher{
Arv: arv,
PollPeriod: time.Second / 20,
- RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
- run(d, c, s)
- cancel()
- },
}
- startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
+ startCmd := func(container arvados.Container, cmd *exec.Cmd) error {
dispatcher.UpdateState(container.UUID, "Running")
dispatcher.UpdateState(container.UUID, "Complete")
return cmd.Start()
}
+ dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
+ (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
+ cancel()
+ }
+
re := regexp.MustCompile(`(?ms).*` + expected + `.*`)
go func() {
for i := 0; i < 80 && !re.MatchString(buf.String()); i++ {