From 1f2bdbd78e4dad9d831297c6ac280dc085733b5e Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 29 Mar 2019 14:06:22 -0400 Subject: [PATCH] 15050: Limit number of containers that crunch-dispatch-local runs at a time So I can do some testing with submitting a huge number of containers and not melt my machine. Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- .../crunch-dispatch-local.go | 23 +++++++++++++++---- .../crunch-dispatch-local_test.go | 22 ++++++++++-------- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go index dcd54e8968..36d149272f 100644 --- a/services/crunch-dispatch-local/crunch-dispatch-local.go +++ b/services/crunch-dispatch-local/crunch-dispatch-local.go @@ -85,14 +85,15 @@ func doMain() error { } arv.Retries = 25 + ctx, cancel := context.WithCancel(context.Background()) + dispatcher := dispatch.Dispatcher{ Logger: logger, Arv: arv, - RunContainer: run, + RunContainer: (&LocalRun{startFunc, make(chan bool, 8), ctx}).run, PollPeriod: time.Duration(*pollInterval) * time.Second, } - ctx, cancel := context.WithCancel(context.Background()) err = dispatcher.Run(ctx) if err != nil { return err @@ -123,7 +124,11 @@ func startFunc(container arvados.Container, cmd *exec.Cmd) error { return cmd.Start() } -var startCmd = startFunc +type LocalRun struct { + startCmd func(container arvados.Container, cmd *exec.Cmd) error + concurrencyLimit chan bool + ctx context.Context +} // Run a container. // @@ -133,13 +138,21 @@ var startCmd = startFunc // // If the container is in any other state, or is not Complete/Cancelled after // crunch-run terminates, mark the container as Cancelled. -func run(dispatcher *dispatch.Dispatcher, +func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher, container arvados.Container, status <-chan arvados.Container) { uuid := container.UUID if container.State == dispatch.Locked { + + select { + case lr.concurrencyLimit <- true: + break + case <-lr.ctx.Done(): + return + } + waitGroup.Add(1) cmd := exec.Command(*crunchRunCommand, uuid) @@ -153,7 +166,7 @@ func run(dispatcher *dispatch.Dispatcher, // succeed in starting crunch-run. runningCmdsMutex.Lock() - if err := startCmd(container, cmd); err != nil { + if err := lr.startCmd(container, cmd); err != nil { runningCmdsMutex.Unlock() dispatcher.Logger.Warnf("error starting %q for %s: %s", *crunchRunCommand, uuid, err) dispatcher.UpdateState(uuid, dispatch.Cancelled) diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go index 6bae1f4099..41357403f0 100644 --- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go +++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go @@ -73,18 +73,19 @@ func (s *TestSuite) TestIntegration(c *C) { dispatcher := dispatch.Dispatcher{ Arv: arv, PollPeriod: time.Second, - RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) { - run(d, c, s) - cancel() - }, } - startCmd = func(container arvados.Container, cmd *exec.Cmd) error { + startCmd := func(container arvados.Container, cmd *exec.Cmd) error { dispatcher.UpdateState(container.UUID, "Running") dispatcher.UpdateState(container.UUID, "Complete") return cmd.Start() } + dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) { + (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s) + cancel() + } + err = dispatcher.Run(ctx) c.Assert(err, Equals, context.Canceled) @@ -175,18 +176,19 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon dispatcher := dispatch.Dispatcher{ Arv: arv, PollPeriod: time.Second / 20, - RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) { - run(d, c, s) - cancel() - }, } - startCmd = func(container arvados.Container, cmd *exec.Cmd) error { + startCmd := func(container arvados.Container, cmd *exec.Cmd) error { dispatcher.UpdateState(container.UUID, "Running") dispatcher.UpdateState(container.UUID, "Complete") return cmd.Start() } + dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) { + (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s) + cancel() + } + re := regexp.MustCompile(`(?ms).*` + expected + `.*`) go func() { for i := 0; i < 80 && !re.MatchString(buf.String()); i++ { -- 2.30.2