Merge branch '17995-filter-by-comparing-attrs'
[arvados.git] / lib / dispatchcloud / test / queue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package test
6
7 import (
8         "fmt"
9         "sync"
10         "time"
11
12         "git.arvados.org/arvados.git/lib/dispatchcloud/container"
13         "git.arvados.org/arvados.git/sdk/go/arvados"
14         "github.com/sirupsen/logrus"
15 )
16
17 // Queue is a test stub for container.Queue. The caller specifies the
18 // initial queue state.
19 type Queue struct {
20         // Containers represent the API server database contents.
21         Containers []arvados.Container
22
23         // ChooseType will be called for each entry in Containers. It
24         // must not be nil.
25         ChooseType func(*arvados.Container) (arvados.InstanceType, error)
26
27         Logger logrus.FieldLogger
28
29         entries      map[string]container.QueueEnt
30         updTime      time.Time
31         subscribers  map[<-chan struct{}]chan struct{}
32         stateChanges []QueueStateChange
33
34         mtx sync.Mutex
35 }
36
37 type QueueStateChange struct {
38         UUID string
39         From arvados.ContainerState
40         To   arvados.ContainerState
41 }
42
43 // All calls to Lock/Unlock/Cancel to date.
44 func (q *Queue) StateChanges() []QueueStateChange {
45         q.mtx.Lock()
46         defer q.mtx.Unlock()
47         return q.stateChanges
48 }
49
50 // Entries returns the containers that were queued when Update was
51 // last called.
52 func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) {
53         q.mtx.Lock()
54         defer q.mtx.Unlock()
55         updTime := q.updTime
56         r := map[string]container.QueueEnt{}
57         for uuid, ent := range q.entries {
58                 r[uuid] = ent
59         }
60         return r, updTime
61 }
62
63 // Get returns the container from the cached queue, i.e., as it was
64 // when Update was last called -- just like a container.Queue does. If
65 // the state has been changed (via Lock, Unlock, or Cancel) since the
66 // last Update, the updated state is returned.
67 func (q *Queue) Get(uuid string) (arvados.Container, bool) {
68         q.mtx.Lock()
69         defer q.mtx.Unlock()
70         ent, ok := q.entries[uuid]
71         return ent.Container, ok
72 }
73
74 func (q *Queue) Forget(uuid string) {
75         q.mtx.Lock()
76         defer q.mtx.Unlock()
77         delete(q.entries, uuid)
78 }
79
80 func (q *Queue) Lock(uuid string) error {
81         q.mtx.Lock()
82         defer q.mtx.Unlock()
83         return q.changeState(uuid, arvados.ContainerStateQueued, arvados.ContainerStateLocked)
84 }
85
86 func (q *Queue) Unlock(uuid string) error {
87         q.mtx.Lock()
88         defer q.mtx.Unlock()
89         return q.changeState(uuid, arvados.ContainerStateLocked, arvados.ContainerStateQueued)
90 }
91
92 func (q *Queue) Cancel(uuid string) error {
93         q.mtx.Lock()
94         defer q.mtx.Unlock()
95         return q.changeState(uuid, q.entries[uuid].Container.State, arvados.ContainerStateCancelled)
96 }
97
98 func (q *Queue) Subscribe() <-chan struct{} {
99         q.mtx.Lock()
100         defer q.mtx.Unlock()
101         if q.subscribers == nil {
102                 q.subscribers = map[<-chan struct{}]chan struct{}{}
103         }
104         ch := make(chan struct{}, 1)
105         q.subscribers[ch] = ch
106         return ch
107 }
108
109 func (q *Queue) Unsubscribe(ch <-chan struct{}) {
110         q.mtx.Lock()
111         defer q.mtx.Unlock()
112         delete(q.subscribers, ch)
113 }
114
115 // caller must have lock.
116 func (q *Queue) notify() {
117         for _, ch := range q.subscribers {
118                 select {
119                 case ch <- struct{}{}:
120                 default:
121                 }
122         }
123 }
124
125 // caller must have lock.
126 func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
127         ent := q.entries[uuid]
128         q.stateChanges = append(q.stateChanges, QueueStateChange{uuid, from, to})
129         if ent.Container.State != from {
130                 return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
131         }
132         ent.Container.State = to
133         q.entries[uuid] = ent
134         for i, ctr := range q.Containers {
135                 if ctr.UUID == uuid {
136                         q.Containers[i].State = to
137                         break
138                 }
139         }
140         q.notify()
141         return nil
142 }
143
144 // Update rebuilds the current entries from the Containers slice.
145 func (q *Queue) Update() error {
146         q.mtx.Lock()
147         defer q.mtx.Unlock()
148         updTime := time.Now()
149         upd := map[string]container.QueueEnt{}
150         for _, ctr := range q.Containers {
151                 _, exists := q.entries[ctr.UUID]
152                 if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) {
153                         continue
154                 }
155                 if ent, ok := upd[ctr.UUID]; ok {
156                         ent.Container = ctr
157                         upd[ctr.UUID] = ent
158                 } else {
159                         it, _ := q.ChooseType(&ctr)
160                         upd[ctr.UUID] = container.QueueEnt{
161                                 Container:    ctr,
162                                 InstanceType: it,
163                                 FirstSeenAt:  time.Now(),
164                         }
165                 }
166         }
167         q.entries = upd
168         q.updTime = updTime
169         q.notify()
170         return nil
171 }
172
173 // Notify adds/updates an entry in the Containers slice.  This
174 // simulates the effect of an API update from someone other than the
175 // dispatcher -- e.g., crunch-run updating state to "Complete" when a
176 // container exits.
177 //
178 // The resulting changes are not exposed through Get() or Entries()
179 // until the next call to Update().
180 //
181 // Return value is true unless the update is rejected (invalid state
182 // transition).
183 func (q *Queue) Notify(upd arvados.Container) bool {
184         q.mtx.Lock()
185         defer q.mtx.Unlock()
186         for i, ctr := range q.Containers {
187                 if ctr.UUID == upd.UUID {
188                         if allowContainerUpdate[ctr.State][upd.State] {
189                                 q.Containers[i] = upd
190                                 return true
191                         }
192                         if q.Logger != nil {
193                                 q.Logger.WithField("ContainerUUID", ctr.UUID).Infof("test.Queue rejected update from %s to %s", ctr.State, upd.State)
194                         }
195                         return false
196                 }
197         }
198         q.Containers = append(q.Containers, upd)
199         return true
200 }
201
202 var allowContainerUpdate = map[arvados.ContainerState]map[arvados.ContainerState]bool{
203         arvados.ContainerStateQueued: {
204                 arvados.ContainerStateQueued:    true,
205                 arvados.ContainerStateLocked:    true,
206                 arvados.ContainerStateCancelled: true,
207         },
208         arvados.ContainerStateLocked: {
209                 arvados.ContainerStateQueued:    true,
210                 arvados.ContainerStateLocked:    true,
211                 arvados.ContainerStateRunning:   true,
212                 arvados.ContainerStateCancelled: true,
213         },
214         arvados.ContainerStateRunning: {
215                 arvados.ContainerStateRunning:   true,
216                 arvados.ContainerStateCancelled: true,
217                 arvados.ContainerStateComplete:  true,
218         },
219 }