15924: Change import paths to git.arvados.org.
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local.go
index fc10393626be103c17b01b5b1bfde615ed470bc9..2922817b557ccef70fa25f32c66b8447575abb5d 100644 (file)
@@ -17,10 +17,10 @@ import (
        "syscall"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/dispatch"
-       "github.com/Sirupsen/logrus"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/dispatch"
+       "github.com/sirupsen/logrus"
 )
 
 var version = "dev"
@@ -85,14 +85,15 @@ func doMain() error {
        }
        arv.Retries = 25
 
+       ctx, cancel := context.WithCancel(context.Background())
+
        dispatcher := dispatch.Dispatcher{
                Logger:       logger,
                Arv:          arv,
-               RunContainer: run,
+               RunContainer: (&LocalRun{startFunc, make(chan bool, 8), ctx}).run,
                PollPeriod:   time.Duration(*pollInterval) * time.Second,
        }
 
-       ctx, cancel := context.WithCancel(context.Background())
        err = dispatcher.Run(ctx)
        if err != nil {
                return err
@@ -123,7 +124,11 @@ func startFunc(container arvados.Container, cmd *exec.Cmd) error {
        return cmd.Start()
 }
 
-var startCmd = startFunc
+type LocalRun struct {
+       startCmd         func(container arvados.Container, cmd *exec.Cmd) error
+       concurrencyLimit chan bool
+       ctx              context.Context
+}
 
 // Run a container.
 //
@@ -133,14 +138,36 @@ var startCmd = startFunc
 //
 // If the container is in any other state, or is not Complete/Cancelled after
 // crunch-run terminates, mark the container as Cancelled.
-func run(dispatcher *dispatch.Dispatcher,
+func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
        container arvados.Container,
        status <-chan arvados.Container) {
 
        uuid := container.UUID
 
        if container.State == dispatch.Locked {
+
+               select {
+               case lr.concurrencyLimit <- true:
+                       break
+               case <-lr.ctx.Done():
+                       return
+               }
+
+               defer func() { <-lr.concurrencyLimit }()
+
+               select {
+               case c := <-status:
+                       // Check for state updates after possibly
+                       // waiting to be ready-to-run
+                       if c.Priority == 0 {
+                               goto Finish
+                       }
+               default:
+                       break
+               }
+
                waitGroup.Add(1)
+               defer waitGroup.Done()
 
                cmd := exec.Command(*crunchRunCommand, uuid)
                cmd.Stdin = nil
@@ -153,7 +180,7 @@ func run(dispatcher *dispatch.Dispatcher,
                // succeed in starting crunch-run.
 
                runningCmdsMutex.Lock()
-               if err := startCmd(container, cmd); err != nil {
+               if err := lr.startCmd(container, cmd); err != nil {
                        runningCmdsMutex.Unlock()
                        dispatcher.Logger.Warnf("error starting %q for %s: %s", *crunchRunCommand, uuid, err)
                        dispatcher.UpdateState(uuid, dispatch.Cancelled)
@@ -194,9 +221,10 @@ func run(dispatcher *dispatch.Dispatcher,
                        delete(runningCmds, uuid)
                        runningCmdsMutex.Unlock()
                }
-               waitGroup.Done()
        }
 
+Finish:
+
        // If the container is not finalized, then change it to "Cancelled".
        err := dispatcher.Arv.Get("containers", uuid, nil, &container)
        if err != nil {