- sch.locking[uuid] = true
- go func() {
- defer func() {
- sch.mtx.Lock()
- defer sch.mtx.Unlock()
- delete(sch.locking, uuid)
- }()
- err := sch.queue.Lock(uuid)
- if err != nil {
- logger.WithError(err).Warn("error locking container")
- return
- }
- logger.Debug("lock succeeded")
- ctr, ok := sch.queue.Get(uuid)
- if !ok {
- logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
- } else if ctr.State != arvados.ContainerStateLocked {
- logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
- }
- }()
+ err := sch.queue.Lock(uuid)
+ if err != nil {
+ logger.WithError(err).Warn("error locking container")
+ return
+ }
+ logger.Debug("lock succeeded")
+ ctr, ok := sch.queue.Get(uuid)
+ if !ok {
+ logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
+ } else if ctr.State != arvados.ContainerStateLocked {
+ logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
+ }
+}
+
+// Acquire a non-blocking lock for specified UUID, returning true if
+// successful. The op argument is used only for debug logs.
+//
+// If the lock is not available, uuidLock arranges to wake up the
+// scheduler after a short delay, so it can retry whatever operation
+// is trying to get the lock (if that operation is still worth doing).
+//
+// This mechanism helps avoid spamming the controller/database with
+// concurrent updates for any single container, even when the
+// scheduler loop is running frequently.
+func (sch *Scheduler) uuidLock(uuid, op string) bool {
+ sch.mtx.Lock()
+ defer sch.mtx.Unlock()
+ logger := sch.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "Op": op,
+ })
+ if op, locked := sch.uuidOp[uuid]; locked {
+ logger.Debugf("uuidLock not available, Op=%s in progress", op)
+ // Make sure the scheduler loop wakes up to retry.
+ sch.wakeup.Reset(time.Second / 4)
+ return false
+ }
+ logger.Debug("uuidLock acquired")
+ sch.uuidOp[uuid] = op
+ return true
+}
+
+func (sch *Scheduler) uuidUnlock(uuid string) {
+ sch.mtx.Lock()
+ defer sch.mtx.Unlock()
+ delete(sch.uuidOp, uuid)