c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
}
- deadline := time.Now().Add(time.Second)
+ deadline := time.Now().Add(5 * time.Second)
for range time.NewTicker(10 * time.Millisecond).C {
insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
c.Check(err, check.IsNil)
func (sch *Scheduler) fixStaleLocks() {
wp := sch.pool.Subscribe()
defer sch.pool.Unsubscribe(wp)
+
+ var stale []string
timeout := time.NewTimer(sch.staleLockTimeout)
waiting:
for {
- unlock := false
- select {
- case <-wp:
- // If all workers have been contacted, unlock
- // containers that aren't claimed by any
- // worker.
- unlock = sch.pool.CountWorkers()[worker.StateUnknown] == 0
- case <-timeout.C:
- // Give up and unlock the containers, even
- // though they might be working.
- unlock = true
- }
-
running := sch.pool.Running()
qEntries, _ := sch.queue.Entries()
+
+ stale = nil
for uuid, ent := range qEntries {
if ent.Container.State != arvados.ContainerStateLocked {
continue
if _, running := running[uuid]; running {
continue
}
- if !unlock {
- continue waiting
- }
- err := sch.queue.Unlock(uuid)
- if err != nil {
- sch.logger.Warnf("Unlock %s: %s", uuid, err)
+ stale = append(stale, uuid)
+ }
+ if len(stale) == 0 {
+ return
+ }
+
+ select {
+ case <-wp:
+ // Stop waiting if all workers have been
+ // contacted.
+ if sch.pool.CountWorkers()[worker.StateUnknown] == 0 {
+ break waiting
}
+ case <-timeout.C:
+ // Give up.
+ break waiting
+ }
+
+ }
+
+ for _, uuid := range stale {
+ err := sch.queue.Unlock(uuid)
+ if err != nil {
+ sch.logger.Warnf("Unlock %s: %s", uuid, err)
}
- return
}
}