import (
"time"
- "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
)
// fixStaleLocks waits for any already-locked containers (i.e., locked
func (sch *Scheduler) fixStaleLocks() {
wp := sch.pool.Subscribe()
defer sch.pool.Unsubscribe(wp)
+
+ var stale []string
timeout := time.NewTimer(sch.staleLockTimeout)
waiting:
- for {
- unlock := false
- select {
- case <-wp:
- // If all workers have been contacted, unlock
- // containers that aren't claimed by any
- // worker.
- unlock = sch.pool.CountWorkers()[worker.StateUnknown] == 0
- case <-timeout.C:
- // Give up and unlock the containers, even
- // though they might be working.
- unlock = true
- }
-
+ for sch.pool.CountWorkers()[worker.StateUnknown] > 0 {
running := sch.pool.Running()
qEntries, _ := sch.queue.Entries()
+
+ stale = nil
for uuid, ent := range qEntries {
if ent.Container.State != arvados.ContainerStateLocked {
continue
if _, running := running[uuid]; running {
continue
}
- if !unlock {
- continue waiting
- }
- err := sch.queue.Unlock(uuid)
- if err != nil {
- sch.logger.Warnf("Unlock %s: %s", uuid, err)
- }
+ stale = append(stale, uuid)
+ }
+ if len(stale) == 0 {
+ return
+ }
+
+ select {
+ case <-wp:
+ case <-timeout.C:
+ // Give up.
+ break waiting
+ }
+ }
+
+ for _, uuid := range stale {
+ err := sch.queue.Unlock(uuid)
+ if err != nil {
+ sch.logger.Warnf("Unlock %s: %s", uuid, err)
}
- return
}
}