Merge branch '21535-multi-wf-delete'
[arvados.git] / lib / dispatchcloud / test / queue.go
index 11d410fb1b9a931b8b65cb990aea1298babf7269..ea2b98236ffe34d325fc2c69f96c5be3eea2450e 100644 (file)
@@ -11,6 +11,7 @@ import (
 
        "git.arvados.org/arvados.git/lib/dispatchcloud/container"
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
 )
 
 // Queue is a test stub for container.Queue. The caller specifies the
@@ -21,15 +22,34 @@ type Queue struct {
 
        // ChooseType will be called for each entry in Containers. It
        // must not be nil.
-       ChooseType func(*arvados.Container) (arvados.InstanceType, error)
+       ChooseType func(*arvados.Container) ([]arvados.InstanceType, error)
 
-       entries     map[string]container.QueueEnt
-       updTime     time.Time
-       subscribers map[<-chan struct{}]chan struct{}
+       // Mimic railsapi implementation of MaxDispatchAttempts config
+       MaxDispatchAttempts int
+
+       Logger logrus.FieldLogger
+
+       entries      map[string]container.QueueEnt
+       updTime      time.Time
+       subscribers  map[<-chan struct{}]chan struct{}
+       stateChanges []QueueStateChange
 
        mtx sync.Mutex
 }
 
+// QueueStateChange is one state transition requested through changeState (recorded even if the transition is rejected).
+type QueueStateChange struct {
+       UUID     string
+       From, To arvados.ContainerState
+}
+
+// StateChanges returns a copy of every Lock/Unlock/Cancel call recorded to date.
+func (q *Queue) StateChanges() []QueueStateChange {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       return append([]QueueStateChange(nil), q.stateChanges...) // copy so the mutex-guarded backing array never escapes the lock
+}
+
 // Entries returns the containers that were queued when Update was
 // last called.
 func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) {
@@ -108,6 +128,7 @@ func (q *Queue) notify() {
 // caller must have lock.
 func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
        ent := q.entries[uuid]
+       q.stateChanges = append(q.stateChanges, QueueStateChange{uuid, from, to})
        if ent.Container.State != from {
                return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
        }
@@ -115,7 +136,15 @@ func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error
        q.entries[uuid] = ent
        for i, ctr := range q.Containers {
                if ctr.UUID == uuid {
-                       q.Containers[i].State = to
+                       if max := q.MaxDispatchAttempts; max > 0 && ctr.LockCount >= max && to == arvados.ContainerStateQueued {
+                               q.Containers[i].State = arvados.ContainerStateCancelled
+                               q.Containers[i].RuntimeStatus = map[string]interface{}{"error": fmt.Sprintf("Failed to start: lock_count == %d", ctr.LockCount)}
+                       } else {
+                               q.Containers[i].State = to
+                               if to == arvados.ContainerStateLocked {
+                                       q.Containers[i].LockCount++
+                               }
+                       }
                        break
                }
        }
@@ -138,10 +167,12 @@ func (q *Queue) Update() error {
                        ent.Container = ctr
                        upd[ctr.UUID] = ent
                } else {
-                       it, _ := q.ChooseType(&ctr)
+                       types, _ := q.ChooseType(&ctr)
+                       ctr.Mounts = nil
                        upd[ctr.UUID] = container.QueueEnt{
-                               Container:    ctr,
-                               InstanceType: it,
+                               Container:     ctr,
+                               InstanceTypes: types,
+                               FirstSeenAt:   time.Now(),
                        }
                }
        }
@@ -166,13 +197,35 @@ func (q *Queue) Notify(upd arvados.Container) bool {
        defer q.mtx.Unlock()
        for i, ctr := range q.Containers {
                if ctr.UUID == upd.UUID {
-                       if ctr.State != arvados.ContainerStateComplete && ctr.State != arvados.ContainerStateCancelled {
+                       if allowContainerUpdate[ctr.State][upd.State] {
                                q.Containers[i] = upd
                                return true
                        }
+                       if q.Logger != nil {
+                               q.Logger.WithField("ContainerUUID", ctr.UUID).Infof("test.Queue rejected update from %s to %s", ctr.State, upd.State)
+                       }
                        return false
                }
        }
        q.Containers = append(q.Containers, upd)
        return true
 }
+
+var allowContainerUpdate = map[arvados.ContainerState]map[arvados.ContainerState]bool{ // indexed [current][proposed]: true if Notify may apply the update
+       arvados.ContainerStateQueued: {
+               arvados.ContainerStateQueued:    true,
+               arvados.ContainerStateLocked:    true,
+               arvados.ContainerStateCancelled: true,
+       },
+       arvados.ContainerStateLocked: {
+               arvados.ContainerStateQueued:    true,
+               arvados.ContainerStateLocked:    true,
+               arvados.ContainerStateRunning:   true,
+               arvados.ContainerStateCancelled: true,
+       },
+       arvados.ContainerStateRunning: {
+               arvados.ContainerStateRunning:   true,
+               arvados.ContainerStateCancelled: true,
+               arvados.ContainerStateComplete:  true,
+       },
+} // states with no row (e.g. Complete, Cancelled) index a nil map, so Notify rejects every update to them