Merge branch '21535-multi-wf-delete'
[arvados.git] / lib / dispatchcloud / test / queue.go
index 909f561144006dc290410a9babddcc8f9876b03a..ea2b98236ffe34d325fc2c69f96c5be3eea2450e 100644 (file)
@@ -6,10 +6,12 @@ package test
 
 import (
        "fmt"
+       "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/container"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
 )
 
 // Queue is a test stub for container.Queue. The caller specifies the
@@ -20,15 +22,39 @@ type Queue struct {
 
        // ChooseType will be called for each entry in Containers. It
        // must not be nil.
-       ChooseType func(*arvados.Container) (arvados.InstanceType, error)
+       ChooseType func(*arvados.Container) ([]arvados.InstanceType, error)
 
-       entries map[string]container.QueueEnt
-       updTime time.Time
+       // Mimic railsapi implementation of MaxDispatchAttempts config
+       MaxDispatchAttempts int
+
+       Logger logrus.FieldLogger
+
+       entries      map[string]container.QueueEnt
+       updTime      time.Time
+       subscribers  map[<-chan struct{}]chan struct{}
+       stateChanges []QueueStateChange
+
+       mtx sync.Mutex
+}
+
+type QueueStateChange struct {
+       UUID string
+       From arvados.ContainerState
+       To   arvados.ContainerState
+}
+
+// All calls to Lock/Unlock/Cancel to date.
+func (q *Queue) StateChanges() []QueueStateChange {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       return q.stateChanges
 }
 
 // Entries returns the containers that were queued when Update was
 // last called.
 func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
        updTime := q.updTime
        r := map[string]container.QueueEnt{}
        for uuid, ent := range q.entries {
@@ -42,44 +68,94 @@ func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) {
 // the state has been changed (via Lock, Unlock, or Cancel) since the
 // last Update, the updated state is returned.
 func (q *Queue) Get(uuid string) (arvados.Container, bool) {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
        ent, ok := q.entries[uuid]
        return ent.Container, ok
 }
 
 func (q *Queue) Forget(uuid string) {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
        delete(q.entries, uuid)
 }
 
 func (q *Queue) Lock(uuid string) error {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
        return q.changeState(uuid, arvados.ContainerStateQueued, arvados.ContainerStateLocked)
 }
 
 func (q *Queue) Unlock(uuid string) error {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
        return q.changeState(uuid, arvados.ContainerStateLocked, arvados.ContainerStateQueued)
 }
 
 func (q *Queue) Cancel(uuid string) error {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
        return q.changeState(uuid, q.entries[uuid].Container.State, arvados.ContainerStateCancelled)
 }
 
+func (q *Queue) Subscribe() <-chan struct{} {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       if q.subscribers == nil {
+               q.subscribers = map[<-chan struct{}]chan struct{}{}
+       }
+       ch := make(chan struct{}, 1)
+       q.subscribers[ch] = ch
+       return ch
+}
+
+func (q *Queue) Unsubscribe(ch <-chan struct{}) {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       delete(q.subscribers, ch)
+}
+
+// caller must have lock.
+func (q *Queue) notify() {
+       for _, ch := range q.subscribers {
+               select {
+               case ch <- struct{}{}:
+               default:
+               }
+       }
+}
+
+// caller must have lock.
 func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
        ent := q.entries[uuid]
+       q.stateChanges = append(q.stateChanges, QueueStateChange{uuid, from, to})
        if ent.Container.State != from {
-               return fmt.Errorf("lock failed: state=%q", ent.Container.State)
+               return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
        }
        ent.Container.State = to
        q.entries[uuid] = ent
        for i, ctr := range q.Containers {
                if ctr.UUID == uuid {
-                       q.Containers[i].State = to
+                       if max := q.MaxDispatchAttempts; max > 0 && ctr.LockCount >= max && to == arvados.ContainerStateQueued {
+                               q.Containers[i].State = arvados.ContainerStateCancelled
+                               q.Containers[i].RuntimeStatus = map[string]interface{}{"error": fmt.Sprintf("Failed to start: lock_count == %d", ctr.LockCount)}
+                       } else {
+                               q.Containers[i].State = to
+                               if to == arvados.ContainerStateLocked {
+                                       q.Containers[i].LockCount++
+                               }
+                       }
                        break
                }
        }
+       q.notify()
        return nil
 }
 
 // Update rebuilds the current entries from the Containers slice.
 func (q *Queue) Update() error {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
        updTime := time.Now()
        upd := map[string]container.QueueEnt{}
        for _, ctr := range q.Containers {
@@ -87,14 +163,22 @@ func (q *Queue) Update() error {
                if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) {
                        continue
                }
-               it, _ := q.ChooseType(&ctr)
-               upd[ctr.UUID] = container.QueueEnt{
-                       Container:    ctr,
-                       InstanceType: it,
+               if ent, ok := upd[ctr.UUID]; ok {
+                       ent.Container = ctr
+                       upd[ctr.UUID] = ent
+               } else {
+                       types, _ := q.ChooseType(&ctr)
+                       ctr.Mounts = nil
+                       upd[ctr.UUID] = container.QueueEnt{
+                               Container:     ctr,
+                               InstanceTypes: types,
+                               FirstSeenAt:   time.Now(),
+                       }
                }
        }
        q.entries = upd
        q.updTime = updTime
+       q.notify()
        return nil
 }
 
@@ -105,12 +189,43 @@ func (q *Queue) Update() error {
 //
 // The resulting changes are not exposed through Get() or Entries()
 // until the next call to Update().
-func (q *Queue) Notify(upd arvados.Container) {
+//
+// Return value is true unless the update is rejected (invalid state
+// transition).
+func (q *Queue) Notify(upd arvados.Container) bool {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
        for i, ctr := range q.Containers {
                if ctr.UUID == upd.UUID {
-                       q.Containers[i] = upd
-                       return
+                       if allowContainerUpdate[ctr.State][upd.State] {
+                               q.Containers[i] = upd
+                               return true
+                       }
+                       if q.Logger != nil {
+                               q.Logger.WithField("ContainerUUID", ctr.UUID).Infof("test.Queue rejected update from %s to %s", ctr.State, upd.State)
+                       }
+                       return false
                }
        }
        q.Containers = append(q.Containers, upd)
+       return true
+}
+
+var allowContainerUpdate = map[arvados.ContainerState]map[arvados.ContainerState]bool{
+       arvados.ContainerStateQueued: {
+               arvados.ContainerStateQueued:    true,
+               arvados.ContainerStateLocked:    true,
+               arvados.ContainerStateCancelled: true,
+       },
+       arvados.ContainerStateLocked: {
+               arvados.ContainerStateQueued:    true,
+               arvados.ContainerStateLocked:    true,
+               arvados.ContainerStateRunning:   true,
+               arvados.ContainerStateCancelled: true,
+       },
+       arvados.ContainerStateRunning: {
+               arvados.ContainerStateRunning:   true,
+               arvados.ContainerStateCancelled: true,
+               arvados.ContainerStateComplete:  true,
+       },
 }