Merge branch '20978-instance-types'
[arvados.git] / lib / dispatchcloud / test / queue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package test
6
7 import (
8         "fmt"
9         "sync"
10         "time"
11
12         "git.arvados.org/arvados.git/lib/dispatchcloud/container"
13         "git.arvados.org/arvados.git/sdk/go/arvados"
14         "github.com/sirupsen/logrus"
15 )
16
17 // Queue is a test stub for container.Queue. The caller specifies the
18 // initial queue state.
19 type Queue struct {
20         // Containers represent the API server database contents.
21         Containers []arvados.Container
22
23         // ChooseType will be called for each entry in Containers. It
24         // must not be nil.
25         ChooseType func(*arvados.Container) ([]arvados.InstanceType, error)
26
27         // Mimic railsapi implementation of MaxDispatchAttempts config
28         MaxDispatchAttempts int
29
30         Logger logrus.FieldLogger
31
32         entries      map[string]container.QueueEnt
33         updTime      time.Time
34         subscribers  map[<-chan struct{}]chan struct{}
35         stateChanges []QueueStateChange
36
37         mtx sync.Mutex
38 }
39
40 type QueueStateChange struct {
41         UUID string
42         From arvados.ContainerState
43         To   arvados.ContainerState
44 }
45
46 // All calls to Lock/Unlock/Cancel to date.
47 func (q *Queue) StateChanges() []QueueStateChange {
48         q.mtx.Lock()
49         defer q.mtx.Unlock()
50         return q.stateChanges
51 }
52
53 // Entries returns the containers that were queued when Update was
54 // last called.
55 func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) {
56         q.mtx.Lock()
57         defer q.mtx.Unlock()
58         updTime := q.updTime
59         r := map[string]container.QueueEnt{}
60         for uuid, ent := range q.entries {
61                 r[uuid] = ent
62         }
63         return r, updTime
64 }
65
66 // Get returns the container from the cached queue, i.e., as it was
67 // when Update was last called -- just like a container.Queue does. If
68 // the state has been changed (via Lock, Unlock, or Cancel) since the
69 // last Update, the updated state is returned.
70 func (q *Queue) Get(uuid string) (arvados.Container, bool) {
71         q.mtx.Lock()
72         defer q.mtx.Unlock()
73         ent, ok := q.entries[uuid]
74         return ent.Container, ok
75 }
76
77 func (q *Queue) Forget(uuid string) {
78         q.mtx.Lock()
79         defer q.mtx.Unlock()
80         delete(q.entries, uuid)
81 }
82
83 func (q *Queue) Lock(uuid string) error {
84         q.mtx.Lock()
85         defer q.mtx.Unlock()
86         return q.changeState(uuid, arvados.ContainerStateQueued, arvados.ContainerStateLocked)
87 }
88
89 func (q *Queue) Unlock(uuid string) error {
90         q.mtx.Lock()
91         defer q.mtx.Unlock()
92         return q.changeState(uuid, arvados.ContainerStateLocked, arvados.ContainerStateQueued)
93 }
94
95 func (q *Queue) Cancel(uuid string) error {
96         q.mtx.Lock()
97         defer q.mtx.Unlock()
98         return q.changeState(uuid, q.entries[uuid].Container.State, arvados.ContainerStateCancelled)
99 }
100
101 func (q *Queue) Subscribe() <-chan struct{} {
102         q.mtx.Lock()
103         defer q.mtx.Unlock()
104         if q.subscribers == nil {
105                 q.subscribers = map[<-chan struct{}]chan struct{}{}
106         }
107         ch := make(chan struct{}, 1)
108         q.subscribers[ch] = ch
109         return ch
110 }
111
112 func (q *Queue) Unsubscribe(ch <-chan struct{}) {
113         q.mtx.Lock()
114         defer q.mtx.Unlock()
115         delete(q.subscribers, ch)
116 }
117
118 // caller must have lock.
119 func (q *Queue) notify() {
120         for _, ch := range q.subscribers {
121                 select {
122                 case ch <- struct{}{}:
123                 default:
124                 }
125         }
126 }
127
128 // caller must have lock.
129 func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
130         ent := q.entries[uuid]
131         q.stateChanges = append(q.stateChanges, QueueStateChange{uuid, from, to})
132         if ent.Container.State != from {
133                 return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
134         }
135         ent.Container.State = to
136         q.entries[uuid] = ent
137         for i, ctr := range q.Containers {
138                 if ctr.UUID == uuid {
139                         if max := q.MaxDispatchAttempts; max > 0 && ctr.LockCount >= max && to == arvados.ContainerStateQueued {
140                                 q.Containers[i].State = arvados.ContainerStateCancelled
141                                 q.Containers[i].RuntimeStatus = map[string]interface{}{"error": fmt.Sprintf("Failed to start: lock_count == %d", ctr.LockCount)}
142                         } else {
143                                 q.Containers[i].State = to
144                                 if to == arvados.ContainerStateLocked {
145                                         q.Containers[i].LockCount++
146                                 }
147                         }
148                         break
149                 }
150         }
151         q.notify()
152         return nil
153 }
154
155 // Update rebuilds the current entries from the Containers slice.
156 func (q *Queue) Update() error {
157         q.mtx.Lock()
158         defer q.mtx.Unlock()
159         updTime := time.Now()
160         upd := map[string]container.QueueEnt{}
161         for _, ctr := range q.Containers {
162                 _, exists := q.entries[ctr.UUID]
163                 if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) {
164                         continue
165                 }
166                 if ent, ok := upd[ctr.UUID]; ok {
167                         ent.Container = ctr
168                         upd[ctr.UUID] = ent
169                 } else {
170                         types, _ := q.ChooseType(&ctr)
171                         ctr.Mounts = nil
172                         upd[ctr.UUID] = container.QueueEnt{
173                                 Container:     ctr,
174                                 InstanceTypes: types,
175                                 FirstSeenAt:   time.Now(),
176                         }
177                 }
178         }
179         q.entries = upd
180         q.updTime = updTime
181         q.notify()
182         return nil
183 }
184
185 // Notify adds/updates an entry in the Containers slice.  This
186 // simulates the effect of an API update from someone other than the
187 // dispatcher -- e.g., crunch-run updating state to "Complete" when a
188 // container exits.
189 //
190 // The resulting changes are not exposed through Get() or Entries()
191 // until the next call to Update().
192 //
193 // Return value is true unless the update is rejected (invalid state
194 // transition).
195 func (q *Queue) Notify(upd arvados.Container) bool {
196         q.mtx.Lock()
197         defer q.mtx.Unlock()
198         for i, ctr := range q.Containers {
199                 if ctr.UUID == upd.UUID {
200                         if allowContainerUpdate[ctr.State][upd.State] {
201                                 q.Containers[i] = upd
202                                 return true
203                         }
204                         if q.Logger != nil {
205                                 q.Logger.WithField("ContainerUUID", ctr.UUID).Infof("test.Queue rejected update from %s to %s", ctr.State, upd.State)
206                         }
207                         return false
208                 }
209         }
210         q.Containers = append(q.Containers, upd)
211         return true
212 }
213
214 var allowContainerUpdate = map[arvados.ContainerState]map[arvados.ContainerState]bool{
215         arvados.ContainerStateQueued: {
216                 arvados.ContainerStateQueued:    true,
217                 arvados.ContainerStateLocked:    true,
218                 arvados.ContainerStateCancelled: true,
219         },
220         arvados.ContainerStateLocked: {
221                 arvados.ContainerStateQueued:    true,
222                 arvados.ContainerStateLocked:    true,
223                 arvados.ContainerStateRunning:   true,
224                 arvados.ContainerStateCancelled: true,
225         },
226         arvados.ContainerStateRunning: {
227                 arvados.ContainerStateRunning:   true,
228                 arvados.ContainerStateCancelled: true,
229                 arvados.ContainerStateComplete:  true,
230         },
231 }