X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b6d7efab2c4bffa3fabd55b166e44cca8ac1391f..3a9428dd1db2fd393dd90cd3ffc744c0bf45fe28:/lib/dispatchcloud/test/queue.go
diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go
index 11d410fb1b..2be8246bd6 100644
--- a/lib/dispatchcloud/test/queue.go
+++ b/lib/dispatchcloud/test/queue.go
@@ -11,6 +11,7 @@ import (
 
 	"git.arvados.org/arvados.git/lib/dispatchcloud/container"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"github.com/sirupsen/logrus"
 )
 
 // Queue is a test stub for container.Queue. The caller specifies the
@@ -23,13 +24,36 @@ type Queue struct {
 	// must not be nil.
 	ChooseType func(*arvados.Container) (arvados.InstanceType, error)
 
-	entries     map[string]container.QueueEnt
-	updTime     time.Time
-	subscribers map[<-chan struct{}]chan struct{}
+	// Mimic railsapi implementation of MaxDispatchAttempts config:
+	// if > 0, a container whose LockCount has reached this limit is
+	// cancelled instead of being returned to Queued.
+	MaxDispatchAttempts int
+
+	// Logger, if non-nil, is used to log updates rejected by Notify.
+	Logger logrus.FieldLogger
+
+	entries      map[string]container.QueueEnt
+	updTime      time.Time
+	subscribers  map[<-chan struct{}]chan struct{}
+	stateChanges []QueueStateChange
 
 	mtx sync.Mutex
 }
 
+// QueueStateChange records one state transition requested via changeState.
+type QueueStateChange struct {
+	UUID string
+	From arvados.ContainerState
+	To   arvados.ContainerState
+}
+
+// StateChanges returns a copy of all calls to Lock/Unlock/Cancel to date.
+func (q *Queue) StateChanges() []QueueStateChange {
+	q.mtx.Lock()
+	defer q.mtx.Unlock()
+	return append([]QueueStateChange(nil), q.stateChanges...)
+}
+
 // Entries returns the containers that were queued when Update was
 // last called.
 func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) {
@@ -108,6 +132,7 @@ func (q *Queue) notify() {
 // caller must have lock.
 func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
 	ent := q.entries[uuid]
+	q.stateChanges = append(q.stateChanges, QueueStateChange{uuid, from, to})
 	if ent.Container.State != from {
 		return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
 	}
@@ -115,7 +140,15 @@ func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error
 	q.entries[uuid] = ent
 	for i, ctr := range q.Containers {
 		if ctr.UUID == uuid {
-			q.Containers[i].State = to
+			if maxAttempts := q.MaxDispatchAttempts; maxAttempts > 0 && ctr.LockCount >= maxAttempts && to == arvados.ContainerStateQueued {
+				q.Containers[i].State = arvados.ContainerStateCancelled
+				q.Containers[i].RuntimeStatus = map[string]interface{}{"error": fmt.Sprintf("Failed to start: lock_count == %d", ctr.LockCount)}
+			} else {
+				q.Containers[i].State = to
+				if to == arvados.ContainerStateLocked {
+					q.Containers[i].LockCount++
+				}
+			}
 			break
 		}
 	}
@@ -139,9 +172,11 @@ func (q *Queue) Update() error {
 			upd[ctr.UUID] = ent
 		} else {
 			it, _ := q.ChooseType(&ctr)
+			ctr.Mounts = nil
 			upd[ctr.UUID] = container.QueueEnt{
 				Container:    ctr,
 				InstanceType: it,
+				FirstSeenAt:  time.Now(),
 			}
 		}
 	}
@@ -166,13 +201,37 @@ func (q *Queue) Notify(upd arvados.Container) bool {
 	defer q.mtx.Unlock()
 	for i, ctr := range q.Containers {
 		if ctr.UUID == upd.UUID {
-			if ctr.State != arvados.ContainerStateComplete && ctr.State != arvados.ContainerStateCancelled {
+			if allowContainerUpdate[ctr.State][upd.State] {
 				q.Containers[i] = upd
 				return true
 			}
+			if q.Logger != nil {
+				q.Logger.WithField("ContainerUUID", ctr.UUID).Infof("test.Queue rejected update from %s to %s", ctr.State, upd.State)
+			}
 			return false
 		}
 	}
 	q.Containers = append(q.Containers, upd)
 	return true
 }
+
+// allowContainerUpdate[from][to] is true if Notify accepts an update
+// that changes an existing container's state from "from" to "to".
+var allowContainerUpdate = map[arvados.ContainerState]map[arvados.ContainerState]bool{
+	arvados.ContainerStateQueued: {
+		arvados.ContainerStateQueued:    true,
+		arvados.ContainerStateLocked:    true,
+		arvados.ContainerStateCancelled: true,
+	},
+	arvados.ContainerStateLocked: {
+		arvados.ContainerStateQueued:    true,
+		arvados.ContainerStateLocked:    true,
+		arvados.ContainerStateRunning:   true,
+		arvados.ContainerStateCancelled: true,
+	},
+	arvados.ContainerStateRunning: {
+		arvados.ContainerStateRunning:   true,
+		arvados.ContainerStateCancelled: true,
+		arvados.ContainerStateComplete:  true,
+	},
+}