14325: Start up immediately if there are no stale locks.
author Tom Clegg <tclegg@veritasgenetics.com>
Tue, 12 Feb 2019 19:17:10 +0000 (14:17 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Tue, 12 Feb 2019 19:17:10 +0000 (14:17 -0500)
...instead of waiting for the pool to send a notification to trigger
the first loop iteration.

refs #14325

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/scheduler/fix_stale_locks.go

index 22e3425b6de34592948ea7f349c72b9c9bf06658..0558d79f1a5b95737ab61958e467a7ee73177df9 100644 (file)
@@ -156,7 +156,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
        }
 
-       deadline := time.Now().Add(time.Second)
+       deadline := time.Now().Add(5 * time.Second)
        for range time.NewTicker(10 * time.Millisecond).C {
                insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
                c.Check(err, check.IsNil)
index 264f9e4ec6bbc3747401858a37c3f70b259116c1..4bd27021c675d1c8ce40753d131d0631041ea59c 100644 (file)
@@ -19,24 +19,15 @@ import (
 func (sch *Scheduler) fixStaleLocks() {
        wp := sch.pool.Subscribe()
        defer sch.pool.Unsubscribe(wp)
+
+       var stale []string
        timeout := time.NewTimer(sch.staleLockTimeout)
 waiting:
        for {
-               unlock := false
-               select {
-               case <-wp:
-                       // If all workers have been contacted, unlock
-                       // containers that aren't claimed by any
-                       // worker.
-                       unlock = sch.pool.CountWorkers()[worker.StateUnknown] == 0
-               case <-timeout.C:
-                       // Give up and unlock the containers, even
-                       // though they might be working.
-                       unlock = true
-               }
-
                running := sch.pool.Running()
                qEntries, _ := sch.queue.Entries()
+
+               stale = nil
                for uuid, ent := range qEntries {
                        if ent.Container.State != arvados.ContainerStateLocked {
                                continue
@@ -44,14 +35,30 @@ waiting:
                        if _, running := running[uuid]; running {
                                continue
                        }
-                       if !unlock {
-                               continue waiting
-                       }
-                       err := sch.queue.Unlock(uuid)
-                       if err != nil {
-                               sch.logger.Warnf("Unlock %s: %s", uuid, err)
+                       stale = append(stale, uuid)
+               }
+               if len(stale) == 0 {
+                       return
+               }
+
+               select {
+               case <-wp:
+                       // Stop waiting if all workers have been
+                       // contacted.
+                       if sch.pool.CountWorkers()[worker.StateUnknown] == 0 {
+                               break waiting
                        }
+               case <-timeout.C:
+                       // Give up.
+                       break waiting
+               }
+
+       }
+
+       for _, uuid := range stale {
+               err := sch.queue.Unlock(uuid)
+               if err != nil {
+                       sch.logger.Warnf("Unlock %s: %s", uuid, err)
                }
-               return
        }
 }