"git.arvados.org/arvados.git/lib/dispatchcloud/container"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/sirupsen/logrus"
)
// Queue is a test stub for container.Queue. The caller specifies the
// must not be nil.
ChooseType func(*arvados.Container) (arvados.InstanceType, error)
- entries map[string]container.QueueEnt
- updTime time.Time
- subscribers map[<-chan struct{}]chan struct{}
+ // Mimic railsapi implementation of MaxDispatchAttempts config
+ MaxDispatchAttempts int
+
+ Logger logrus.FieldLogger
+
+ entries map[string]container.QueueEnt
+ updTime time.Time
+ subscribers map[<-chan struct{}]chan struct{}
+ stateChanges []QueueStateChange
mtx sync.Mutex
}
+// QueueStateChange describes a single state transition requested
+// through Queue.changeState: the UUID of the container, the state the
+// caller expected it to be in (From), and the state that was asked
+// for (To). An entry is recorded for every request, including those
+// that are then rejected because the container was not in From.
+type QueueStateChange struct {
+	UUID     string
+	From, To arvados.ContainerState
+}
+
+// StateChanges returns every state change requested so far (i.e.,
+// all calls to Lock/Unlock/Cancel to date), in the order the requests
+// were made.
+//
+// The result is a copy taken while holding the queue's mutex, so the
+// caller may keep, append to, or modify it without touching the
+// slice the queue continues to append to.
+func (q *Queue) StateChanges() []QueueStateChange {
+	q.mtx.Lock()
+	defer q.mtx.Unlock()
+	// Appending onto a nil slice copies the elements into a fresh
+	// backing array, and still yields nil when nothing has been
+	// recorded yet.
+	return append([]QueueStateChange(nil), q.stateChanges...)
+}
+
// Entries returns the containers that were queued when Update was
// last called.
func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) {
// caller must have lock.
func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
ent := q.entries[uuid]
+ q.stateChanges = append(q.stateChanges, QueueStateChange{uuid, from, to})
if ent.Container.State != from {
return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
}
q.entries[uuid] = ent
for i, ctr := range q.Containers {
if ctr.UUID == uuid {
- q.Containers[i].State = to
+ if max := q.MaxDispatchAttempts; max > 0 && ctr.LockCount >= max && to == arvados.ContainerStateQueued {
+ q.Containers[i].State = arvados.ContainerStateCancelled
+ q.Containers[i].RuntimeStatus = map[string]interface{}{"error": fmt.Sprintf("Failed to start: lock_count == %d", ctr.LockCount)}
+ } else {
+ q.Containers[i].State = to
+ if to == arvados.ContainerStateLocked {
+ q.Containers[i].LockCount++
+ }
+ }
break
}
}
upd[ctr.UUID] = ent
} else {
it, _ := q.ChooseType(&ctr)
+ ctr.Mounts = nil
upd[ctr.UUID] = container.QueueEnt{
Container: ctr,
InstanceType: it,
+ FirstSeenAt: time.Now(),
}
}
}
defer q.mtx.Unlock()
for i, ctr := range q.Containers {
if ctr.UUID == upd.UUID {
- if ctr.State != arvados.ContainerStateComplete && ctr.State != arvados.ContainerStateCancelled {
+ if allowContainerUpdate[ctr.State][upd.State] {
q.Containers[i] = upd
return true
}
+ if q.Logger != nil {
+ q.Logger.WithField("ContainerUUID", ctr.UUID).Infof("test.Queue rejected update from %s to %s", ctr.State, upd.State)
+ }
return false
}
}
q.Containers = append(q.Containers, upd)
return true
}
+
+// allowContainerUpdate is the table of accepted container state
+// transitions. It is indexed as allowContainerUpdate[current][proposed]
+// and yields true when a container currently in state "current" may
+// be replaced by an update whose state is "proposed".
+//
+// Complete and Cancelled never appear as outer keys, so a lookup with
+// either of them as the current state returns false for every
+// proposed state.
+var allowContainerUpdate = map[arvados.ContainerState]map[arvados.ContainerState]bool{
+ arvados.ContainerStateQueued: {
+ arvados.ContainerStateQueued: true,
+ arvados.ContainerStateLocked: true,
+ arvados.ContainerStateCancelled: true,
+ },
+ arvados.ContainerStateLocked: {
+ arvados.ContainerStateQueued: true,
+ arvados.ContainerStateLocked: true,
+ arvados.ContainerStateRunning: true,
+ arvados.ContainerStateCancelled: true,
+ },
+ arvados.ContainerStateRunning: {
+ arvados.ContainerStateRunning: true,
+ arvados.ContainerStateCancelled: true,
+ arvados.ContainerStateComplete: true,
+ },
+}