Merge branch '16723-kill-vs-requeue'
[arvados.git] / lib / dispatchcloud / test / queue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package test
6
7 import (
8         "fmt"
9         "sync"
10         "time"
11
12         "git.arvados.org/arvados.git/lib/dispatchcloud/container"
13         "git.arvados.org/arvados.git/sdk/go/arvados"
14         "github.com/sirupsen/logrus"
15 )
16
17 // Queue is a test stub for container.Queue. The caller specifies the
18 // initial queue state.
19 type Queue struct {
20         // Containers represent the API server database contents.
21         Containers []arvados.Container
22
23         // ChooseType will be called for each entry in Containers. It
24         // must not be nil.
25         ChooseType func(*arvados.Container) (arvados.InstanceType, error)
26
27         Logger logrus.FieldLogger
28
29         entries     map[string]container.QueueEnt
30         updTime     time.Time
31         subscribers map[<-chan struct{}]chan struct{}
32
33         mtx sync.Mutex
34 }
35
36 // Entries returns the containers that were queued when Update was
37 // last called.
38 func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) {
39         q.mtx.Lock()
40         defer q.mtx.Unlock()
41         updTime := q.updTime
42         r := map[string]container.QueueEnt{}
43         for uuid, ent := range q.entries {
44                 r[uuid] = ent
45         }
46         return r, updTime
47 }
48
49 // Get returns the container from the cached queue, i.e., as it was
50 // when Update was last called -- just like a container.Queue does. If
51 // the state has been changed (via Lock, Unlock, or Cancel) since the
52 // last Update, the updated state is returned.
53 func (q *Queue) Get(uuid string) (arvados.Container, bool) {
54         q.mtx.Lock()
55         defer q.mtx.Unlock()
56         ent, ok := q.entries[uuid]
57         return ent.Container, ok
58 }
59
60 func (q *Queue) Forget(uuid string) {
61         q.mtx.Lock()
62         defer q.mtx.Unlock()
63         delete(q.entries, uuid)
64 }
65
66 func (q *Queue) Lock(uuid string) error {
67         q.mtx.Lock()
68         defer q.mtx.Unlock()
69         return q.changeState(uuid, arvados.ContainerStateQueued, arvados.ContainerStateLocked)
70 }
71
72 func (q *Queue) Unlock(uuid string) error {
73         q.mtx.Lock()
74         defer q.mtx.Unlock()
75         return q.changeState(uuid, arvados.ContainerStateLocked, arvados.ContainerStateQueued)
76 }
77
78 func (q *Queue) Cancel(uuid string) error {
79         q.mtx.Lock()
80         defer q.mtx.Unlock()
81         return q.changeState(uuid, q.entries[uuid].Container.State, arvados.ContainerStateCancelled)
82 }
83
84 func (q *Queue) Subscribe() <-chan struct{} {
85         q.mtx.Lock()
86         defer q.mtx.Unlock()
87         if q.subscribers == nil {
88                 q.subscribers = map[<-chan struct{}]chan struct{}{}
89         }
90         ch := make(chan struct{}, 1)
91         q.subscribers[ch] = ch
92         return ch
93 }
94
95 func (q *Queue) Unsubscribe(ch <-chan struct{}) {
96         q.mtx.Lock()
97         defer q.mtx.Unlock()
98         delete(q.subscribers, ch)
99 }
100
101 // caller must have lock.
102 func (q *Queue) notify() {
103         for _, ch := range q.subscribers {
104                 select {
105                 case ch <- struct{}{}:
106                 default:
107                 }
108         }
109 }
110
111 // caller must have lock.
112 func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
113         ent := q.entries[uuid]
114         if ent.Container.State != from {
115                 return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
116         }
117         ent.Container.State = to
118         q.entries[uuid] = ent
119         for i, ctr := range q.Containers {
120                 if ctr.UUID == uuid {
121                         q.Containers[i].State = to
122                         break
123                 }
124         }
125         q.notify()
126         return nil
127 }
128
129 // Update rebuilds the current entries from the Containers slice.
130 func (q *Queue) Update() error {
131         q.mtx.Lock()
132         defer q.mtx.Unlock()
133         updTime := time.Now()
134         upd := map[string]container.QueueEnt{}
135         for _, ctr := range q.Containers {
136                 _, exists := q.entries[ctr.UUID]
137                 if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) {
138                         continue
139                 }
140                 if ent, ok := upd[ctr.UUID]; ok {
141                         ent.Container = ctr
142                         upd[ctr.UUID] = ent
143                 } else {
144                         it, _ := q.ChooseType(&ctr)
145                         upd[ctr.UUID] = container.QueueEnt{
146                                 Container:    ctr,
147                                 InstanceType: it,
148                         }
149                 }
150         }
151         q.entries = upd
152         q.updTime = updTime
153         q.notify()
154         return nil
155 }
156
157 // Notify adds/updates an entry in the Containers slice.  This
158 // simulates the effect of an API update from someone other than the
159 // dispatcher -- e.g., crunch-run updating state to "Complete" when a
160 // container exits.
161 //
162 // The resulting changes are not exposed through Get() or Entries()
163 // until the next call to Update().
164 //
165 // Return value is true unless the update is rejected (invalid state
166 // transition).
167 func (q *Queue) Notify(upd arvados.Container) bool {
168         q.mtx.Lock()
169         defer q.mtx.Unlock()
170         for i, ctr := range q.Containers {
171                 if ctr.UUID == upd.UUID {
172                         if allowContainerUpdate[ctr.State][upd.State] {
173                                 q.Containers[i] = upd
174                                 return true
175                         } else {
176                                 if q.Logger != nil {
177                                         q.Logger.WithField("ContainerUUID", ctr.UUID).Infof("test.Queue rejected update from %s to %s", ctr.State, upd.State)
178                                 }
179                                 return false
180                         }
181                 }
182         }
183         q.Containers = append(q.Containers, upd)
184         return true
185 }
186
187 var allowContainerUpdate = map[arvados.ContainerState]map[arvados.ContainerState]bool{
188         arvados.ContainerStateQueued: {
189                 arvados.ContainerStateQueued:    true,
190                 arvados.ContainerStateLocked:    true,
191                 arvados.ContainerStateCancelled: true,
192         },
193         arvados.ContainerStateLocked: {
194                 arvados.ContainerStateQueued:    true,
195                 arvados.ContainerStateLocked:    true,
196                 arvados.ContainerStateRunning:   true,
197                 arvados.ContainerStateCancelled: true,
198         },
199         arvados.ContainerStateRunning: {
200                 arvados.ContainerStateRunning:   true,
201                 arvados.ContainerStateCancelled: true,
202                 arvados.ContainerStateComplete:  true,
203         },
204 }