Merge remote-tracking branch 'origin/master' into 14645-fuse-operations-reporting
[arvados.git] / lib / dispatchcloud / test / queue.go
index 0bad5a391e6f5e369a6c7f163135076b7c818aa9..6a6c952358bcadff447d08869e641f32d24618ad 100644 (file)
@@ -61,14 +61,20 @@ func (q *Queue) Forget(uuid string) {
 }
 
 func (q *Queue) Lock(uuid string) error {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
        return q.changeState(uuid, arvados.ContainerStateQueued, arvados.ContainerStateLocked)
 }
 
 func (q *Queue) Unlock(uuid string) error {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
        return q.changeState(uuid, arvados.ContainerStateLocked, arvados.ContainerStateQueued)
 }
 
 func (q *Queue) Cancel(uuid string) error {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
        return q.changeState(uuid, q.entries[uuid].Container.State, arvados.ContainerStateCancelled)
 }
 
@@ -89,6 +95,7 @@ func (q *Queue) Unsubscribe(ch <-chan struct{}) {
        delete(q.subscribers, ch)
 }
 
+// caller must have lock.
 func (q *Queue) notify() {
        for _, ch := range q.subscribers {
                select {
@@ -98,12 +105,11 @@ func (q *Queue) notify() {
        }
 }
 
+// caller must have lock.
 func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
-       q.mtx.Lock()
-       defer q.mtx.Unlock()
        ent := q.entries[uuid]
        if ent.Container.State != from {
-               return fmt.Errorf("lock failed: state=%q", ent.Container.State)
+               return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
        }
        ent.Container.State = to
        q.entries[uuid] = ent
@@ -128,10 +134,15 @@ func (q *Queue) Update() error {
                if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) {
                        continue
                }
-               it, _ := q.ChooseType(&ctr)
-               upd[ctr.UUID] = container.QueueEnt{
-                       Container:    ctr,
-                       InstanceType: it,
+               if ent, ok := upd[ctr.UUID]; ok {
+                       ent.Container = ctr
+                       upd[ctr.UUID] = ent
+               } else {
+                       it, _ := q.ChooseType(&ctr)
+                       upd[ctr.UUID] = container.QueueEnt{
+                               Container:    ctr,
+                               InstanceType: it,
+                       }
                }
        }
        q.entries = upd
@@ -147,14 +158,21 @@ func (q *Queue) Update() error {
 //
 // The resulting changes are not exposed through Get() or Entries()
 // until the next call to Update().
-func (q *Queue) Notify(upd arvados.Container) {
+//
+// Return value is true unless the update is rejected (invalid state
+// transition).
+func (q *Queue) Notify(upd arvados.Container) bool {
        q.mtx.Lock()
        defer q.mtx.Unlock()
        for i, ctr := range q.Containers {
                if ctr.UUID == upd.UUID {
-                       q.Containers[i] = upd
-                       return
+                       if ctr.State != arvados.ContainerStateComplete && ctr.State != arvados.ContainerStateCancelled {
+                               q.Containers[i] = upd
+                               return true
+                       }
+                       return false
                }
        }
        q.Containers = append(q.Containers, upd)
+       return true
 }