15050: Limit number of containers that crunch-dispatch-local runs at a time
authorPeter Amstutz <pamstutz@veritasgenetics.com>
Fri, 29 Mar 2019 18:06:22 +0000 (14:06 -0400)
committerPeter Amstutz <pamstutz@veritasgenetics.com>
Fri, 29 Mar 2019 18:06:22 +0000 (14:06 -0400)
So I can do some testing with submitting a huge number of containers
and not melt my machine.

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

services/crunch-dispatch-local/crunch-dispatch-local.go
services/crunch-dispatch-local/crunch-dispatch-local_test.go

index dcd54e8968e930f1cdb390aa9b0e5c40182c3bdb..36d149272ff5de2989a0defe7dcd6ec5a9b92eab 100644 (file)
@@ -85,14 +85,15 @@ func doMain() error {
        }
        arv.Retries = 25
 
+       ctx, cancel := context.WithCancel(context.Background())
+
        dispatcher := dispatch.Dispatcher{
                Logger:       logger,
                Arv:          arv,
-               RunContainer: run,
+               RunContainer: (&LocalRun{startFunc, make(chan bool, 8), ctx}).run,
                PollPeriod:   time.Duration(*pollInterval) * time.Second,
        }
 
-       ctx, cancel := context.WithCancel(context.Background())
        err = dispatcher.Run(ctx)
        if err != nil {
                return err
@@ -123,7 +124,11 @@ func startFunc(container arvados.Container, cmd *exec.Cmd) error {
        return cmd.Start()
 }
 
-var startCmd = startFunc
+type LocalRun struct {
+       startCmd         func(container arvados.Container, cmd *exec.Cmd) error
+       concurrencyLimit chan bool
+       ctx              context.Context
+}
 
 // Run a container.
 //
@@ -133,13 +138,21 @@ var startCmd = startFunc
 //
 // If the container is in any other state, or is not Complete/Cancelled after
 // crunch-run terminates, mark the container as Cancelled.
-func run(dispatcher *dispatch.Dispatcher,
+func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
        container arvados.Container,
        status <-chan arvados.Container) {
 
        uuid := container.UUID
 
        if container.State == dispatch.Locked {
+
+               select {
+               case lr.concurrencyLimit <- true:
+                       break
+               case <-lr.ctx.Done():
+                       return
+               }
+
                waitGroup.Add(1)
 
                cmd := exec.Command(*crunchRunCommand, uuid)
@@ -153,7 +166,7 @@ func run(dispatcher *dispatch.Dispatcher,
                // succeed in starting crunch-run.
 
                runningCmdsMutex.Lock()
-               if err := startCmd(container, cmd); err != nil {
+               if err := lr.startCmd(container, cmd); err != nil {
                        runningCmdsMutex.Unlock()
                        dispatcher.Logger.Warnf("error starting %q for %s: %s", *crunchRunCommand, uuid, err)
                        dispatcher.UpdateState(uuid, dispatch.Cancelled)
index 6bae1f40997a8a824284390a18c2da8df8568cdb..41357403f0a01c9092e2ee7503e13943ba4c2cd3 100644 (file)
@@ -73,18 +73,19 @@ func (s *TestSuite) TestIntegration(c *C) {
        dispatcher := dispatch.Dispatcher{
                Arv:        arv,
                PollPeriod: time.Second,
-               RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
-                       run(d, c, s)
-                       cancel()
-               },
        }
 
-       startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
+       startCmd := func(container arvados.Container, cmd *exec.Cmd) error {
                dispatcher.UpdateState(container.UUID, "Running")
                dispatcher.UpdateState(container.UUID, "Complete")
                return cmd.Start()
        }
 
+       dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
+               (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
+               cancel()
+       }
+
        err = dispatcher.Run(ctx)
        c.Assert(err, Equals, context.Canceled)
 
@@ -175,18 +176,19 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
        dispatcher := dispatch.Dispatcher{
                Arv:        arv,
                PollPeriod: time.Second / 20,
-               RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
-                       run(d, c, s)
-                       cancel()
-               },
        }
 
-       startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
+       startCmd := func(container arvados.Container, cmd *exec.Cmd) error {
                dispatcher.UpdateState(container.UUID, "Running")
                dispatcher.UpdateState(container.UUID, "Complete")
                return cmd.Start()
        }
 
+       dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
+               (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
+               cancel()
+       }
+
        re := regexp.MustCompile(`(?ms).*` + expected + `.*`)
        go func() {
                for i := 0; i < 80 && !re.MatchString(buf.String()); i++ {