1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/lib/dispatchcloud/container"
13 "git.arvados.org/arvados.git/sdk/go/arvados"
14 "github.com/sirupsen/logrus"
17 // Queue is a test stub for container.Queue. The caller specifies the
18 // initial queue state.
20 // Containers represent the API server database contents.
// Notify and changeState write to this slice; Update reads it to
// rebuild the cached entries.
21 Containers []arvados.Container
23 // ChooseType is called by Update while building cache entries from
// Containers. Update invokes it without a nil check, so it must be
// set before Update runs.
25 ChooseType func(*arvados.Container) (arvados.InstanceType, error)
27 // Mimic railsapi implementation of MaxDispatchAttempts config.
// When greater than zero, changeState cancels (instead of requeueing)
// a container in Containers whose LockCount has reached this value.
// Zero or negative disables the check.
28 MaxDispatchAttempts int
// Logger receives a message from Notify when an update is rejected.
30 Logger logrus.FieldLogger
// entries is the cached view served by Get and Entries. It is keyed
// by container UUID and modified by Forget and changeState.
32 entries map[string]container.QueueEnt
// subscribers holds the channels created by Subscribe. The key is the
// receive-only view of the channel; the value is the same channel
// with send capability, used by notify.
34 subscribers map[<-chan struct{}]chan struct{}
// stateChanges is the history appended to by changeState.
35 stateChanges []QueueStateChange
// QueueStateChange records one transition requested through
// changeState. changeState builds it with an unkeyed composite
// literal (uuid, from, to), so the field order here is significant.
40 type QueueStateChange struct {
// From is the state the caller expected the container to be in.
42 From arvados.ContainerState
// To is the state the caller asked for.
43 To arvados.ContainerState
46 // StateChanges returns all calls to Lock/Unlock/Cancel to date.
// changeState records each call before it validates the requested
// transition, so rejected attempts are part of the history too.
47 func (q *Queue) StateChanges() []QueueStateChange {
53 // Entries returns the containers that were queued when Update was
// last called, together with a time.Time.
// NOTE(review): the timestamp is presumably the time of that Update
// call -- confirm.
55 func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) {
// r is a new map populated from q.entries.
// NOTE(review): presumably r is what gets returned, so callers never
// alias the cache itself -- confirm.
59 r := map[string]container.QueueEnt{}
60 for uuid, ent := range q.entries {
66 // Get returns the container from the cached queue, i.e., as it was
67 // when Update was last called -- just like a container.Queue does. If
68 // the state has been changed (via Lock, Unlock, or Cancel) since the
69 // last Update, the updated state is returned.
//
// The second return value reports whether uuid is present in the
// cache.
70 func (q *Queue) Get(uuid string) (arvados.Container, bool) {
// For an unknown uuid the map lookup yields a zero QueueEnt, so the
// returned Container is the zero value and ok is false.
73 ent, ok := q.entries[uuid]
74 return ent.Container, ok
// Forget removes uuid from the cached entries. The Containers slice
// is not touched here. Deleting a uuid that is not cached is a no-op.
77 func (q *Queue) Forget(uuid string) {
80 delete(q.entries, uuid)
// Lock requests the Queued -> Locked transition for uuid via
// changeState, which returns an error if the cached state is not
// Queued.
83 func (q *Queue) Lock(uuid string) error {
86 return q.changeState(uuid, arvados.ContainerStateQueued, arvados.ContainerStateLocked)
// Unlock requests the Locked -> Queued transition for uuid via
// changeState, which returns an error if the cached state is not
// Locked. Because the target state is Queued, this is the transition
// that changeState's MaxDispatchAttempts check applies to.
89 func (q *Queue) Unlock(uuid string) error {
92 return q.changeState(uuid, arvados.ContainerStateLocked, arvados.ContainerStateQueued)
// Cancel moves uuid to Cancelled from whatever state is currently
// cached.
95 func (q *Queue) Cancel(uuid string) error {
// The cached state itself is passed as the expected "from" state, so
// the state comparison inside changeState always matches here.
98 return q.changeState(uuid, q.entries[uuid].Container.State, arvados.ContainerStateCancelled)
// Subscribe registers a new notification channel in q.subscribers.
// NOTE(review): presumably the new channel is the return value --
// confirm.
101 func (q *Queue) Subscribe() <-chan struct{} {
// The subscribers map is created lazily on first use.
104 if q.subscribers == nil {
105 q.subscribers = map[<-chan struct{}]chan struct{}{}
// The channel has room for one pending notification.
107 ch := make(chan struct{}, 1)
// Stored under its own receive-only view, so Unsubscribe can find it
// given only the <-chan handle.
108 q.subscribers[ch] = ch
// Unsubscribe removes ch from q.subscribers, so notify no longer
// sends on it. Passing a channel that is not registered is a no-op.
112 func (q *Queue) Unsubscribe(ch <-chan struct{}) {
115 delete(q.subscribers, ch)
118 // notify sends a value on each subscriber channel. Caller must have lock.
119 func (q *Queue) notify() {
120 for _, ch := range q.subscribers {
// NOTE(review): the send is a select case; presumably it is paired
// with a default so a subscriber whose one-slot buffer is already
// full is skipped instead of blocking -- confirm.
122 case ch <- struct{}{}:
128 // changeState moves uuid from state "from" to state "to" in both the
// cached entries and the Containers slice, and records the attempt in
// stateChanges. If the cached state is not "from" it returns an error
// and leaves entries and Containers untouched. Caller must have lock.
129 func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
130 ent := q.entries[uuid]
// Record the attempt before validating it, so rejected transitions
// are part of the history too.
131 q.stateChanges = append(q.stateChanges, QueueStateChange{uuid, from, to})
132 if ent.Container.State != from {
133 return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
// ent is a copy of the map value; store it back so the new state is
// visible through the cache.
135 ent.Container.State = to
136 q.entries[uuid] = ent
// Mirror the change into Containers. ctr is a per-iteration copy, so
// writes go through q.Containers[i].
137 for i, ctr := range q.Containers {
138 if ctr.UUID == uuid {
// With MaxDispatchAttempts > 0, requeueing a container that has
// already been locked that many times cancels it in Containers
// instead, with an error in RuntimeStatus. The cached entry above was
// already set to "to", so cache and Containers differ in this case.
139 if max := q.MaxDispatchAttempts; max > 0 && ctr.LockCount >= max && to == arvados.ContainerStateQueued {
140 q.Containers[i].State = arvados.ContainerStateCancelled
141 q.Containers[i].RuntimeStatus = map[string]interface{}{"error": fmt.Sprintf("Failed to start: lock_count == %d", ctr.LockCount)}
// NOTE(review): presumably the alternative branch of the check above
// -- confirm. Apply the requested state; each transition to Locked
// increments LockCount.
143 q.Containers[i].State = to
144 if to == arvados.ContainerStateLocked {
145 q.Containers[i].LockCount++
155 // Update rebuilds the current entries from the Containers slice.
156 func (q *Queue) Update() error {
159 updTime := time.Now()
// upd is built from scratch on every call.
160 upd := map[string]container.QueueEnt{}
161 for _, ctr := range q.Containers {
// Containers that are already Complete or Cancelled and were not in
// the previous cache get special handling.
// NOTE(review): presumably they are skipped -- confirm.
162 _, exists := q.entries[ctr.UUID]
163 if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) {
// NOTE(review): this looks up upd (the map being built), not
// q.entries, so it can only match when Containers holds the same UUID
// more than once -- confirm that is the intent.
166 if ent, ok := upd[ctr.UUID]; ok {
// The error returned by ChooseType is discarded.
170 it, _ := q.ChooseType(&ctr)
172 upd[ctr.UUID] = container.QueueEnt{
// FirstSeenAt takes a fresh time.Now() rather than reusing updTime.
175 FirstSeenAt: time.Now(),
185 // Notify adds/updates an entry in the Containers slice. This
186 // simulates the effect of an API update from someone other than the
187 // dispatcher -- e.g., crunch-run updating state to "Complete".
//
190 // The resulting changes are not exposed through Get() or Entries()
191 // until the next call to Update().
//
193 // Return value is true unless the update is rejected (invalid state
// transition according to allowContainerUpdate).
195 func (q *Queue) Notify(upd arvados.Container) bool {
198 for i, ctr := range q.Containers {
199 if ctr.UUID == upd.UUID {
// A transition that is not listed in the table reads as false,
// because a lookup on a missing map key yields the zero value.
200 if allowContainerUpdate[ctr.State][upd.State] {
// Accepted: the stored container is replaced wholesale by upd.
201 q.Containers[i] = upd
// Rejected: log the refused transition. q.Logger must be non-nil
// when this line runs.
205 q.Logger.WithField("ContainerUUID", ctr.UUID).Infof("test.Queue rejected update from %s to %s", ctr.State, upd.State)
// Add upd to Containers as a new entry.
// NOTE(review): presumably reached only when no existing container
// has this UUID -- confirm.
210 q.Containers = append(q.Containers, upd)
// allowContainerUpdate is the transition table consulted by Notify:
// allowContainerUpdate[from][to] is true when an update from state
// "from" to state "to" is accepted. Any pair that is not listed reads
// as false and is therefore rejected.
214 var allowContainerUpdate = map[arvados.ContainerState]map[arvados.ContainerState]bool{
215 arvados.ContainerStateQueued: {
216 arvados.ContainerStateQueued: true,
217 arvados.ContainerStateLocked: true,
218 arvados.ContainerStateCancelled: true,
220 arvados.ContainerStateLocked: {
221 arvados.ContainerStateQueued: true,
222 arvados.ContainerStateLocked: true,
223 arvados.ContainerStateRunning: true,
224 arvados.ContainerStateCancelled: true,
226 arvados.ContainerStateRunning: {
227 arvados.ContainerStateRunning: true,
228 arvados.ContainerStateCancelled: true,
229 arvados.ContainerStateComplete: true,